You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/02/16 15:28:44 UTC
[cassandra] branch trunk updated: Improve coordination tests
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6665fc2 Improve coordination tests
6665fc2 is described below
commit 6665fc29b33abcc26aad4cecbfee88225b0a7225
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Sat Oct 31 11:13:56 2020 +0000
Improve coordination tests
patch by Andrés de la Peña; reviewed by Caleb Rackliffe, Alex Petrov and Blake Eggleston for CASSANDRA-16180
---
src/java/org/apache/cassandra/dht/Token.java | 9 -
.../org/apache/cassandra/service/StorageProxy.java | 470 +---------------
.../service/reads/AbstractReadExecutor.java | 17 +-
.../service/reads/range/RangeCommandIterator.java | 291 ++++++++++
.../service/reads/range/RangeCommands.java | 117 ++++
.../service/reads/range/ReplicaPlanIterator.java | 123 +++++
.../service/reads/range/ReplicaPlanMerger.java | 76 +++
.../service/reads/range/SingleRangeResponse.java | 75 +++
.../distributed/impl/AbstractCluster.java | 6 +-
.../distributed/test/LargeMessageTest.java | 47 ++
.../distributed/test/MetricsCountQueriesTest.java | 57 ++
.../cassandra/distributed/test/PagingTest.java | 81 +++
.../test/ReadRepairEmptyRangeTombstonesTest.java | 1 -
.../distributed/test/ReadRepairQueryTypesTest.java | 6 -
.../distributed/test/ReadRepairTester.java | 3 -
.../distributed/test/SSTableSkippingReadTest.java | 127 +++++
.../distributed/test/SchemaDisagreementTest.java | 144 +++++
.../distributed/test/ShortReadProtectionTest.java | 553 ++++++++++++++++---
.../distributed/test/SimpleReadWriteTest.java | 592 ++++++++-------------
.../cassandra/distributed/test/TestBaseImpl.java | 10 +
.../upgrade/MixedModeAvailabilityTestBase.java | 170 ++++++
.../upgrade/MixedModeAvailabilityV22Test.java | 41 ++
.../upgrade/MixedModeAvailabilityV30Test.java | 41 ++
.../upgrade/MixedModeAvailabilityV3XTest.java | 35 ++
.../upgrade/MixedModeConsistencyTestBase.java | 123 +++++
.../upgrade/MixedModeConsistencyV22Test.java | 41 ++
.../upgrade/MixedModeConsistencyV30Test.java | 41 ++
.../upgrade/MixedModeConsistencyV3XTest.java | 35 ++
.../cassandra/distributed/upgrade/UpgradeTest.java | 27 -
test/unit/org/apache/cassandra/Util.java | 5 +
.../cassandra/db/PartitionRangeReadTest.java | 196 +------
.../apache/cassandra/service/StorageProxyTest.java | 239 ---------
.../reads/range/RangeCommandIteratorTest.java | 180 +++++++
.../service/reads/range/RangeCommandsTest.java | 281 ++++++++++
.../reads/range/ReplicaPlanIteratorTest.java | 211 ++++++++
.../service/reads/range/ReplicaPlanMergerTest.java | 412 ++++++++++++++
.../service/reads/range/TokenUpdater.java | 133 +++++
37 files changed, 3666 insertions(+), 1350 deletions(-)
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index ccb66fd..d8e82f8 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -159,15 +159,6 @@ public abstract class Token implements RingPosition<Token>, Serializable
return new KeyBound(this, false);
}
- @SuppressWarnings("unchecked")
- public <R extends RingPosition<R>> R upperBound(Class<R> klass)
- {
- if (klass.equals(getClass()))
- return (R)this;
- else
- return (R)maxKeyBound();
- }
-
public static class KeyBound implements PartitionPosition
{
private final Token token;
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index df3a6f5..b7b9f2c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -21,11 +21,9 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -42,12 +40,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
@@ -58,14 +53,12 @@ import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
@@ -73,7 +66,6 @@ import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.TruncateRequest;
import org.apache.cassandra.db.WriteType;
-import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionIterator;
@@ -82,9 +74,6 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.ViewUtils;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
@@ -102,19 +91,15 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
-import org.apache.cassandra.index.Index;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.locator.Replicas;
-import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.metrics.CASClientRequestMetrics;
import org.apache.cassandra.metrics.CASClientWriteRequestMetrics;
import org.apache.cassandra.metrics.ClientRequestMetrics;
@@ -136,12 +121,11 @@ import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.PrepareCallback;
import org.apache.cassandra.service.paxos.ProposeCallback;
import org.apache.cassandra.service.reads.AbstractReadExecutor;
-import org.apache.cassandra.service.reads.DataResolver;
import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.range.RangeCommands;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
-import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.MonotonicClock;
@@ -183,7 +167,6 @@ public class StorageProxy implements StorageProxyMBean
}
};
private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read");
- private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
private static final ClientWriteRequestMetrics writeMetrics = new ClientWriteRequestMetrics("Write");
private static final CASClientWriteRequestMetrics casWriteMetrics = new CASClientWriteRequestMetrics("CASWrite");
private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
@@ -191,17 +174,6 @@ public class StorageProxy implements StorageProxyMBean
private static final Map<ConsistencyLevel, ClientRequestMetrics> readMetricsMap = new EnumMap<>(ConsistencyLevel.class);
private static final Map<ConsistencyLevel, ClientWriteRequestMetrics> writeMetricsMap = new EnumMap<>(ConsistencyLevel.class);
- private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
-
- /**
- * Introduce a maximum number of sub-ranges that the coordinator can request in parallel for range queries. Previously
- * we would request up to the maximum number of ranges but this causes problems if the number of vnodes is large.
- * By default we pick 10 requests per core, assuming all replicas have the same number of cores. The idea is that we
- * don't want a burst of range requests that will back up, hurting all other queries. At the same time,
- * we want to give range queries a chance to run if resources are available.
- */
- private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests", FBUtilities.getAvailableProcessors() * 10));
-
private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY = "cassandra.unsafe.disable-serial-reads-linearizability";
private static final boolean disableSerialReadLinearizability =
Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY, "false"));
@@ -1105,7 +1077,7 @@ public class StorageProxy implements StorageProxyMBean
Tracing.trace("Determining replicas for atomic batch");
long startTime = System.nanoTime();
- List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size());
+ List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas()))
throw new AssertionError("Logged batches are unsupported with transient replication");
@@ -1887,7 +1859,7 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair> repairs)
+ public static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair<?, ?>> repairs)
{
PartitionIterator concatenated = PartitionIterators.concat(iterators);
@@ -1977,7 +1949,7 @@ public class StorageProxy implements StorageProxyMBean
// if we didn't do a read repair, return the contents of the data response, if we did do a read
// repair, merge the full data reads
List<PartitionIterator> results = new ArrayList<>(cmdCount);
- List<ReadRepair> repairs = new ArrayList<>(cmdCount);
+ List<ReadRepair<?, ?>> repairs = new ArrayList<>(cmdCount);
for (int i=0; i<cmdCount; i++)
{
results.add(reads[i].getResult());
@@ -2041,390 +2013,11 @@ public class StorageProxy implements StorageProxyMBean
}
}
- /**
- * Estimate the number of result rows per range in the ring based on our local data.
- * <p>
- * This assumes that ranges are uniformly distributed across the cluster and
- * that the queried data is also uniformly distributed.
- */
- private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
+ public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command,
+ ConsistencyLevel consistencyLevel,
+ long queryStartNanoTime)
{
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
- Index index = command.getIndex(cfs);
- float maxExpectedResults = index == null
- ? command.limits().estimateTotalResults(cfs)
- : index.getEstimatedResultRows();
-
- // adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks
- return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
- }
-
- @VisibleForTesting
- public static class RangeIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
- {
- private final Keyspace keyspace;
- private final ConsistencyLevel consistency;
- private final Iterator<? extends AbstractBounds<PartitionPosition>> ranges;
- private final int rangeCount;
-
- public RangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistency)
- {
- this.keyspace = keyspace;
- this.consistency = consistency;
-
- List<? extends AbstractBounds<PartitionPosition>> l = keyspace.getReplicationStrategy() instanceof LocalStrategy
- ? command.dataRange().keyRange().unwrap()
- : getRestrictedRanges(command.dataRange().keyRange());
- this.ranges = l.iterator();
- this.rangeCount = l.size();
- }
-
- public int rangeCount()
- {
- return rangeCount;
- }
-
- protected ReplicaPlan.ForRangeRead computeNext()
- {
- if (!ranges.hasNext())
- return endOfData();
-
- return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next(), 1);
- }
- }
-
- public static class RangeMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
- {
- private final Keyspace keyspace;
- private final ConsistencyLevel consistency;
- private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
-
- public RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
- {
- this.keyspace = keyspace;
- this.consistency = consistency;
- this.ranges = Iterators.peekingIterator(iterator);
- }
-
- protected ReplicaPlan.ForRangeRead computeNext()
- {
- if (!ranges.hasNext())
- return endOfData();
-
- ReplicaPlan.ForRangeRead current = ranges.next();
-
- // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
- // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
- // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
- while (ranges.hasNext())
- {
- // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
- // don't know how to deal with a wrapping range.
- // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
- // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
- // wire compatibility, so It's likely easier not to bother;
- if (current.range().right.isMinimum())
- break;
-
- ReplicaPlan.ForRangeRead next = ranges.peek();
- ReplicaPlan.ForRangeRead merged = ReplicaPlans.maybeMerge(keyspace, consistency, current, next);
- if (merged == null)
- break;
-
- current = merged;
- ranges.next(); // consume the range we just merged since we've only peeked so far
- }
- return current;
- }
- }
-
- private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
- {
- private final DataResolver resolver;
- private final ReadCallback handler;
- private final ReadRepair readRepair;
- private PartitionIterator result;
-
- private SingleRangeResponse(DataResolver resolver, ReadCallback handler, ReadRepair readRepair)
- {
- this.resolver = resolver;
- this.handler = handler;
- this.readRepair = readRepair;
- }
-
- private void waitForResponse() throws ReadTimeoutException
- {
- if (result != null)
- return;
-
- handler.awaitResults();
- result = resolver.resolve();
- }
-
- protected RowIterator computeNext()
- {
- waitForResponse();
- return result.hasNext() ? result.next() : endOfData();
- }
-
- public void close()
- {
- if (result != null)
- result.close();
- }
- }
-
- public static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
- {
- private final Iterator<ReplicaPlan.ForRangeRead> ranges;
- private final int totalRangeCount;
- private final PartitionRangeReadCommand command;
- private final boolean enforceStrictLiveness;
-
- private final long startTime;
- private final long queryStartNanoTime;
- private DataLimits.Counter counter;
- private PartitionIterator sentQueryIterator;
-
- private final int maxConcurrencyFactor;
- private int concurrencyFactor;
- // The two following "metric" are maintained to improve the concurrencyFactor
- // when it was not good enough initially.
- private int liveReturned;
- private int rangesQueried;
- private int batchesRequested = 0;
-
- public RangeCommandIterator(Iterator<ReplicaPlan.ForRangeRead> ranges,
- PartitionRangeReadCommand command,
- int concurrencyFactor,
- int maxConcurrencyFactor,
- int totalRangeCount,
- long queryStartNanoTime)
- {
- this.command = command;
- this.concurrencyFactor = concurrencyFactor;
- this.maxConcurrencyFactor = maxConcurrencyFactor;
- this.startTime = System.nanoTime();
- this.ranges = ranges;
- this.totalRangeCount = totalRangeCount;
- this.queryStartNanoTime = queryStartNanoTime;
- this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
- }
-
- public RowIterator computeNext()
- {
- try
- {
- while (sentQueryIterator == null || !sentQueryIterator.hasNext())
- {
- // If we don't have more range to handle, we're done
- if (!ranges.hasNext())
- return endOfData();
-
- // else, sends the next batch of concurrent queries (after having close the previous iterator)
- if (sentQueryIterator != null)
- {
- liveReturned += counter.counted();
- sentQueryIterator.close();
-
- // It's not the first batch of queries and we're not done, so we we can use what has been
- // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
- updateConcurrencyFactor();
- }
- sentQueryIterator = sendNextRequests();
- }
-
- return sentQueryIterator.next();
- }
- catch (UnavailableException e)
- {
- rangeMetrics.unavailables.mark();
- throw e;
- }
- catch (ReadTimeoutException e)
- {
- rangeMetrics.timeouts.mark();
- throw e;
- }
- catch (ReadFailureException e)
- {
- rangeMetrics.failures.mark();
- throw e;
- }
- }
-
- private void updateConcurrencyFactor()
- {
- liveReturned += counter.counted();
-
- concurrencyFactor = computeConcurrencyFactor(totalRangeCount, rangesQueried, maxConcurrencyFactor, command.limits().count(), liveReturned);
- }
-
- @VisibleForTesting
- public static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int maxConcurrencyFactor, int limit, int liveReturned)
- {
- maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
- if (liveReturned == 0)
- {
- // we haven't actually gotten any results, so query up to the limit if not results so far
- Tracing.trace("Didn't get any response rows; new concurrent requests: {}", maxConcurrencyFactor);
- return maxConcurrencyFactor;
- }
-
- // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
- // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
- int remainingRows = limit - liveReturned;
- float rowsPerRange = (float)liveReturned / (float)rangesQueried;
- int concurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
- logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
- rowsPerRange, remainingRows, concurrencyFactor);
- return concurrencyFactor;
- }
-
- /**
- * Queries the provided sub-range.
- *
- * @param replicaPlan the subRange to query.
- * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
- * that batch or not. The reason it matters is that whe paging queries, the command (more specifically the
- * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
- * that it's the query that "continues" whatever we're previously queried).
- */
- private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
- {
- PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
- // If enabled, request repaired data tracking info from full replicas but
- // only if there are multiple full replicas to compare results from
- if (DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
- && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
- {
- command.trackRepairedStatus();
- rangeCommand.trackRepairedStatus();
- }
-
- ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
- ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
- = ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
- DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
- = new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
- ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
- = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
-
-
- if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
- {
- Stage.READ.execute(new LocalReadRunnable(rangeCommand, handler));
- }
- else
- {
- for (Replica replica : replicaPlan.contacts())
- {
- Tracing.trace("Enqueuing request to {}", replica);
- ReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
- Message<ReadCommand> message = command.createMessage(command.isTrackingRepairedStatus() && replica.isFull());
- MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
- }
- }
-
- return new SingleRangeResponse(resolver, handler, readRepair);
- }
-
- private PartitionIterator sendNextRequests()
- {
- List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
- List<ReadRepair> readRepairs = new ArrayList<>(concurrencyFactor);
-
- try
- {
- for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
- {
- ReplicaPlan.ForRangeRead range = ranges.next();
-
- @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below
- SingleRangeResponse response = query(range, i == 0);
- concurrentQueries.add(response);
- readRepairs.add(response.readRepair);
- // due to RangeMerger, coordinator may fetch more ranges than required by concurrency factor.
- rangesQueried += range.vnodeCount();
- i += range.vnodeCount();
- }
- batchesRequested++;
- }
- catch (Throwable t)
- {
- for (PartitionIterator response: concurrentQueries)
- response.close();
- throw t;
- }
-
- Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
- // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
- // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE.
- counter = DataLimits.NONE.newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
- return counter.applyTo(concatAndBlockOnRepair(concurrentQueries, readRepairs));
- }
-
- public void close()
- {
- try
- {
- if (sentQueryIterator != null)
- sentQueryIterator.close();
- }
- finally
- {
- long latency = System.nanoTime() - startTime;
- rangeMetrics.addNano(latency);
- Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
- }
- }
-
- @VisibleForTesting
- public int rangesQueried()
- {
- return rangesQueried;
- }
-
- @VisibleForTesting
- public int batchesRequested()
- {
- return batchesRequested;
- }
- }
-
- @SuppressWarnings("resource")
- public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
- {
- Tracing.trace("Computing ranges to query");
-
- Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
- RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
-
- // our estimate of how many result rows there will be per-range
- float resultsPerRange = estimateResultsPerRange(command, keyspace);
- // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
- // fetch enough rows in the first round
- resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
- int maxConcurrencyFactor = Math.min(ranges.rangeCount(), MAX_CONCURRENT_RANGE_REQUESTS);
- int concurrencyFactor = resultsPerRange == 0.0
- ? 1
- : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
- logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
- resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
- Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
-
- // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
- RangeMerger mergedRanges = new RangeMerger(ranges, keyspace, consistencyLevel);
- RangeCommandIterator rangeCommandIterator = new RangeCommandIterator(mergedRanges,
- command,
- concurrencyFactor,
- maxConcurrencyFactor,
- ranges.rangeCount(),
- queryStartNanoTime);
- return command.limits().filter(command.postReconciliationProcessing(rangeCommandIterator),
- command.nowInSec(),
- command.selectsFullPartition(),
- command.metadata().enforceStrictLiveness());
+ return RangeCommands.partitions(command, consistencyLevel, queryStartNanoTime);
}
public Map<String, List<String>> getSchemaVersions()
@@ -2503,53 +2096,6 @@ public class StorageProxy implements StorageProxyMBean
return results;
}
- /**
- * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
- * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
- */
- static <T extends RingPosition<T>> List<AbstractBounds<T>> getRestrictedRanges(final AbstractBounds<T> queryRange)
- {
- // special case for bounds containing exactly 1 (non-minimum) token
- if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum())
- {
- return Collections.singletonList(queryRange);
- }
-
- TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
-
- List<AbstractBounds<T>> ranges = new ArrayList<AbstractBounds<T>>();
- // divide the queryRange into pieces delimited by the ring and minimum tokens
- Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
- AbstractBounds<T> remainder = queryRange;
- while (ringIter.hasNext())
- {
- /*
- * remainder can be a range/bounds of token _or_ keys and we want to split it with a token:
- * - if remainder is tokens, then we'll just split using the provided token.
- * - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder
- * is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to
- * split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix
- * tokens and keys at the same time in a range, we uses 15.upperBoundKey() to have A include all
- * keys having 15 as token and B include none of those (since that is what our node owns).
- * asSplitValue() abstracts that choice.
- */
- Token upperBoundToken = ringIter.next();
- T upperBound = (T)upperBoundToken.upperBound(queryRange.left.getClass());
- if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound))
- // no more splits
- break;
- Pair<AbstractBounds<T>,AbstractBounds<T>> splits = remainder.split(upperBound);
- if (splits == null)
- continue;
-
- ranges.add(splits.left);
- remainder = splits.right;
- }
- ranges.add(remainder);
-
- return ranges;
- }
-
public boolean getHintedHandoffEnabled()
{
return DatabaseDescriptor.hintedHandoffEnabled();
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index ae09417..8a83d3e 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -18,12 +18,6 @@
package org.apache.cassandra.service.reads;
import com.google.common.base.Preconditions;
-
-import com.google.common.base.Predicates;
-
-import org.apache.cassandra.db.transform.DuplicateRowChecker;
-import org.apache.cassandra.locator.ReplicaPlan;
-import org.apache.cassandra.locator.ReplicaPlans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,10 +25,11 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
@@ -42,10 +37,12 @@ import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
@@ -106,7 +103,7 @@ public abstract class AbstractReadExecutor
return ((SinglePartitionReadCommand) command).partitionKey();
}
- public ReadRepair getReadRepair()
+ public ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> getReadRepair()
{
return readRepair;
}
@@ -282,7 +279,7 @@ public abstract class AbstractReadExecutor
Replica extraReplica;
if (handler.resolver.isDataPresent())
{
- extraReplica = replicaPlan.firstUncontactedCandidate(Predicates.alwaysTrue());
+ extraReplica = replicaPlan.firstUncontactedCandidate(replica -> true);
// we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against
assert extraReplica != null;
diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
new file mode 100644
index 0000000..ae7ee60
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+
+/**
+ * A {@link PartitionIterator} over the results of a {@link PartitionRangeReadCommand}.
+ * <p>
+ * The queried range is consumed as a sequence of {@link ReplicaPlan.ForRangeRead}s, which are queried in batches of
+ * concurrent requests. The size of each batch (the concurrency factor) is recomputed after every batch, using the
+ * number of live rows returned so far. The latency of the whole iteration is recorded when the iterator is closed.
+ * <p>
+ * Each sub-range query honors the command limit, but this iterator does not enforce the limit globally.
+ */
+class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+{
+    private static final Logger logger = LoggerFactory.getLogger(RangeCommandIterator.class);
+
+    private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
+
+    private final CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlans;
+    private final int totalRangeCount;
+    private final PartitionRangeReadCommand command;
+    private final boolean enforceStrictLiveness;
+
+    private final long startTime;
+    private final long queryStartNanoTime;
+    // Counts the rows of the current batch; a new counter is created by sendNextRequests() for every batch.
+    private DataLimits.Counter counter;
+    private PartitionIterator sentQueryIterator;
+
+    private final int maxConcurrencyFactor;
+    private int concurrencyFactor;
+    // The two following metrics are maintained to improve the concurrencyFactor
+    // when it was not good enough initially.
+    private int liveReturned;
+    private int rangesQueried;
+    private int batchesRequested = 0;
+
+    /**
+     * @param replicaPlans the replica plans of the sub-ranges to query, in order; closed by {@link #close()}
+     * @param command the range read command to execute
+     * @param concurrencyFactor the initial concurrency factor, used for the first batch of requests
+     * @param maxConcurrencyFactor the upper bound for the concurrency factor
+     * @param totalRangeCount the total number of sub-ranges, used to bound the concurrency factor
+     * @param queryStartNanoTime the start time of the query, in nanoseconds
+     */
+    RangeCommandIterator(CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlans,
+                         PartitionRangeReadCommand command,
+                         int concurrencyFactor,
+                         int maxConcurrencyFactor,
+                         int totalRangeCount,
+                         long queryStartNanoTime)
+    {
+        this.replicaPlans = replicaPlans;
+        this.command = command;
+        this.concurrencyFactor = concurrencyFactor;
+        this.maxConcurrencyFactor = maxConcurrencyFactor;
+        this.totalRangeCount = totalRangeCount;
+        this.queryStartNanoTime = queryStartNanoTime;
+
+        startTime = System.nanoTime();
+        enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+    }
+
+    @Override
+    protected RowIterator computeNext()
+    {
+        try
+        {
+            while (sentQueryIterator == null || !sentQueryIterator.hasNext())
+            {
+                // If we don't have more range to handle, we're done
+                if (!replicaPlans.hasNext())
+                    return endOfData();
+
+                // else, sends the next batch of concurrent queries (after having closed the previous iterator)
+                if (sentQueryIterator != null)
+                {
+                    sentQueryIterator.close();
+
+                    // It's not the first batch of queries and we're not done, so we can use what has been
+                    // returned so far to improve our rows-per-range estimate and update the concurrency accordingly.
+                    // updateConcurrencyFactor() adds the rows of the previous batch to liveReturned, so they must
+                    // not be added here as well, or they would be counted twice.
+                    updateConcurrencyFactor();
+                }
+                sentQueryIterator = sendNextRequests();
+            }
+
+            return sentQueryIterator.next();
+        }
+        catch (UnavailableException e)
+        {
+            rangeMetrics.unavailables.mark();
+            throw e;
+        }
+        catch (ReadTimeoutException e)
+        {
+            rangeMetrics.timeouts.mark();
+            throw e;
+        }
+        catch (ReadFailureException e)
+        {
+            rangeMetrics.failures.mark();
+            throw e;
+        }
+    }
+
+    /**
+     * Adds the rows counted for the last batch to {@code liveReturned} and recomputes the concurrency factor.
+     * This must be called exactly once per completed batch.
+     */
+    private void updateConcurrencyFactor()
+    {
+        liveReturned += counter.counted();
+
+        concurrencyFactor = computeConcurrencyFactor(totalRangeCount, rangesQueried, maxConcurrencyFactor, command.limits().count(), liveReturned);
+    }
+
+    /**
+     * Computes the concurrency factor for the next batch of requests.
+     *
+     * @param totalRangeCount the total number of sub-ranges
+     * @param rangesQueried the number of sub-ranges queried so far
+     * @param maxConcurrencyFactor the upper bound for the concurrency factor
+     * @param limit the number of rows requested by the command
+     * @param liveReturned the number of live rows returned so far
+     * @return a concurrency factor between 1 and {@code min(maxConcurrencyFactor, totalRangeCount - rangesQueried)}
+     * (or 1 if that is lower than 1)
+     */
+    @VisibleForTesting
+    static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int maxConcurrencyFactor, int limit, int liveReturned)
+    {
+        maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
+        if (liveReturned == 0)
+        {
+            // we haven't actually gotten any results, so query up to the limit if no results so far
+            Tracing.trace("Didn't get any response rows; new concurrent requests: {}", maxConcurrencyFactor);
+            return maxConcurrencyFactor;
+        }
+
+        // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
+        // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
+        int remainingRows = limit - liveReturned;
+        float rowsPerRange = (float) liveReturned / (float) rangesQueried;
+        int concurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
+        logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
+                     rowsPerRange, remainingRows, concurrencyFactor);
+        return concurrencyFactor;
+    }
+
+    /**
+     * Queries the provided sub-range.
+     *
+     * @param replicaPlan the replica plan of the sub-range to query.
+     * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
+     * that batch or not. The reason it matters is that when paging queries, the command (more specifically the
+     * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
+     * that it's the query that "continues" whatever we've previously queried).
+     * @return the (not yet resolved) response for the sub-range
+     */
+    private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
+    {
+        PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
+        // If enabled, request repaired data tracking info from full replicas but
+        // only if there are multiple full replicas to compare results from
+        if (DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
+            && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
+        {
+            // the parent command is flagged too; presumably so that the ReadRepair created from it below sees it
+            command.trackRepairedStatus();
+            rangeCommand.trackRepairedStatus();
+        }
+
+        ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
+        ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair =
+            ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
+        DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver =
+            new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
+        ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler =
+            new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
+
+        if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
+        {
+            // the only replica to contact is this node, so run the read locally instead of messaging ourselves
+            Stage.READ.execute(new StorageProxy.LocalReadRunnable(rangeCommand, handler));
+        }
+        else
+        {
+            for (Replica replica : replicaPlan.contacts())
+            {
+                Tracing.trace("Enqueuing request to {}", replica);
+                ReadCommand replicaCommand = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
+                Message<ReadCommand> message = replicaCommand.createMessage(replicaCommand.isTrackingRepairedStatus() && replica.isFull());
+                MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
+            }
+        }
+
+        return new SingleRangeResponse(resolver, handler, readRepair);
+    }
+
+    /**
+     * Sends the queries for the next batch of sub-ranges, up to the current concurrency factor.
+     *
+     * @return an iterator over the concatenated responses of the batch, counting the returned rows in {@code counter}
+     */
+    private PartitionIterator sendNextRequests()
+    {
+        List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
+        List<ReadRepair<?, ?>> readRepairs = new ArrayList<>(concurrencyFactor);
+
+        try
+        {
+            for (int i = 0; i < concurrencyFactor && replicaPlans.hasNext(); )
+            {
+                ReplicaPlan.ForRangeRead replicaPlan = replicaPlans.next();
+
+                @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below
+                SingleRangeResponse response = query(replicaPlan, i == 0);
+                concurrentQueries.add(response);
+                readRepairs.add(response.getReadRepair());
+                // due to RangeMerger, coordinator may fetch more ranges than required by concurrency factor.
+                rangesQueried += replicaPlan.vnodeCount();
+                i += replicaPlan.vnodeCount();
+            }
+            batchesRequested++;
+        }
+        catch (Throwable t)
+        {
+            for (PartitionIterator response : concurrentQueries)
+                response.close();
+            throw t;
+        }
+
+        Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
+        // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor)
+        // but we don't want to enforce any particular limit at this point (this could break code than rely on
+        // postReconciliationProcessing), hence the DataLimits.NONE.
+        counter = DataLimits.NONE.newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
+        return counter.applyTo(StorageProxy.concatAndBlockOnRepair(concurrentQueries, readRepairs));
+    }
+
+    /**
+     * Closes the current batch and the replica plans, and records the latency of the whole iteration.
+     */
+    @Override
+    public void close()
+    {
+        try
+        {
+            // close the replica plans even if closing the current batch fails
+            try
+            {
+                if (sentQueryIterator != null)
+                    sentQueryIterator.close();
+            }
+            finally
+            {
+                replicaPlans.close();
+            }
+        }
+        finally
+        {
+            long latency = System.nanoTime() - startTime;
+            rangeMetrics.addNano(latency);
+            Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    @VisibleForTesting
+    int rangesQueried()
+    {
+        return rangesQueried;
+    }
+
+    @VisibleForTesting
+    int batchesRequested()
+    {
+        return batchesRequested;
+    }
+
+    @VisibleForTesting
+    int maxConcurrencyFactor()
+    {
+        return maxConcurrencyFactor;
+    }
+
+    @VisibleForTesting
+    int concurrencyFactor()
+    {
+        return concurrencyFactor;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java
new file mode 100644
index 0000000..3452a35
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Coordinator-side execution of {@link PartitionRangeReadCommand}s: the queried key range is split into sub-ranges
+ * (one replica plan each), consecutive plans are merged when possible, and the resulting plans are queried through a
+ * {@link RangeCommandIterator}.
+ */
+public class RangeCommands
+{
+    // Use this class (not RangeCommandIterator) so that log output is attributed to the right component.
+    private static final Logger logger = LoggerFactory.getLogger(RangeCommands.class);
+
+    // Fraction by which the estimated rows per range is lowered, to make it more likely that the first batch
+    // of requests fetches enough rows.
+    private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
+
+    /**
+     * Introduce a maximum number of sub-ranges that the coordinator can request in parallel for range queries. Previously
+     * we would request up to the maximum number of ranges but this causes problems if the number of vnodes is large.
+     * By default we pick 10 requests per core, assuming all replicas have the same number of cores. The idea is that we
+     * don't want a burst of range requests that will back up, hurting all other queries. At the same time,
+     * we want to give range queries a chance to run if resources are available.
+     */
+    private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests",
+                                                                                            FBUtilities.getAvailableProcessors() * 10));
+
+    /**
+     * Returns the partitions selected by the given range read command.
+     *
+     * @param command the range read command to execute
+     * @param consistencyLevel the consistency level of the read
+     * @param queryStartNanoTime the start time of the query, in nanoseconds
+     * @return the partitions returned by the replicas, after post-reconciliation processing and with the command
+     * limits applied to the whole result
+     */
+    @SuppressWarnings("resource") // created iterators will be closed in CQL layer through the chain of transformations
+    public static PartitionIterator partitions(PartitionRangeReadCommand command,
+                                               ConsistencyLevel consistencyLevel,
+                                               long queryStartNanoTime)
+    {
+        // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
+        RangeCommandIterator rangeCommands = rangeCommandIterator(command, consistencyLevel, queryStartNanoTime);
+        return command.limits().filter(command.postReconciliationProcessing(rangeCommands),
+                                       command.nowInSec(),
+                                       command.selectsFullPartition(),
+                                       command.metadata().enforceStrictLiveness());
+    }
+
+    /**
+     * Creates the {@link RangeCommandIterator} for the given command, with an initial concurrency factor derived from
+     * the locally estimated number of result rows per range.
+     */
+    @VisibleForTesting
+    @SuppressWarnings("resource") // created iterators will be closed in CQL layer through the chain of transformations
+    static RangeCommandIterator rangeCommandIterator(PartitionRangeReadCommand command,
+                                                     ConsistencyLevel consistencyLevel,
+                                                     long queryStartNanoTime)
+    {
+        Tracing.trace("Computing ranges to query");
+
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+        ReplicaPlanIterator replicaPlans = new ReplicaPlanIterator(command.dataRange().keyRange(), keyspace, consistencyLevel);
+
+        // our estimate of how many result rows there will be per-range
+        float resultsPerRange = estimateResultsPerRange(command, keyspace);
+        // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
+        // fetch enough rows in the first round
+        resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+        int maxConcurrencyFactor = Math.min(replicaPlans.size(), MAX_CONCURRENT_RANGE_REQUESTS);
+        int concurrencyFactor = resultsPerRange == 0.0
+                                ? 1
+                                : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
+        logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+                     resultsPerRange, command.limits().count(), replicaPlans.size(), concurrencyFactor);
+        Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
+                      replicaPlans.size(), concurrencyFactor, resultsPerRange);
+
+        ReplicaPlanMerger mergedReplicaPlans = new ReplicaPlanMerger(replicaPlans, keyspace, consistencyLevel);
+        return new RangeCommandIterator(mergedReplicaPlans,
+                                        command,
+                                        concurrencyFactor,
+                                        maxConcurrencyFactor,
+                                        replicaPlans.size(),
+                                        queryStartNanoTime);
+    }
+
+    /**
+     * Estimate the number of result rows per range in the ring based on our local data.
+     * <p>
+     * This assumes that ranges are uniformly distributed across the cluster and
+     * that the queried data is also uniformly distributed.
+     */
+    @VisibleForTesting
+    static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
+    {
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
+        Index index = command.getIndex(cfs);
+        float maxExpectedResults = index == null
+                                   ? command.limits().estimateTotalResults(cfs)
+                                   : index.getEstimatedResultRows();
+
+        // adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks
+        return (maxExpectedResults / DatabaseDescriptor.getNumTokens())
+               / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java
new file mode 100644
index 0000000..ef88c9d
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Iterator over the {@link ReplicaPlan.ForRangeRead}s needed to query a key range, one per sub-range. The key range
+ * is split at the ring tokens, or just unwrapped for keyspaces using {@link LocalStrategy}.
+ */
+class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
+{
+    private final Keyspace keyspace;
+    private final ConsistencyLevel consistency;
+    @VisibleForTesting
+    final Iterator<? extends AbstractBounds<PartitionPosition>> ranges;
+    private final int rangeCount;
+
+    /**
+     * @param keyRange the key range to query
+     * @param keyspace the queried keyspace
+     * @param consistency the consistency level used to build each replica plan
+     */
+    ReplicaPlanIterator(AbstractBounds<PartitionPosition> keyRange, Keyspace keyspace, ConsistencyLevel consistency)
+    {
+        this.keyspace = keyspace;
+        this.consistency = consistency;
+
+        List<? extends AbstractBounds<PartitionPosition>> subRanges;
+        if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
+            subRanges = keyRange.unwrap();
+        else
+            subRanges = getRestrictedRanges(keyRange);
+
+        this.rangeCount = subRanges.size();
+        this.ranges = subRanges.iterator();
+    }
+
+    /**
+     * @return how many sub-ranges (and so how many {@link ReplicaPlan.ForRangeRead}s) this iterator produces in total
+     */
+    int size()
+    {
+        return rangeCount;
+    }
+
+    @Override
+    protected ReplicaPlan.ForRangeRead computeNext()
+    {
+        return ranges.hasNext()
+               ? ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next(), 1)
+               : endOfData();
+    }
+
+    /**
+     * Splits the queried range at every ring token it contains, returning the pieces in ring order. Nodes can be
+     * replica destinations for many ranges, so each scan has to be restricted to one of these pieces, or else
+     * we'd get duplicate results.
+     */
+    private static List<AbstractBounds<PartitionPosition>> getRestrictedRanges(final AbstractBounds<PartitionPosition> queryRange)
+    {
+        // bounds containing exactly one (non-minimum) token never need to be split
+        boolean singleToken = queryRange instanceof Bounds
+                              && queryRange.left.equals(queryRange.right)
+                              && !queryRange.left.isMinimum();
+        if (singleToken)
+            return Collections.singletonList(queryRange);
+
+        TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
+        Iterator<Token> ringTokens = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
+
+        List<AbstractBounds<PartitionPosition>> pieces = new ArrayList<>();
+        AbstractBounds<PartitionPosition> remainder = queryRange;
+        while (ringTokens.hasNext())
+        {
+            /*
+             * remainder is a range/bounds of partition positions and we want to split it with a token. We want to split
+             * using the key returned by token.maxKeyBound. For instance, if remainder is [DK(10, 'foo'), DK(20, 'bar')],
+             * and we have 3 nodes with tokens 0, 15, 30, we want to split remainder to A=[DK(10, 'foo'), 15] and
+             * B=(15, DK(20, 'bar')]. But since we can't mix tokens and keys at the same time in a range, we use
+             * 15.maxKeyBound() to have A include all keys having 15 as token and B include none of those (since that is
+             * what our node owns).
+             */
+            PartitionPosition splitPoint = ringTokens.next().maxKeyBound();
+            boolean splittable = remainder.left.equals(splitPoint) || remainder.contains(splitPoint);
+            if (!splittable)
+                break; // the remaining tokens are past the remainder: no more splits
+
+            Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> halves = remainder.split(splitPoint);
+            if (halves != null)
+            {
+                pieces.add(halves.left);
+                remainder = halves.right;
+            }
+        }
+        pieces.add(remainder);
+
+        return pieces;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanMerger.java b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanMerger.java
new file mode 100644
index 0000000..f2dacd0
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanMerger.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.utils.AbstractIterator;
+
+/**
+ * Wraps an iterator of consecutive replica plans, merging each run of adjacent plans for which
+ * {@link ReplicaPlans#maybeMerge} returns a merged plan into a single plan.
+ */
+class ReplicaPlanMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
+{
+    private final Keyspace keyspace;
+    private final ConsistencyLevel consistency;
+    private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
+
+    ReplicaPlanMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
+    {
+        this.keyspace = keyspace;
+        this.consistency = consistency;
+        this.ranges = Iterators.peekingIterator(iterator);
+    }
+
+    @Override
+    protected ReplicaPlan.ForRangeRead computeNext()
+    {
+        if (!ranges.hasNext())
+            return endOfData();
+
+        ReplicaPlan.ForRangeRead accumulated = ranges.next();
+
+        // getRestrictedRanges has broken the queried range into per-[vnode] token ranges, but this doesn't take
+        // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
+        // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
+        //
+        // Merging stops once the accumulated range ends at the min token, because CFS.getRangeSlice doesn't know
+        // how to deal with a wrapping range. It would be slightly more efficient to have CFS.getRangeSlice on the
+        // destination nodes unwrap the range if necessary, but we can't start sending wrapped ranges without
+        // breaking wire compatibility, so it's likely easier not to bother.
+        while (ranges.hasNext() && !accumulated.range().right.isMinimum())
+        {
+            ReplicaPlan.ForRangeRead candidate = ReplicaPlans.maybeMerge(keyspace, consistency, accumulated, ranges.peek());
+            if (candidate == null)
+                break;
+
+            ranges.next(); // the peeked plan is now part of the merged one, so consume it
+            accumulated = candidate;
+        }
+        return accumulated;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/range/SingleRangeResponse.java b/src/java/org/apache/cassandra/service/reads/range/SingleRangeResponse.java
new file mode 100644
index 0000000..d318b41
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/range/SingleRangeResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.utils.AbstractIterator;
+
+/**
+ * The response to the query of a single sub-range. The replica responses are awaited and resolved lazily, the
+ * first time a partition is requested, and the resolved partitions are then returned one by one.
+ */
+class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
+{
+    private final DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver;
+    private final ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler;
+    private final ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair;
+
+    // null until the responses have been resolved
+    private PartitionIterator result;
+
+    SingleRangeResponse(DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver,
+                        ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler,
+                        ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair)
+    {
+        this.resolver = resolver;
+        this.handler = handler;
+        this.readRepair = readRepair;
+    }
+
+    /**
+     * @return the read repair associated to this sub-range query
+     */
+    ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> getReadRepair()
+    {
+        return readRepair;
+    }
+
+    /**
+     * Calls {@code handler.awaitResults()} and resolves the responses, unless that has already been done.
+     */
+    private void waitForResponse() throws ReadTimeoutException
+    {
+        if (result == null)
+        {
+            handler.awaitResults();
+            result = resolver.resolve();
+        }
+    }
+
+    @Override
+    protected RowIterator computeNext()
+    {
+        waitForResponse();
+        if (!result.hasNext())
+            return endOfData();
+        return result.next();
+    }
+
+    @Override
+    public void close()
+    {
+        // nothing to release if the responses were never resolved
+        if (result == null)
+            return;
+        result.close();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 09a98df..babc2c3 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -27,7 +27,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -477,6 +476,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
return instances.get(node - 1).coordinator();
}
+    /**
+     * @return a stream with the coordinator of every instance of this cluster
+     */
+    public Stream<ICoordinator> coordinators()
+    {
+        return stream().map(instance -> instance.coordinator());
+    }
+
/**
* WARNING: we index from 1 here, for consistency with inet address!
*/
diff --git a/test/distributed/org/apache/cassandra/distributed/test/LargeMessageTest.java b/test/distributed/org/apache/cassandra/distributed/test/LargeMessageTest.java
new file mode 100644
index 0000000..392f325
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/LargeMessageTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
+
+/**
+ * Writes and reads back, at consistency level ALL on a two-node cluster, a text value made of
+ * {@code LARGE_MESSAGE_THRESHOLD} characters.
+ */
+public class LargeMessageTest extends TestBaseImpl
+{
+    @Test
+    public void testLargeMessage() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(2).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))"));
+
+            StringBuilder value = new StringBuilder();
+            while (value.length() < LARGE_MESSAGE_THRESHOLD)
+                value.append('a');
+            String largeValue = value.toString();
+
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, 1, ?)"), ALL, largeValue);
+            assertRows(cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"), ALL, 1),
+                       row(1, 1, largeValue));
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MetricsCountQueriesTest.java b/test/distributed/org/apache/cassandra/distributed/test/MetricsCountQueriesTest.java
new file mode 100644
index 0000000..a742e48
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/MetricsCountQueriesTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+
+public class MetricsCountQueriesTest extends TestBaseImpl
+{
+    @Test
+    public void testMetricsCountQueries() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"));
+            for (int n = 0; n < 100; n++)
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"), ALL, n, n, n);
+            // snapshot both counters first, so that only the reads issued below are measured
+            long before1 = readCount(cluster.get(1));
+            long before2 = readCount(cluster.get(2));
+            for (int n = 0; n < 100; n++)
+                cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ? and ck = ?"), ALL, n, n);
+            long reads1 = readCount(cluster.get(1)) - before1;
+            long reads2 = readCount(cluster.get(2)) - before2;
+            Assert.assertEquals(reads1, reads2);
+            Assert.assertEquals(100, reads1);
+        }
+    }
+
+    /** Returns the sample count of the read latency metric of table {@code tbl} in {@code KEYSPACE} on the given instance. */
+    private static long readCount(IInvokableInstance instance)
+    {
+        return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PagingTest.java b/test/distributed/org/apache/cassandra/distributed/test/PagingTest.java
new file mode 100644
index 0000000..c0d8fe3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/PagingTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+public class PagingTest extends TestBaseImpl
+{
+    /**
+     * Verifies that paged reads at QUORUM on a 3-node cluster return the same rows as unpaged reads of the same data
+     * on a single-node cluster, for several page sizes and for single-partition, multi-partition and range queries.
+     */
+    @Test
+    public void testPaging() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(3).start());
+             Cluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+        {
+            String createTable = withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+            cluster.schemaChange(createTable);
+            singleNode.schemaChange(createTable);
+
+            // 10 partitions of 10 rows each, so that the DISTINCT and IN queries below span several partitions
+            String insert = withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10; i++)
+            {
+                for (int j = 0; j < 10; j++)
+                {
+                    cluster.coordinator(1).execute(insert, QUORUM, i, j, i + i);
+                    singleNode.coordinator(1).execute(insert, QUORUM, i, j, i + i);
+                }
+            }
+
+            int[] pageSizes = new int[]{ 1, 2, 3, 5, 10, 20, 50, Integer.MAX_VALUE };
+            String[] statements = new String[]{ withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck >= 5"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 AND ck <= 10"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 LIMIT 3"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck >= 5 LIMIT 2"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2"),
+                                                withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2"),
+                                                withKeyspace("SELECT DISTINCT pk FROM %s.tbl LIMIT 3"),
+                                                withKeyspace("SELECT DISTINCT pk FROM %s.tbl WHERE pk IN (3,5,8,10)"),
+                                                withKeyspace("SELECT DISTINCT pk FROM %s.tbl WHERE pk IN (3,5,8,10) LIMIT 2")
+            };
+            for (String statement : statements)
+            {
+                // the unpaged result from the single-node cluster is the reference for every page size
+                Object[][] noPagingRows = singleNode.coordinator(1).execute(statement, QUORUM);
+                for (int pageSize : pageSizes)
+                {
+                    Iterator<Object[]> pagingRows = cluster.coordinator(1).executeWithPaging(statement, QUORUM, pageSize);
+                    assertRows(Iterators.toArray(pagingRows, Object[].class), noPagingRows);
+                }
+            }
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairEmptyRangeTombstonesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairEmptyRangeTombstonesTest.java
index a8fb978..bcc77e0 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairEmptyRangeTombstonesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairEmptyRangeTombstonesTest.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
-import static org.apache.cassandra.distributed.test.ReadRepairTester.BOOLEANS;
/**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTypesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTypesTest.java
index dfdb6c2..b79e0c4 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTypesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTypesTest.java
@@ -36,7 +36,6 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertEquals;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
-import static org.apache.cassandra.distributed.test.ReadRepairTester.BOOLEANS;
import static org.apache.cassandra.service.reads.repair.ReadRepairStrategy.NONE;
/**
@@ -1190,9 +1189,4 @@ public class ReadRepairQueryTypesTest extends TestBaseImpl
schemaChange("DROP TABLE " + qualifiedTableName);
}
}
-
- private static Object[][] rows(Object[]... rows)
- {
- return rows;
- }
}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTester.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTester.java
index 9af884e..c3a36cb 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTester.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTester.java
@@ -41,9 +41,6 @@ import static org.apache.cassandra.distributed.test.TestBaseImpl.KEYSPACE;
*/
public abstract class ReadRepairTester<T extends ReadRepairTester<T>>
{
- static final Object[][] EMPTY_ROWS = new Object[0][];
- static final boolean[] BOOLEANS = new boolean[]{ false, true };
-
private static final AtomicInteger seqNumber = new AtomicInteger();
private final String tableName = "t_" + seqNumber.getAndIncrement();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SSTableSkippingReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/SSTableSkippingReadTest.java
new file mode 100644
index 0000000..84b90c7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SSTableSkippingReadTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertEquals;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+
+public class SSTableSkippingReadTest extends TestBaseImpl
+{
+    @Test
+    public void skippedSSTableWithPartitionDeletionTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"));
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal(withKeyspace("DELETE FROM %s.tbl USING TIMESTAMP 1 WHERE pk = 0"));
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2"));
+            cluster.get(1).flush(KEYSPACE);
+            // expect a single sstable, where minTimestamp equals the timestamp of the partition delete
+            cluster.get(1).runOnInstance(() -> {
+                Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
+                                                      .getColumnFamilyStore("tbl")
+                                                      .getLiveSSTables();
+                assertEquals("Expected a single sstable, but found " + sstables.size(), 1, sstables.size());
+                long minTimestamp = sstables.iterator().next().getMinTimestamp();
+                assertEquals("Expected min timestamp of 1, but was " + minTimestamp, 1, minTimestamp);
+            });
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"));
+
+            // the read at ALL merges both nodes, so node 1's partition deletion must shadow node 2's older row
+            Object[][] rows = cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk=0 AND ck > 5"), ALL);
+            assertEquals("Expected 0 rows, but found " + rows.length, 0, rows.length);
+        }
+    }
+
+    @Test
+    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"));
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal(withKeyspace("DELETE FROM %s.tbl USING TIMESTAMP 1 WHERE pk = 0"));
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1"));
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
+            // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
+            // this sstable's minTimestamp is > the maxTimestamp of the first sstable. This would cause the first
+            // sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
+            // know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2"));
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"));
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk=0 AND ck > 5"), ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+            // node 1 (0, 6, 6) was not.
+            assertRows(rows, new Object[]{ 0, 6, 6 });
+        }
+    }
+
+    @Test
+    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
+    {
+        // skipped sstables must still be added back even if the partition delete ts is < the local min ts
+
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))"));
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal(withKeyspace("DELETE FROM %s.tbl USING TIMESTAMP 1 WHERE pk = 0"));
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3"));
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == 1 and maxTimestamp == 3, and is skipped due to its min/max clusterings. Now
+            // we insert a row which is not shadowed by the partition delete and flush to a second sstable. The first
+            // sstable has a maxTimestamp > the min timestamp of all sstables, so it is a candidate for reinclusion in
+            // the merge. However, the second sstable's minTimestamp is > the partition delete timestamp. This would
+            // cause the first sstable not to be reincluded in the merge input, but we can't really make that decision
+            // as we don't know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2"));
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0"));
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk=0 AND ck > 5"), ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+            // node 1 (0, 6, 6) was not.
+            assertRows(rows, new Object[]{ 0, 6, 6 });
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SchemaDisagreementTest.java b/test/distributed/org/apache/cassandra/distributed/test/SchemaDisagreementTest.java
new file mode 100644
index 0000000..b84ef73
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaDisagreementTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.fail;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+
+public class SchemaDisagreementTest extends TestBaseImpl
+{
+    /**
+     * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
+     */
+    @Test
+    public void writeWithSchemaDisagreement() throws Throwable
+    {
+        try (Cluster cluster = startCluster())
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"));
+            executeOnEachNode(cluster, "INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl ADD v2 int"), 1);
+
+            try
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)"), ALL);
+                fail("Should have failed because of schema disagreement.");
+            }
+            catch (Exception e)
+            {
+                // the class is checked by name rather than with instanceof, presumably because the exception is
+                // loaded by a different classloader than the test's
+                if (!e.getClass().toString().contains("WriteFailureException"))
+                    throw new AssertionError("Expected a WriteFailureException, but got " + e, e);
+                // we may see 1 or 2 failures in here, because of the fail-fast behavior of AbstractWriteResponseHandler
+                String message = String.valueOf(e.getMessage());
+                Assert.assertTrue("Unexpected failure message: " + message,
+                                  message.contains("INCOMPATIBLE_SCHEMA from ") &&
+                                  (message.contains("/127.0.0.2") || message.contains("/127.0.0.3")));
+            }
+        }
+    }
+
+    /**
+     * If a node receives a mutation for a column it knows has been dropped, the write should succeed.
+     */
+    @Test
+    public void writeWithSchemaDisagreement2() throws Throwable
+    {
+        try (Cluster cluster = startCluster())
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"));
+            executeOnEachNode(cluster, "INSERT INTO %s.tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+            cluster.forEach((instance) -> instance.flush(KEYSPACE));
+
+            // Introduce schema disagreement
+            cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl DROP v2"), 1);
+
+            // execute a write including the dropped column where the coordinator is not yet aware of the drop
+            // all nodes should process this without error
+            cluster.coordinator(2).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)"), ALL);
+            // and flushing should also be fine
+            cluster.forEach((instance) -> instance.flush(KEYSPACE));
+            // the results of reads will vary depending on whether the coordinator has seen the schema change
+            // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
+            assertRows(cluster.coordinator(2).execute(withKeyspace("SELECT * FROM %s.tbl"), ALL),
+                       rows(row(1, 1, 1, 1), row(2, 2, 2, 2)));
+            assertRows(cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl"), ALL),
+                       rows(row(1, 1, 1), row(2, 2, 2)));
+        }
+    }
+
+    /**
+     * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed.
+     */
+    @Test
+    public void writeWithInconsequentialSchemaDisagreement() throws Throwable
+    {
+        try (Cluster cluster = startCluster())
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"));
+            executeOnEachNode(cluster, "INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl ADD v2 int"), 1);
+
+            // this write shouldn't cause any problems because it doesn't write to the new column
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v1) VALUES (2, 2, 2)"), ALL);
+        }
+    }
+
+    /**
+     * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for
+     * that column.
+     */
+    @Test
+    public void readWithSchemaDisagreement() throws Throwable
+    {
+        try (Cluster cluster = startCluster())
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"));
+            executeOnEachNode(cluster, "INSERT INTO %s.tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl ADD v2 int"), 1);
+
+            assertRows(cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = 1"), ALL),
+                       row(1, 1, 1, null));
+        }
+    }
+
+    /**
+     * Starts a 3-node cluster with the NETWORK feature enabled, initialized through {@link #init}.
+     */
+    private Cluster startCluster() throws IOException
+    {
+        return init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start());
+    }
+
+    /**
+     * Formats the given query with the test keyspace and runs it through {@code executeInternal} on every node.
+     */
+    private void executeOnEachNode(Cluster cluster, String query)
+    {
+        String formatted = withKeyspace(query);
+        cluster.forEach((instance) -> instance.executeInternal(formatted));
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
index 23b5c14..69b074f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
@@ -18,46 +18,305 @@
package org.apache.cassandra.distributed.test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import static com.google.common.collect.Iterators.toArray;
import static java.lang.String.format;
-import static java.util.Arrays.asList;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
-import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
/**
* Tests short read protection, the mechanism that ensures distributed queries at read consistency levels > ONE/LOCAL_ONE
* avoid short reads that might happen when a limit is used and reconciliation accepts less rows than such limit.
*/
+@RunWith(Parameterized.class)
public class ShortReadProtectionTest extends TestBaseImpl
{
+    private static final int NUM_NODES = 3;
+    private static final int[] PAGE_SIZES = new int[]{ 1, 10 };
+
+    private static Cluster cluster; // shared by all the test cases, see setupCluster()
+    private Tester tester; // created before each test by setupTester()
+
+    /**
+     * The consistency level used for reads. Internal writes hit the minimum number of replicas needed for reads at
+     * this consistency level to see them. With RF=3, this means one written replica for CL=ALL, and two for CL=QUORUM.
+     */
+    @Parameterized.Parameter
+    public ConsistencyLevel readConsistencyLevel;
+
+    /**
+     * Whether to flush data after mutations.
+     */
+    @Parameterized.Parameter(1)
+    public boolean flush;
+
+    /**
+     * Whether paging is used for the distributed queries.
+     */
+    @Parameterized.Parameter(2)
+    public boolean paging;
+
+    @Parameterized.Parameters(name = "{index}: read_cl={0} flush={1} paging={2}")
+    public static Collection<Object[]> data()
+    {
+        List<Object[]> combinations = new ArrayList<>(); // every (read CL, flush, paging) combination
+        for (ConsistencyLevel cl : Arrays.asList(ALL, QUORUM))
+            for (boolean withFlush : BOOLEANS)
+                for (boolean withPaging : BOOLEANS)
+                    combinations.add(new Object[]{ cl, withFlush, withPaging });
+        return combinations;
+    }
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        // a single cluster, shared by all the test cases, with hinted handoff disabled
+        cluster = init(Cluster.build()
+                              .withNodes(NUM_NODES)
+                              .withConfig(c -> c.set("hinted_handoff_enabled", false)).start());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (cluster != null) // still null if setupCluster did not complete
+            cluster.close();
+    }
+
+    @Before
+    public void setupTester()
+    {
+        tester = new Tester(readConsistencyLevel, flush, paging); // a fresh tester per test, from this run's parameters
+    }
+
+    @After
+    public void teardownTester()
+    {
+        tester.dropTable(); // the cluster outlives each test, so drop the table this test used
+    }
+
+    /**
+     * SRP on a table without clustering columns whose only row is deleted through node 1.
+     * <p>
+     * Regression test for CASSANDRA-13880.
+     * <p>
+     * Ports the Python dtest {@code consistency_test.py:TestConsistency.test_13880()}.
+     */
+    @Test
+    public void testSkinnyTableWithoutLiveRows()
+    {
+        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
+              .allNodes("INSERT INTO %s (id) VALUES (0) USING TIMESTAMP 0")
+              .toNode1("DELETE FROM %s WHERE id = 0")
+              .assertRows("SELECT DISTINCT id FROM %s WHERE id = 0")
+              .assertRows("SELECT id FROM %s WHERE id = 0 LIMIT 1");
+    }
+
+    /**
+     * SRP on a table without clustering columns whose live and deleted rows alternate in token order.
+     * <p>
+     * Regression test for CASSANDRA-13747.
+     * <p>
+     * Ports the Python dtest {@code consistency_test.py:TestConsistency.test_13747()}.
+     */
+    @Test
+    public void testSkinnyTableWithLiveRows()
+    {
+        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
+              .allNodes(0, 10, id -> format("INSERT INTO %%s (id) VALUES (%d) USING TIMESTAMP 0", id)) // token order: 5,1,8,0,2,4,7,6,9,3
+              .toNode1("DELETE FROM %s WHERE id IN (1, 0, 4, 6, 3)") // every other row in token order
+              .assertRows("SELECT DISTINCT token(id), id FROM %s",
+                          row(token(5), 5), row(token(8), 8), row(token(2), 2), row(token(7), 7), row(token(9), 9));
+    }
+
+    /**
+     * SRP on a table without clustering columns where two nodes delete almost complementary sets of rows.
+     * <p>
+     * Regression test for CASSANDRA-13595.
+     * <p>
+     * Ports the Python dtest {@code consistency_test.py:TestConsistency.test_13595()}.
+     */
+    @Test
+    public void testSkinnyTableWithComplementaryDeletions()
+    {
+        tester.createTable("CREATE TABLE %s (id int PRIMARY KEY)")
+              .allNodes(0, 10, id -> format("INSERT INTO %%s (id) VALUES (%d) USING TIMESTAMP 0", id)) // token order: 5,1,8,0,2,4,7,6,9,3
+              .toNode1("DELETE FROM %s WHERE id IN (5, 8, 2, 7, 9)") // every other row, starting from the first
+              .toNode2("DELETE FROM %s WHERE id IN (1, 0, 4, 6)") // the remaining rows except the last one
+              .assertRows("SELECT id FROM %s LIMIT 1", row(3))
+              .assertRows("SELECT DISTINCT id FROM %s LIMIT 1", row(3));
+    }
+
+    /**
+     * Short read protection when several rows are missed.
+     * <p>
+     * Regression test for CASSANDRA-12872.
+     * <p>
+     * Ports the Python dtest {@code consistency_test.py:TestConsistency.test_12872()}.
+     */
+    @Test
+    public void testMultipleMissedRows()
+    {
+        tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+              .allNodes(0, 4, ck -> format("INSERT INTO %%s (pk, ck) VALUES (0, %d) USING TIMESTAMP 0", ck))
+              .toNode1("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2, 3)",
+                       "INSERT INTO %s (pk, ck) VALUES (0, 5)")
+              .toNode2("INSERT INTO %s (pk, ck) VALUES (0, 4)")
+              .assertRows("SELECT ck FROM %s WHERE pk = 0 LIMIT 2", row(0), row(4));
+    }
+
+    /**
+     * SRP with ascending order when the first rows of the partition have been deleted.
+     * <p>
+     * Regression test for CASSANDRA-9460.
+     * <p>
+     * Together with {@link #testDescendingOrder()}, ports the Python dtest
+     * {@code consistency_test.py:TestConsistency.test_short_read()}.
+     */
+    @Test
+    public void testAscendingOrder()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
+              .allNodes(1, 10, c -> format("INSERT INTO %%s (k, c, v) VALUES (0, %d, %d) USING TIMESTAMP 0", c, c * 10))
+              .toNode1("DELETE FROM %s WHERE k=0 AND c=1")
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=2")
+              .toNode3("DELETE FROM %s WHERE k=0 AND c=3")
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 1", row(4, 40))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 2", row(4, 40), row(5, 50))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 3", row(4, 40), row(5, 50), row(6, 60))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c ASC LIMIT 4", row(4, 40), row(5, 50), row(6, 60), row(7, 70));
+    }
+
+    /**
+     * SRP with descending order when the last rows of the partition have been deleted.
+     * <p>
+     * Regression test for CASSANDRA-9460.
+     * <p>
+     * Together with {@link #testAscendingOrder()}, ports the Python dtest
+     * {@code consistency_test.py:TestConsistency.test_short_read()}.
+     */
+    @Test
+    public void testDescendingOrder()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
+              .allNodes(1, 10, c -> format("INSERT INTO %%s (k, c, v) VALUES (0, %d, %d) USING TIMESTAMP 0", c, c * 10))
+              .toNode1("DELETE FROM %s WHERE k=0 AND c=7")
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=8")
+              .toNode3("DELETE FROM %s WHERE k=0 AND c=9")
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 1", row(6, 60))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 2", row(6, 60), row(5, 50))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 3", row(6, 60), row(5, 50), row(4, 40))
+              .assertRows("SELECT c, v FROM %s WHERE k=0 ORDER BY c DESC LIMIT 4", row(6, 60), row(5, 50), row(4, 40), row(3, 30));
+    }
+
+    /**
+     * Short reads that end up leaving no live rows after a partition deletion.
+     * <p>
+     * Regression test for CASSANDRA-4000 and CASSANDRA-8933.
+     * <p>
+     * Ports the Python dtests {@code consistency_test.py:TestConsistency.test_short_read_delete()} and
+     * {@code consistency_test.py:TestConsistency.test_short_read_quorum_delete()}; both are covered because the
+     * {@link #readConsistencyLevel} test parameter runs this test at both CL=ALL and CL=QUORUM.
+     */
+    @Test
+    public void testDeletePartition()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
+              .allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING TIMESTAMP 0",
+                        "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
+              .toNode2("DELETE FROM %s WHERE k=0")
+              .assertRows("SELECT c, v FROM %s WHERE k=0 LIMIT 1");
+    }
+
+    /**
+     * Short reads that end up leaving no live rows after a partition deletion, when the partition has a static row.
+     */
+    @Test
+    public void testDeletePartitionWithStatic()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, s int STATIC, PRIMARY KEY(k, c))")
+              .allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100) USING TIMESTAMP 0",
+                        "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
+              .toNode2("DELETE FROM %s WHERE k=0")
+              .assertRows("SELECT c, v FROM %s WHERE k=0 LIMIT 1");
+    }
+
+    /**
+     * Short reads that end up leaving no live rows after deleting individual clustering rows one at a time.
+     */
+    @Test
+    public void testDeleteClustering()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))")
+              .allNodes("INSERT INTO %s (k, c, v) VALUES (0, 1, 10) USING TIMESTAMP 0",
+                        "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=1")
+              .assertRows("SELECT * FROM %s WHERE k=0 LIMIT 1", row(0, 2, 20))
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=2")
+              .assertRows("SELECT * FROM %s WHERE k=0 LIMIT 1");
+    }
+
+    /**
+     * Like the clustering deletion case, but with a static row that stays visible once every clustering row is gone.
+     */
+    @Test
+    public void testDeleteClusteringWithStatic()
+    {
+        tester.createTable("CREATE TABLE %s (k int, c int, v int, s int STATIC, PRIMARY KEY(k, c))")
+              .allNodes("INSERT INTO %s (k, c, v, s) VALUES (0, 1, 10, 100) USING TIMESTAMP 0",
+                        "INSERT INTO %s (k, c, v) VALUES (0, 2, 20) USING TIMESTAMP 0")
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=1")
+              .assertRows("SELECT k, c, v, s FROM %s WHERE k=0 LIMIT 1", row(0, 2, 20, 100))
+              .toNode2("DELETE FROM %s WHERE k=0 AND c=2")
+              .assertRows("SELECT k, c, v, s FROM %s WHERE k=0 LIMIT 1", row(0, null, null, 100));
+    }
+
 /**
 * Test GROUP BY with short read protection, particularly when there is a limit and regular row deletions.
 * <p>
 * See CASSANDRA-15459
 */
 @Test
- public void testGroupBySRPRegularRow() throws Throwable
+ public void testGroupByRegularRow()
 {
- testGroupBySRP("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))",
- asList("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP 0",
- "DELETE FROM %s WHERE pk=0 AND ck=0",
- "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0"),
- asList("DELETE FROM %s WHERE pk=1 AND ck=1",
- "INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
- "DELETE FROM %s WHERE pk=2 AND ck=2"),
- asList("SELECT * FROM %s LIMIT 1",
- "SELECT * FROM %s LIMIT 10",
- "SELECT * FROM %s GROUP BY pk LIMIT 1",
- "SELECT * FROM %s GROUP BY pk LIMIT 10",
- "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
- "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+ tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+ .toNode1("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP 0", // via node 1: rows (1, 1) and (2, 2) at timestamp 0, plus a deletion of (0, 0)
+ "DELETE FROM %s WHERE pk=0 AND ck=0",
+ "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0")
+ .toNode2("DELETE FROM %s WHERE pk=1 AND ck=1", // via node 2: the complementary writes, so every row's deletion is newer than its timestamp-0 insertion
+ "INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
+ "DELETE FROM %s WHERE pk=2 AND ck=2")
+ .assertRows("SELECT * FROM %s LIMIT 1") // hence none of these queries is expected to return any row
+ .assertRows("SELECT * FROM %s LIMIT 10")
+ .assertRows("SELECT * FROM %s GROUP BY pk LIMIT 1")
+ .assertRows("SELECT * FROM %s GROUP BY pk LIMIT 10")
+ .assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 1")
+ .assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 10");
 }
/**
@@ -66,51 +325,223 @@ public class ShortReadProtectionTest extends TestBaseImpl
* See CASSANDRA-15459
*/
@Test
- public void testGroupBySRPStaticRow() throws Throwable
- {
- testGroupBySRP("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY KEY (pk, ck))",
- asList("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP 0",
- "INSERT INTO %s (pk, s) VALUES (0, null)",
- "INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP 0"),
- asList("INSERT INTO %s (pk, s) VALUES (1, null)",
- "INSERT INTO %s (pk, s) VALUES (0, 0) USING TIMESTAMP 0",
- "INSERT INTO %s (pk, s) VALUES (2, null)"),
- asList("SELECT * FROM %s LIMIT 1",
- "SELECT * FROM %s LIMIT 10",
- "SELECT * FROM %s GROUP BY pk LIMIT 1",
- "SELECT * FROM %s GROUP BY pk LIMIT 10",
- "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
- "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
- }
-
- private void testGroupBySRP(String createTable,
- List<String> node1Queries,
- List<String> node2Queries,
- List<String> coordinatorQueries) throws Throwable
- {
- try (Cluster cluster = init(Cluster.build()
- .withNodes(2)
- .withConfig(config -> config.set("hinted_handoff_enabled", false))
- .start()))
+ public void testGroupByStaticRow()
+ {
+ tester.createTable("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY KEY (pk, ck))") // each static value written at TIMESTAMP 0 through one node is overwritten with null at a newer timestamp through the other
+ .toNode1("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, s) VALUES (0, null)",
+ "INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP 0")
+ .toNode2("INSERT INTO %s (pk, s) VALUES (1, null)",
+ "INSERT INTO %s (pk, s) VALUES (0, 0) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, s) VALUES (2, null)")
+ .assertRows("SELECT * FROM %s LIMIT 1") // no partition keeps a live static value or row, so nothing is expected
+ .assertRows("SELECT * FROM %s LIMIT 10")
+ .assertRows("SELECT * FROM %s GROUP BY pk LIMIT 1")
+ .assertRows("SELECT * FROM %s GROUP BY pk LIMIT 10")
+ .assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 1")
+ .assertRows("SELECT * FROM %s GROUP BY pk, ck LIMIT 10");
+ }
+
+ /**
+ * The row deletions written through {@code toNode2} target clusterings never written through {@code toNode1}, so {@code SELECT DISTINCT} must still return partition 0. See CASSANDRA-13911.
+ * <p>
+ * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13911()}.
+ */
+ @Test
+ public void testSkipEarlyTermination()
+ {
+ tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+ .toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0)")
+ .toNode2("DELETE FROM %s WHERE pk = 0 AND ck IN (1, 2)")
+ .assertRows("SELECT DISTINCT pk FROM %s", row(0));
+ }
+
+ /**
+ * A regression test to prove that we can no longer rely on {@code !singleResultCounter.isDoneForPartition()} to
+ * abort single partition SRP early if a per partition limit is set.
+ * <p>
+ * See CASSANDRA-13911.
+ * <p>
+ * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13911_rows_srp()}.
+ */
+ @Test
+ public void testSkipEarlyTerminationRows()
+ {
+ tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+ .toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (2, 0) USING TIMESTAMP 0",
+ "DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck = 1")
+ .toNode2("INSERT INTO %s (pk, ck) VALUES (0, 2) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (0, 3) USING TIMESTAMP 0",
+ "DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck = 0",
+ "INSERT INTO %s (pk, ck) VALUES (2, 1) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0")
+ .assertRows("SELECT pk, ck FROM %s PER PARTITION LIMIT 2 LIMIT 3", row(0, 0), row(0, 1), row(2, 2)); // in pk=2, ck=0 and ck=1 are each deleted at TIMESTAMP 42 through the opposite node, leaving only ck=2
+ }
+
+ /**
+ * A regression test to prove that we can no longer rely on {@code !singleResultCounter.isDone()} to abort ranged
+ * partition SRP early if a per partition limit is set.
+ * <p>
+ * See CASSANDRA-13911.
+ * <p>
+ * Replaces Python dtest {@code consistency_test.py:TestConsistency.test_13911_partitions_srp()}.
+ */
+ @Test
+ public void testSkipEarlyTerminationPartitions()
+ {
+ tester.createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))")
+ .toNode1("INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (0, 1) USING TIMESTAMP 0",
+ "DELETE FROM %s USING TIMESTAMP 42 WHERE pk = 2 AND ck IN (0, 1)")
+ .toNode2("INSERT INTO %s (pk, ck) VALUES (0, 2) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (0, 3) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (2, 0) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (2, 1) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (4, 0) USING TIMESTAMP 0",
+ "INSERT INTO %s (pk, ck) VALUES (4, 1) USING TIMESTAMP 0")
+ .assertRows("SELECT pk, ck FROM %s PER PARTITION LIMIT 2 LIMIT 4", // pk=2 is fully shadowed by the TIMESTAMP 42 deletions, so the limit is filled from pk=4
+ row(0, 0), row(0, 1), row(4, 0), row(4, 1));
+ }
+
+ private static long token(int key)
+ {
+ return (long) Murmur3Partitioner.instance.getToken(ByteBufferUtil.bytes(key)).getTokenValue(); // Murmur3 token value of the serialized int partition key
+ }
+
+ private static class Tester // fluent helper: creates a table, writes to specific nodes and checks reads on every coordinator
+ {
+ private static final AtomicInteger seqNumber = new AtomicInteger(); // gives each Tester a distinct table name
+
+ private final ConsistencyLevel readConsistencyLevel;
+ private final boolean flush, paging;
+ private final String qualifiedTableName;
+
+ private boolean flushed = false; // set once the nodes have been flushed by the first assertRows call
+
+ private Tester(ConsistencyLevel readConsistencyLevel, boolean flush, boolean paging)
+ {
+ this.readConsistencyLevel = readConsistencyLevel;
+ this.flush = flush;
+ this.paging = paging;
+ qualifiedTableName = KEYSPACE + ".t_" + seqNumber.getAndIncrement();
+
+ assert readConsistencyLevel == ALL || readConsistencyLevel == QUORUM
+ : "Only ALL and QUORUM consistency levels are supported";
+ }
+
+ private Tester createTable(String query)
+ {
+ cluster.schemaChange(format(query) + " WITH read_repair='NONE'"); // read repair disabled so reads don't repair the replicas between assertions
+ return this;
+ }
+
+ private Tester allNodes(int startInclusive, int endExclusive, Function<Integer, String> querySupplier)
+ {
+ IntStream.range(startInclusive, endExclusive).mapToObj(querySupplier::apply).forEach(this::allNodes); // one ALL write per generated query
+ return this;
+ }
+
+ private Tester allNodes(String... queries)
+ {
+ for (String query : queries)
+ allNodes(query);
+ return this;
+ }
+
+ private Tester allNodes(String query)
+ {
+ cluster.coordinator(1).execute(format(query), ALL); // coordinated by node 1 at consistency level ALL
+ return this;
+ }
+
+ /**
+ * Internally runs the specified write queries in the first node. If the {@link #readConsistencyLevel} is
+ * QUORUM, then the write will also be internally done in the second replica, to simulate a QUORUM write.
+ */
+ private Tester toNode1(String... queries)
+ {
+ return toNode(1, queries);
+ }
+
+ /**
+ * Internally runs the specified write queries in the second node. If the {@link #readConsistencyLevel} is
+ * QUORUM, then the write will also be internally done in the third replica, to simulate a QUORUM write.
+ */
+ private Tester toNode2(String... queries)
+ {
+ return toNode(2, queries);
+ }
+
+ /**
+ * Internally runs the specified write queries in the third node. If the {@link #readConsistencyLevel} is
+ * QUORUM, then the write will also be internally done in the first replica, to simulate a QUORUM write.
+ */
+ private Tester toNode3(String... queries)
+ {
+ return toNode(3, queries);
+ }
+
+ /**
+ * Internally runs the specified write queries in the specified node. If the {@link #readConsistencyLevel} is
+ * QUORUM, the write will also be internally done in node {@code node + 1} (wrapping around to node 1 after {@code NUM_NODES}), to simulate a QUORUM write.
+ */
+ private Tester toNode(int node, String... queries)
+ {
+ IInvokableInstance replica = cluster.get(node);
+ IInvokableInstance nextReplica = readConsistencyLevel == QUORUM
+ ? cluster.get(node == NUM_NODES ? 1 : node + 1)
+ : null;
+
+ for (String query : queries)
+ {
+ String formattedQuery = format(query);
+ replica.executeInternal(formattedQuery);
+
+ if (nextReplica != null)
+ nextReplica.executeInternal(formattedQuery);
+ }
+
+ return this;
+ }
+
+ private Tester assertRows(String query, Object[]... expectedRows)
+ {
+ if (flush && !flushed) // flush all nodes only once, before the first read; later writes stay in memtables
+ {
+ cluster.stream().forEach(n -> n.flush(KEYSPACE));
+ flushed = true;
+ }
+
+ String formattedQuery = format(query);
+ cluster.coordinators().forEach(coordinator -> { // every node acts as coordinator in turn
+ if (paging)
+ {
+ for (int fetchSize : PAGE_SIZES) // the same expected rows must come back for every page size
+ {
+ Iterator<Object[]> actualRows = coordinator.executeWithPaging(formattedQuery, readConsistencyLevel, fetchSize);
+ AssertUtils.assertRows(toArray(actualRows, Object[].class), expectedRows);
+ }
+ }
+ else
+ {
+ Object[][] actualRows = coordinator.execute(formattedQuery, readConsistencyLevel);
+ AssertUtils.assertRows(actualRows, expectedRows);
+ }
+ });
+
+ return this;
+ }
+
+ private String format(String query)
+ {
+ return String.format(query, qualifiedTableName);
+ }
+
+ private void dropTable() // drops this Tester's table, if it exists
{
- // create the table with read repair disabled
- String table = withKeyspace("%s.t");
- cluster.schemaChange(format(createTable + " WITH read_repair='NONE'", table));
-
- // populate data on node1
- IInvokableInstance node1 = cluster.get(1);
- for (String query : node1Queries)
- node1.executeInternal(format(query, table));
-
- // populate data on node2
- IInvokableInstance node2 = cluster.get(2);
- for (String query : node2Queries)
- node2.executeInternal(format(query, table));
-
- // verify the behaviour of SRP with GROUP BY queries
- ICoordinator coordinator = cluster.coordinator(1);
- for (String query : coordinatorQueries)
- assertRows(coordinator.execute(format(query, table), ALL));
+ cluster.schemaChange(format("DROP TABLE IF EXISTS %s"));
}
}
}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index fab3477..7278121 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -18,445 +18,315 @@
package org.apache.cassandra.distributed.test;
-import java.util.Set;
-
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Iterators;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.IInvokableInstance;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-import static org.apache.cassandra.distributed.shared.AssertUtils.*;
-import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
-import static org.junit.Assert.fail;
-
-// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.NoopCompressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRow;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+/**
+ * Simple read/write tests using different types of query, particularly when the data is spread across memory and
+ * multiple sstables and using different compressors. All available compressors are tested. Both ascending and
+ * descending clustering orders are tested. The read queries are run using every node as a coordinator, with and without
+ * paging.
+ */
+@RunWith(Parameterized.class)
public class SimpleReadWriteTest extends TestBaseImpl
{
- @Test
- public void coordinatorReadTest() throws Throwable
+ private static final int NUM_NODES = 4;
+ private static final int REPLICATION_FACTOR = 3;
+ private static final String CREATE_TABLE = "CREATE TABLE %s(k int, c int, v int, PRIMARY KEY (k, c)) " + // format args: table name, clustering order (ASC/DESC), compressor class
+ "WITH CLUSTERING ORDER BY (c %s) " +
+ "AND COMPRESSION = { 'class': '%s' } " +
+ "AND READ_REPAIR = 'none'";
+ private static final String[] COMPRESSORS = new String[]{ NoopCompressor.class.getSimpleName(),
+ LZ4Compressor.class.getSimpleName(),
+ DeflateCompressor.class.getSimpleName(),
+ SnappyCompressor.class.getSimpleName(),
+ ZstdCompressor.class.getSimpleName() };
+ private static final int SECOND_SSTABLE_INTERVAL = 2; // clustering stride of the rows overwritten in a second sstable
+ private static final int MEMTABLE_INTERVAL = 5; // clustering stride of the rows overwritten again, only in the memtable
+
+ private static final AtomicInteger seq = new AtomicInteger(); // gives each test its own table name
+
+ /**
+ * The sstable compressor to be used.
+ */
+ @Parameterized.Parameter
+ public String compressor;
+
+ /**
+ * Whether the clustering order is reversed ({@code DESC}) rather than {@code ASC}.
+ */
+ @Parameterized.Parameter(1)
+ public boolean reverse;
+
+ private String tableName; // keyspace-qualified name of the current test's table
+
+ @Parameterized.Parameters(name = "{index}: compressor={0} reverse={1}")
+ public static Collection<Object[]> data()
{
- try (Cluster cluster = init(builder().withNodes(3).start()))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
- ConsistencyLevel.ALL,
- 1),
- row(1, 1, 1),
- row(1, 2, 2),
- row(1, 3, 3));
- }
+ List<Object[]> result = new ArrayList<>(); // every combination of compressor and clustering order
+ for (String compressor : COMPRESSORS)
+ for (boolean reverse : BOOLEANS)
+ result.add(new Object[]{ compressor, reverse });
+ return result;
}
- @Test
- public void largeMessageTest() throws Throwable
+ private static Cluster cluster; // shared by all tests and parameter combinations; each test creates its own table
+
+ @BeforeClass
+ public static void setupCluster() throws IOException
{
- try (Cluster cluster = init(builder().withNodes(2).start()))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < LARGE_MESSAGE_THRESHOLD ; i++)
- builder.append('a');
- String s = builder.toString();
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
- ConsistencyLevel.ALL,
- s);
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
- ConsistencyLevel.ALL,
- 1),
- row(1, 1, s));
- }
+ cluster = init(Cluster.build(NUM_NODES).start(), REPLICATION_FACTOR);
}
- @Test
- public void coordinatorWriteTest() throws Throwable
+ @AfterClass
+ public static void teardownCluster()
{
- try (Cluster cluster = init(builder().withNodes(3).start()))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
-
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
- ConsistencyLevel.QUORUM);
+ if (cluster != null) // still null if setupCluster failed before assigning it
+ cluster.close();
+ }
- for (int i = 0; i < 3; i++)
- {
- assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
- row(1, 1, 1));
- }
+ @Before
+ public void before()
+ {
+ // create a new table for each test, using the parameterized clustering order and compressor
+ tableName = String.format("%s.t_%d", KEYSPACE, seq.getAndIncrement());
+ cluster.schemaChange(String.format(CREATE_TABLE, tableName, reverse ? "DESC" : "ASC", compressor));
+ }
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
- ConsistencyLevel.QUORUM),
- row(1, 1, 1));
- }
+ @After
+ public void after()
+ {
+ cluster.schemaChange(withTable("DROP TABLE %s")); // drop the table created for this test by before()
}
/**
- * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
+ * Simple put/get on a single partition with a few rows, reading with a single partition query.
+ * <p>
+ * Migrated from Python dtests putget_test.py:TestPutGet.test_putget[_snappy|_deflate]().
*/
@Test
- public void writeWithSchemaDisagreement() throws Throwable
+ public void testPartitionQuery()
{
- try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ int numRows = 10;
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+ writeRows(1, numRows);
- try
- {
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
- ConsistencyLevel.ALL);
- fail("Should have failed because of schema disagreement.");
- }
- catch (Exception e)
- {
- // for some reason, we get weird errors when trying to check class directly
- // I suppose it has to do with some classloader manipulation going on
- Assert.assertTrue(e.getClass().toString().contains("WriteFailureException"));
- // we may see 1 or 2 failures in here, because of the fail-fast behavior of AbstractWriteResponseHandler
- Assert.assertTrue(e.getMessage().contains("INCOMPATIBLE_SCHEMA from ") &&
- (e.getMessage().contains("/127.0.0.2") || e.getMessage().contains("/127.0.0.3")));
-
- }
+ Object[][] rows = readRows("SELECT * FROM %s WHERE k=?", 0); // readRows also checks that all coordinators agree, with and without paging
+ Assert.assertEquals(numRows, rows.length);
+ for (int c = 0; c < numRows; c++)
+ {
+ validateRow(rows[c], numRows, 0, c);
}
}
/**
- * If a node receives a mutation for a column it knows has been dropped, the write should succeed
+ * Simple put/get on multiple partitions with multiple rows, reading with a range query.
+ * <p>
+ * Migrated from Python dtests putget_test.py:TestPutGet.test_rangeputget().
*/
@Test
- public void writeWithSchemaDisagreement2() throws Throwable
+ public void testRangeQuery()
{
- try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+ int numPartitions = 10;
+ int rowsPerPartition = 10;
+
+ writeRows(numPartitions, rowsPerPartition);
+
+ Object[][] rows = readRows("SELECT * FROM %s"); // the loop below expects the rows grouped by partition, in ascending k order
+ Assert.assertEquals(numPartitions * rowsPerPartition, rows.length);
+ for (int k = 0; k < numPartitions; k++)
{
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
- cluster.forEach((instance) -> instance.flush(KEYSPACE));
-
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
-
- // execute a write including the dropped column where the coordinator is not yet aware of the drop
- // all nodes should process this without error
- cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
- ConsistencyLevel.ALL);
- // and flushing should also be fine
- cluster.forEach((instance) -> instance.flush(KEYSPACE));
- // the results of reads will vary depending on whether the coordinator has seen the schema change
- // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
- assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
- rows(row(1,1,1,1), row(2,2,2,2)));
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
- rows(row(1,1,1), row(2,2,2)));
+ for (int c = 0; c < rowsPerPartition; c++)
+ {
+ Object[] row = rows[k * rowsPerPartition + c];
+ validateRow(row, rowsPerPartition, k, c);
+ }
}
}
/**
- * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed
+ * Simple put/get on a single partition with multiple rows, reading with slice queries.
+ * <p>
+ * Migrated from Python dtests putget_test.py:TestPutGet.test_wide_row().
*/
@Test
- public void writeWithInconsequentialSchemaDisagreement() throws Throwable
+ public void testSliceQuery()
{
- try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+ int numRows = 100;
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ writeRows(1, numRows);
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+ String query = "SELECT * FROM %s WHERE k=? AND c>=? AND c<?";
+ for (int sliceSize : Arrays.asList(10, 20, 100))
+ {
+ for (int c = 0; c < numRows; c = c + sliceSize) // consecutive, non-overlapping slices [c, c + sliceSize)
+ {
+ Object[][] rows = readRows(query, 0, c, c + sliceSize);
+ Assert.assertEquals(sliceSize, rows.length);
- // this write shouldn't cause any problems because it doesn't write to the new column
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
- ConsistencyLevel.ALL);
+ for (int i = 0; i < sliceSize; i++)
+ {
+ Object[] row = rows[i];
+ validateRow(row, numRows, 0, c + i);
+ }
+ }
}
}
/**
- * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column
+ * Simple put/get on multiple partitions with multiple rows, reading with IN queries.
*/
@Test
- public void readWithSchemaDisagreement() throws Throwable
+ public void testInQuery()
{
- try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+ int numPartitions = 10;
+ int rowsPerPartition = 10;
- Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
- }
- }
+ writeRows(numPartitions, rowsPerPartition);
- @Test
- public void simplePagedReadsTest() throws Throwable
- {
- try (Cluster cluster = init(builder().withNodes(3).start()))
+ String query = "SELECT * FROM %s WHERE k IN (?, ?)";
+ for (int k = 0; k < numPartitions; k += 2) // query the partitions in pairs (k, k + 1)
{
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- int size = 100;
- Object[][] results = new Object[size][];
- for (int i = 0; i < size; i++)
- {
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
- ConsistencyLevel.QUORUM,
- i, i);
- results[i] = new Object[] { 1, i, i};
- }
+ Object[][] rows = readRows(query, k, k + 1);
+ Assert.assertEquals(rowsPerPartition * 2, rows.length);
- // Make sure paged read returns same results with different page sizes
- for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+ for (int i = 0; i < 2; i++)
{
- assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
- ConsistencyLevel.QUORUM,
- pageSize),
- results);
+ for (int c = 0; c < rowsPerPartition; c++)
+ {
+ Object[] row = rows[i * rowsPerPartition + c];
+ validateRow(row, rowsPerPartition, k + i, c);
+ }
}
}
}
- @Test
- public void pagingWithRepairTest() throws Throwable
+ /**
+ * Writes {@code numPartitions} partitions with {@code rowsPerPartition} rows each, with overrides in different sstables and memtables.
+ */
+ private void writeRows(int numPartitions, int rowsPerPartition)
{
- try (Cluster cluster = init(builder().withNodes(3).start()))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- int size = 100;
- Object[][] results = new Object[size][];
- for (int i = 0; i < size; i++)
- {
- // Make sure that data lands on different nodes and not coordinator
- cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
- i, i);
-
- results[i] = new Object[] { 1, i, i};
- }
-
- // Make sure paged read returns same results with different page sizes
- for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
- {
- assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
- ConsistencyLevel.ALL,
- pageSize),
- results);
- }
-
- assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
- results);
- }
+ String update = withTable("UPDATE %s SET v=? WHERE k=? AND c=?");
+ ICoordinator coordinator = cluster.coordinator(1);
+
+ // insert all the partition rows in a single sstable, with v = c
+ for (int c = 0; c < rowsPerPartition; c++)
+ for (int k = 0; k < numPartitions; k++)
+ coordinator.execute(update, QUORUM, c, k, c);
+ cluster.forEach(i -> i.flush(KEYSPACE));
+
+ // override every SECOND_SSTABLE_INTERVAL-th row in a second sstable, with v = c + rowsPerPartition
+ for (int c = 0; c < rowsPerPartition; c += SECOND_SSTABLE_INTERVAL)
+ for (int k = 0; k < numPartitions; k++)
+ coordinator.execute(update, QUORUM, c + rowsPerPartition, k, c);
+ cluster.forEach(i -> i.flush(KEYSPACE));
+
+ // override every MEMTABLE_INTERVAL-th row only in memtable, with v = c + rowsPerPartition * 2
+ for (int c = 0; c < rowsPerPartition; c += MEMTABLE_INTERVAL)
+ for (int k = 0; k < numPartitions; k++)
+ coordinator.execute(update, QUORUM, c + rowsPerPartition * 2, k, c);
}
- @Test
- public void pagingTests() throws Throwable
+ /**
+ * Runs the specified query in all coordinators, with and without paging.
+ */
+ private Object[][] readRows(String query, Object... boundValues)
{
- try (Cluster cluster = init(builder().withNodes(3).start());
- Cluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+ query = withTable(query);
+
+ // verify that all coordinators return the same results for the query, regardless of paging
+ Object[][] lastRows = null;
+ int lastNode = 1;
+ boolean lastPaging = false;
+ for (int node = 1; node <= NUM_NODES; node++)
{
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
- singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ ICoordinator coordinator = cluster.coordinator(node);
- for (int i = 0; i < 10; i++)
+ for (boolean paging : BOOLEANS)
{
- for (int j = 0; j < 10; j++)
- {
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
- ConsistencyLevel.QUORUM,
- i, j, i + i);
- singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
- ConsistencyLevel.QUORUM,
- i, j, i + i);
- }
- }
+ Object[][] rows = paging
+ ? Iterators.toArray(coordinator.executeWithPaging(query, QUORUM, 1, boundValues),
+ Object[].class)
+ : coordinator.execute(query, QUORUM, boundValues);
- int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
- String[] statements = new String [] {"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
- "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
- "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
- "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
- };
- for (String statement : statements)
- {
- for (int pageSize : pageSizes)
+ if (lastRows != null)
{
- assertRows(cluster.coordinator(1)
- .executeWithPaging(statement,
- ConsistencyLevel.QUORUM, pageSize),
- singleNode.coordinator(1)
- .executeWithPaging(statement,
- ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
+ try
+ {
+ assertRows(lastRows, rows);
+ }
+ catch (AssertionError e)
+ {
+ fail(String.format("Node %d %s paging has returned different results " +
+ "for the same query than node %d %s paging:\n%s",
+ node, paging ? "with" : "without",
+ lastNode, lastPaging ? "with" : "without",
+ e.getMessage()));
+ }
}
- }
-
- }
- }
-
- @Test
- public void metricsCountQueriesTest() throws Throwable
- {
- try (Cluster cluster = init(Cluster.create(2)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
- for (int i = 0; i < 100; i++)
- cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
-
- long readCount1 = readCount(cluster.get(1));
- long readCount2 = readCount(cluster.get(2));
- for (int i = 0; i < 100; i++)
- cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
-
- readCount1 = readCount(cluster.get(1)) - readCount1;
- readCount2 = readCount(cluster.get(2)) - readCount2;
- Assert.assertEquals(readCount1, readCount2);
- Assert.assertEquals(100, readCount1);
- }
- }
+ lastRows = rows;
+ lastPaging = paging;
+ }
- @Test
- public void skippedSSTableWithPartitionDeletionTest() throws Throwable
- {
- try (Cluster cluster = init(Cluster.create(2)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
- // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
- cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
- // and a row from a different partition, to provide the sstable's min/max clustering
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2");
- cluster.get(1).flush(KEYSPACE);
- // expect a single sstable, where minTimestamp equals the timestamp of the partition delete
- cluster.get(1).runOnInstance(() -> {
- Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
- .getColumnFamilyStore("tbl")
- .getLiveSSTables();
- assertEquals("Expected a single sstable, but found " + sstables.size(), 1, sstables.size());
- long minTimestamp = sstables.iterator().next().getMinTimestamp();
- assertEquals("Expected min timestamp of 1, but was " + minTimestamp, 1, minTimestamp);
- });
-
- // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
-
-
- Object[][] rows = cluster.coordinator(1)
- .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
- ConsistencyLevel.ALL);
- assertEquals("Expected 0 rows, but found " + rows.length, 0, rows.length);
+ lastNode = node;
}
- }
+ Assert.assertNotNull(lastRows);
- @Test
- public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
- {
- try (Cluster cluster = init(Cluster.create(2)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
- // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
- cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
- // and a row from a different partition, to provide the sstable's min/max clustering
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1");
- cluster.get(1).flush(KEYSPACE);
- // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
- // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
- // this sstable's minTimestamp is > than the maxTimestamp of the first sstable. This would cause the first
- // sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
- // know what data and/or tombstones are present on other nodes
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
- cluster.get(1).flush(KEYSPACE);
-
- // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
-
- Object[][] rows = cluster.coordinator(1)
- .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
- ConsistencyLevel.ALL);
- // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
- // node 1 (0, 6, 6) was not.
- assertRows(rows, new Object[] {0, 6 ,6});
- }
- }
+ // undo the clustering reverse sorting to ease validation
+ if (reverse)
+ ArrayUtils.reverse(lastRows);
- @Test
- public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
- {
- // don't not add skipped sstables back just because the partition delete ts is < the local min ts
+ // sort by partition key to ease validation
+ Arrays.sort(lastRows, Comparator.comparing(row -> (int) row[0]));
- try (Cluster cluster = init(Cluster.create(2)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
- // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
- cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
- // and a row from a different partition, to provide the sstable's min/max clustering
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3");
- cluster.get(1).flush(KEYSPACE);
- // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
- // insert a row which is not shadowed by the partition delete and flush to a second sstable. The first sstable
- // has a maxTimestamp > than the min timestamp of all sstables, so it is a candidate for reinclusion to the
- // merge. Hoever, the second sstable's minTimestamp is > than the partition delete. This would cause the
- // first sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
- // know what data and/or tombstones are present on other nodes
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
- cluster.get(1).flush(KEYSPACE);
-
- // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
-
- Object[][] rows = cluster.coordinator(1)
- .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
- ConsistencyLevel.ALL);
- // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
- // node 1 (0, 6, 6) was not.
- assertRows(rows, new Object[] {0, 6 ,6});
- }
+ return lastRows;
}
- private long readCount(IInvokableInstance instance)
+ private void validateRow(Object[] row, int rowsPerPartition, int k, int c)
{
- return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
+ Assert.assertNotNull(row);
+
+ if (c % MEMTABLE_INTERVAL == 0)
+ assertRow(row, row(k, c, c + rowsPerPartition * 2));
+ else if (c % SECOND_SSTABLE_INTERVAL == 0)
+ assertRow(row, row(k, c, c + rowsPerPartition));
+ else
+ assertRow(row, row(k, c, c));
}
- private static Object[][] rows(Object[]...rows)
+ private String withTable(String query)
{
- Object[][] r = new Object[rows.length][];
- System.arraycopy(rows, 0, r, 0, rows.length);
- return r;
+ return String.format(query, tableName);
}
-
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 801df7d..37f43bd 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -42,6 +42,9 @@ import org.apache.cassandra.distributed.shared.DistributedTestBase;
public class TestBaseImpl extends DistributedTestBase
{
+ public static final Object[][] EMPTY_ROWS = new Object[0][];
+ public static final boolean[] BOOLEANS = new boolean[]{ false, true };
+
@After
public void afterEach() {
super.afterEach();
@@ -60,6 +63,13 @@ public class TestBaseImpl extends DistributedTestBase
return Cluster.build();
}
+    public static Object[][] rows(Object[]...rows)
+    {
+        // Shallow-copy the varargs array into a fresh Object[][] (the row arrays themselves are shared), so the
+        // returned array never aliases the caller's array and always has the Object[][] runtime type.
+        return Arrays.copyOf(rows, rows.length, Object[][].class);
+    }
+
public static Object list(Object...values)
{
return Arrays.asList(values);
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityTestBase.java
new file mode 100644
index 0000000..8968ede
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityTestBase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static java.lang.String.format;
+
+/**
+ * Base class for tests verifying the availability of writes and reads in a mixed-version cluster, where one node
+ * has been upgraded and the others haven't. Each test shuts down an increasing number of nodes and checks that
+ * queries succeed or time out as expected for their consistency level.
+ */
+public abstract class MixedModeAvailabilityTestBase extends UpgradeTestBase
+{
+    private static final int NUM_NODES = 3;
+    private static final int COORDINATOR = 1;
+    private static final List<Tester> TESTERS = Arrays.asList(new Tester(ONE, ALL),
+                                                              new Tester(QUORUM, QUORUM),
+                                                              new Tester(ALL, ONE));
+
+    /**
+     * Runs the availability tests for the given upgrade path, first with an upgraded coordinator and then with a
+     * coordinator that hasn't been upgraded.
+     *
+     * @param initial the version that all the nodes start with
+     * @param upgrade the target version(s) of the upgrade, as accepted by {@code TestCase#upgrade}
+     */
+    protected static void testAvailability(Versions.Major initial, Versions.Major... upgrade) throws Throwable
+    {
+        testAvailability(true, initial, upgrade);
+        testAvailability(false, initial, upgrade);
+    }
+
+    private static void testAvailability(boolean upgradedCoordinator,
+                                         Versions.Major initial,
+                                         Versions.Major... upgrade) throws Throwable
+    {
+        new TestCase()
+        .nodes(NUM_NODES)
+        // upgrade either the coordinator (node 1) or a node that isn't the coordinator (node 2)
+        .nodesToUpgrade(upgradedCoordinator ? 1 : 2)
+        .upgrade(initial, upgrade)
+        .withConfig(config -> config.set("read_request_timeout_in_ms", SECONDS.toMillis(1))
+                                    .set("write_request_timeout_in_ms", SECONDS.toMillis(1)))
+        .setup(c -> c.schemaChange(withKeyspace("CREATE TABLE %s.t (k uuid, c int, v int, PRIMARY KEY (k, c))")))
+        .runAfterNodeUpgrade((cluster, n) -> {
+
+            // using 0 to 2 down nodes...
+            for (int numNodesDown = 0; numNodesDown < NUM_NODES; numNodesDown++)
+            {
+                // shut down one more node (never the coordinator) and wait for the shutdown to complete, otherwise
+                // the node could still be serving the queries below and they wouldn't fail as expected
+                if (numNodesDown > 0)
+                    cluster.get(replica(COORDINATOR, numNodesDown)).shutdown().get();
+
+                // run the test cases that are compatible with the number of down nodes
+                ICoordinator coordinator = cluster.coordinator(COORDINATOR);
+                for (Tester tester : TESTERS)
+                    tester.test(coordinator, numNodesDown, upgradedCoordinator);
+            }
+        }).run();
+    }
+
+    /**
+     * Returns the node id that is {@code depth} positions after {@code node}, wrapping around from
+     * {@code NUM_NODES} back to 1. With {@code depth == 0} it returns {@code node} itself.
+     */
+    private static int replica(int node, int depth)
+    {
+        assert depth >= 0;
+        return depth == 0 ? node : replica(node == NUM_NODES ? 1 : node + 1, depth - 1);
+    }
+
+    /**
+     * A combination of write and read consistency levels, tested against a given number of down nodes.
+     */
+    private static class Tester
+    {
+        private static final String INSERT = withKeyspace("INSERT INTO %s.t (k, c, v) VALUES (?, ?, ?)");
+        private static final String SELECT = withKeyspace("SELECT * FROM %s.t WHERE k = ?");
+
+        private final ConsistencyLevel writeConsistencyLevel;
+        private final ConsistencyLevel readConsistencyLevel;
+
+        private Tester(ConsistencyLevel writeConsistencyLevel, ConsistencyLevel readConsistencyLevel)
+        {
+            this.writeConsistencyLevel = writeConsistencyLevel;
+            this.readConsistencyLevel = readConsistencyLevel;
+        }
+
+        /**
+         * Writes two rows of a new partition and reads them back, expecting each query to time out if, and only
+         * if, there are more down nodes than its consistency level can tolerate.
+         */
+        public void test(ICoordinator coordinator, int numNodesDown, boolean upgradedCoordinator)
+        {
+            UUID key = UUID.randomUUID();
+            Object[] row1 = row(key, 1, 10);
+            Object[] row2 = row(key, 2, 20);
+
+            try
+            {
+                // test write
+                maybeFail(WriteTimeoutException.class, numNodesDown > maxNodesDown(writeConsistencyLevel), () -> {
+                    coordinator.execute(INSERT, writeConsistencyLevel, row1);
+                    coordinator.execute(INSERT, writeConsistencyLevel, row2);
+                });
+
+                // test read, only checking the returned rows if the write was expected to succeed
+                maybeFail(ReadTimeoutException.class, numNodesDown > maxNodesDown(readConsistencyLevel), () -> {
+                    Object[][] rows = coordinator.execute(SELECT, readConsistencyLevel, key);
+                    if (numNodesDown <= maxNodesDown(writeConsistencyLevel))
+                        assertRows(rows, row1, row2);
+                });
+            }
+            catch (Throwable t)
+            {
+                throw new AssertionError(format("Unexpected error in case %s-%s with %s coordinator and %d nodes down",
+                                                writeConsistencyLevel,
+                                                readConsistencyLevel,
+                                                upgradedCoordinator ? "upgraded" : "not upgraded",
+                                                numNodesDown), t);
+            }
+        }
+
+        /**
+         * Runs {@code test}, verifying that it fails with an exception of the given class if {@code shouldFail} is
+         * true, and that it succeeds otherwise.
+         */
+        private static <E extends Exception> void maybeFail(Class<E> exceptionClass, boolean shouldFail, Runnable test)
+        {
+            try
+            {
+                test.run();
+                assertFalse("Expected " + exceptionClass.getSimpleName() + " but the query succeeded", shouldFail);
+            }
+            catch (Exception e)
+            {
+                if (!shouldFail)
+                    throw e;
+
+                // compare class names rather than classes, because in a mixed-version cluster the exception may have
+                // been loaded from a different classpath; it may also come wrapped into a RuntimeException
+                String className = e.getClass().getCanonicalName();
+                if (e instanceof RuntimeException && e.getCause() != null)
+                    className = e.getCause().getClass().getCanonicalName();
+
+                assertEquals("Unexpected exception: " + e, exceptionClass.getCanonicalName(), className);
+            }
+        }
+
+        /**
+         * Returns the max number of down nodes tolerated by the given consistency level. The values assume that
+         * all the {@code NUM_NODES} (three) nodes are replicas of the queried partition.
+         */
+        private static int maxNodesDown(ConsistencyLevel cl)
+        {
+            if (cl == ONE)
+                return 2;
+
+            if (cl == QUORUM)
+                return 1;
+
+            if (cl == ALL)
+                return 0;
+
+            throw new IllegalArgumentException("Unsupported consistency level: " + cl);
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV22Test.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV22Test.java
new file mode 100644
index 0000000..367ef5f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV22Test.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions.Major;
+
+/**
+ * Runs the {@link MixedModeAvailabilityTestBase} checks on clusters upgraded from {@code v22}.
+ */
+public class MixedModeAvailabilityV22Test extends MixedModeAvailabilityTestBase
+{
+    @Test
+    public void testAvailabilityV22ToV30() throws Throwable
+    {
+        testAvailability(Major.v22, Major.v30);
+    }
+
+    @Test
+    public void testAvailabilityV22ToV3X() throws Throwable
+    {
+        testAvailability(Major.v22, Major.v3X);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV30Test.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV30Test.java
new file mode 100644
index 0000000..4e25a64
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV30Test.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions.Major;
+
+/**
+ * Runs the {@link MixedModeAvailabilityTestBase} checks on clusters upgraded from {@code v30}.
+ */
+public class MixedModeAvailabilityV30Test extends MixedModeAvailabilityTestBase
+{
+    @Test
+    public void testAvailabilityV30ToV3X() throws Throwable
+    {
+        testAvailability(Major.v30, Major.v3X);
+    }
+
+    @Test
+    public void testAvailabilityV30ToV4() throws Throwable
+    {
+        testAvailability(Major.v30, Major.v4);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV3XTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV3XTest.java
new file mode 100644
index 0000000..622d6c6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeAvailabilityV3XTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions.Major;
+
+/**
+ * Runs the {@link MixedModeAvailabilityTestBase} checks on clusters upgraded from {@code v3X}.
+ */
+public class MixedModeAvailabilityV3XTest extends MixedModeAvailabilityTestBase
+{
+    @Test
+    public void testAvailabilityV3XToV4() throws Throwable
+    {
+        testAvailability(Major.v3X, Major.v4);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyTestBase.java
new file mode 100644
index 0000000..ef54d61
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+public class MixedModeConsistencyTestBase extends UpgradeTestBase
+{
+ protected static void testConsistency(Versions.Major initial, Versions.Major... upgrade) throws Throwable
+ {
+ List<Tester> testers = new ArrayList<>();
+ testers.addAll(Tester.create(1, ALL));
+ testers.addAll(Tester.create(2, ALL, QUORUM));
+ testers.addAll(Tester.create(3, ALL, QUORUM, ONE));
+
+ new TestCase()
+ .nodes(3)
+ .nodesToUpgrade(1)
+ .upgrade(initial, upgrade)
+ .withConfig(config -> config.set("read_request_timeout_in_ms", SECONDS.toMillis(30))
+ .set("write_request_timeout_in_ms", SECONDS.toMillis(30)))
+ .setup(cluster -> {
+ Tester.createTable(cluster);
+ for (Tester tester : testers)
+ tester.writeRows(cluster);
+ }).runAfterNodeUpgrade((cluster, node) -> {
+ for (Tester tester : testers)
+ tester.readRows(cluster);
+ }).run();
+ }
+
+ private static class Tester
+ {
+ private final int numWrittenReplicas;
+ private final ConsistencyLevel readConsistencyLevel;
+ private final UUID partitionKey;
+
+ private Tester(int numWrittenReplicas, ConsistencyLevel readConsistencyLevel)
+ {
+ this.numWrittenReplicas = numWrittenReplicas;
+ this.readConsistencyLevel = readConsistencyLevel;
+ partitionKey = UUID.randomUUID();
+ }
+
+ private static List<Tester> create(int numWrittenReplicas, ConsistencyLevel... readConsistencyLevels)
+ {
+ return Stream.of(readConsistencyLevels)
+ .map(readConsistencyLevel -> new Tester(numWrittenReplicas, readConsistencyLevel))
+ .collect(Collectors.toList());
+ }
+
+ private static void createTable(UpgradeableCluster cluster)
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k uuid, c int, v int, PRIMARY KEY (k, c))"));
+ }
+
+ private void writeRows(UpgradeableCluster cluster)
+ {
+ String query = withKeyspace("INSERT INTO %s.t (k, c, v) VALUES (?, ?, ?)");
+ for (int i = 1; i <= numWrittenReplicas; i++)
+ {
+ IUpgradeableInstance node = cluster.get(i);
+ node.executeInternal(query, partitionKey, 1, 10);
+ node.executeInternal(query, partitionKey, 2, 20);
+ node.executeInternal(query, partitionKey, 3, 30);
+ }
+ }
+
+ private void readRows(UpgradeableCluster cluster)
+ {
+ String query = withKeyspace("SELECT * FROM %s.t WHERE k = ?");
+ int coordinator = 1;
+ try
+ {
+ for (coordinator = 1; coordinator <= cluster.size(); coordinator++)
+ {
+ assertRows(cluster.coordinator(coordinator).execute(query, readConsistencyLevel, partitionKey),
+ row(partitionKey, 1, 10),
+ row(partitionKey, 2, 20),
+ row(partitionKey, 3, 30));
+ }
+ }
+ catch (Throwable t)
+ {
+ String format = "Unexpected error reading rows with %d written replicas, CL=%s and coordinator=%s";
+ throw new AssertionError(format(format, numWrittenReplicas, readConsistencyLevel, coordinator), t);
+ }
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV22Test.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV22Test.java
new file mode 100644
index 0000000..ef9f766
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV22Test.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions.Major;
+
+/**
+ * Runs the {@link MixedModeConsistencyTestBase} checks on clusters upgraded from {@code v22}.
+ */
+public class MixedModeConsistencyV22Test extends MixedModeConsistencyTestBase
+{
+    @Test
+    public void testConsistencyV22ToV30() throws Throwable
+    {
+        testConsistency(Major.v22, Major.v30);
+    }
+
+    @Test
+    public void testConsistencyV22ToV3X() throws Throwable
+    {
+        testConsistency(Major.v22, Major.v3X);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV30Test.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV30Test.java
new file mode 100644
index 0000000..efe94ba
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV30Test.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions.Major;
+
+/**
+ * Runs the {@link MixedModeConsistencyTestBase} checks on clusters upgraded from {@code v30}.
+ */
+public class MixedModeConsistencyV30Test extends MixedModeConsistencyTestBase
+{
+    @Test
+    public void testConsistencyV30ToV3X() throws Throwable
+    {
+        testConsistency(Major.v30, Major.v3X);
+    }
+
+    @Test
+    public void testConsistencyV30ToV4() throws Throwable
+    {
+        testConsistency(Major.v30, Major.v4);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV3XTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV3XTest.java
new file mode 100644
index 0000000..0405716
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeConsistencyV3XTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions.Major;
+
+/**
+ * Runs the {@link MixedModeConsistencyTestBase} checks on clusters upgraded from {@code v3X}.
+ */
+public class MixedModeConsistencyV3XTest extends MixedModeConsistencyTestBase
+{
+    @Test
+    public void testConsistencyV3XToV4() throws Throwable
+    {
+        testConsistency(Major.v3X, Major.v4);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
index 9ce7867..01ac40d 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
@@ -27,33 +27,6 @@ import static org.apache.cassandra.distributed.shared.AssertUtils.*;
public class UpgradeTest extends UpgradeTestBase
{
-
- @Test
- public void upgradeTest() throws Throwable
- {
- new TestCase()
- .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
- .upgrade(Versions.Major.v30, Versions.Major.v3X, Versions.Major.v4)
- .setup((cluster) -> {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
- })
- .runAfterNodeUpgrade((cluster, node) -> {
- for (int i = 1; i <= cluster.size(); i++)
- {
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
- ConsistencyLevel.ALL,
- 1),
- row(1, 1, 1),
- row(1, 2, 2),
- row(1, 3, 3));
- }
- }).run();
- }
-
@Test
public void simpleUpgradeWithNetworkAndGossipTest() throws Throwable
{
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index eba9a7c..ca5ec63 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -130,6 +130,11 @@ public class Util
return comparator.make(o);
}
+ public static Token token(int key)
+ {
+ return testPartitioner().getToken(ByteBufferUtil.bytes(key));
+ }
+
public static Token token(String key)
{
return testPartitioner().getToken(ByteBufferUtil.bytes(key));
diff --git a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
index 56d7ae1..986a125 100644
--- a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
@@ -20,47 +20,32 @@ package org.apache.cassandra.db;
import java.math.BigInteger;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
-import com.google.common.collect.Iterators;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.cassandra.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.ReplicaPlan;
-import org.apache.cassandra.locator.ReplicaPlans;
-import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
public class PartitionRangeReadTest
{
- public static final String KEYSPACE1 = "PartitionRangeReadTest1";
- public static final String KEYSPACE2 = "PartitionRangeReadTest2";
- public static final String CF_STANDARD1 = "Standard1";
- public static final String CF_STANDARDINT = "StandardInteger1";
- public static final String CF_COMPACT1 = "Compact1";
+ private static final String KEYSPACE1 = "PartitionRangeReadTest1";
+ private static final String KEYSPACE2 = "PartitionRangeReadTest2";
+ private static final String CF_STANDARD1 = "Standard1";
+ private static final String CF_STANDARDINT = "StandardInteger1";
+ private static final String CF_COMPACT1 = "Compact1";
@BeforeClass
public static void defineSchema() throws ConfigurationException
@@ -99,11 +84,10 @@ public class PartitionRangeReadTest
}
@Test
- public void testCassandra6778() throws CharacterCodingException
+ public void testCassandra6778()
{
- String cfname = CF_STANDARDINT;
Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARDINT);
cfs.truncateBlocking();
ByteBuffer col = ByteBufferUtil.bytes("val");
@@ -127,11 +111,11 @@ public class PartitionRangeReadTest
// fetch by the first column name; we should get the second version of the column value
Row row = Util.getOnlyRow(Util.cmd(cfs, "k1").includeRow(new BigInteger(new byte[]{1})).build());
- assertTrue(row.getCell(cDef).buffer().equals(ByteBufferUtil.bytes("val2")));
+ assertEquals(ByteBufferUtil.bytes("val2"), row.getCell(cDef).buffer());
// fetch by the second column name; we should get the second version of the column value
row = Util.getOnlyRow(Util.cmd(cfs, "k1").includeRow(new BigInteger(new byte[]{0, 0, 1})).build());
- assertTrue(row.getCell(cDef).buffer().equals(ByteBufferUtil.bytes("val2")));
+ assertEquals(ByteBufferUtil.bytes("val2"), row.getCell(cDef).buffer());
}
@Test
@@ -159,12 +143,10 @@ public class PartitionRangeReadTest
}
@Test
- public void testRangeSliceInclusionExclusion() throws Throwable
+ public void testRangeSliceInclusionExclusion()
{
- String keyspaceName = KEYSPACE1;
- String cfName = CF_STANDARD1;
- Keyspace keyspace = Keyspace.open(keyspaceName);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
cfs.clearUnsafe();
for (int i = 0; i < 10; ++i)
@@ -184,152 +166,26 @@ public class PartitionRangeReadTest
// Start and end inclusive
partitions = Util.getAll(Util.cmd(cfs).fromKeyIncl("2").toKeyIncl("7").build());
assertEquals(6, partitions.size());
- assertTrue(partitions.get(0).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("2")));
- assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("7")));
+ assertEquals(ByteBufferUtil.bytes("2"), partitions.get(0).iterator().next().getCell(cDef).buffer());
+ assertEquals(ByteBufferUtil.bytes("7"), partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer());
// Start and end excluded
partitions = Util.getAll(Util.cmd(cfs).fromKeyExcl("2").toKeyExcl("7").build());
assertEquals(4, partitions.size());
- assertTrue(partitions.get(0).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("3")));
- assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("6")));
+ assertEquals(ByteBufferUtil.bytes("3"), partitions.get(0).iterator().next().getCell(cDef).buffer());
+ assertEquals(ByteBufferUtil.bytes("6"), partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer());
// Start excluded, end included
partitions = Util.getAll(Util.cmd(cfs).fromKeyExcl("2").toKeyIncl("7").build());
assertEquals(5, partitions.size());
- assertTrue(partitions.get(0).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("3")));
- assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("7")));
+ assertEquals(ByteBufferUtil.bytes("3"), partitions.get(0).iterator().next().getCell(cDef).buffer());
+ assertEquals(ByteBufferUtil.bytes("7"), partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer());
// Start included, end excluded
partitions = Util.getAll(Util.cmd(cfs).fromKeyIncl("2").toKeyExcl("7").build());
assertEquals(5, partitions.size());
- assertTrue(partitions.get(0).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("2")));
- assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer().equals(ByteBufferUtil.bytes("6")));
- }
-
- @Test
- public void testComputeConcurrencyFactor()
- {
- int maxConcurrentRangeRequest = 32;
-
- // no live row returned, fetch all remaining ranges but hit the max instead
- int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 500, 0);
- assertEquals(maxConcurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
-
- // no live row returned, fetch all remaining ranges
- cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConcurrentRangeRequest, 500, 0);
- assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
-
- // returned half rows, fetch rangesQueried again but hit the max instead
- cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConcurrentRangeRequest, 480, 240);
- assertEquals(maxConcurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
-
- // returned half rows, fetch rangesQueried again
- cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 480, 240);
- assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
-
- // returned most of rows, 1 more range to fetch
- cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConcurrentRangeRequest, 480, 479);
- assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
- }
-
- @Test
- public void testRangeCountWithRangeMerge()
- {
- List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
- int vnodeCount = 0;
-
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
- for (int i = 0; i + 1 < tokens.size(); i++)
- {
- Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
- ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
- vnodeCount++;
- }
-
- StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
- ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
- // all ranges are merged as test has only one node.
- assertEquals(vnodeCount, mergedRange.vnodeCount());
- }
-
- @Test
- public void testRangeQueried()
- {
- List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
- int vnodeCount = tokens.size() + 1; // n tokens divide token ring into n+1 ranges
-
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
- cfs.clearUnsafe();
-
- int rows = 100;
- for (int i = 0; i < rows; ++i)
- {
- RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
- builder.clustering("c");
- builder.add("val", String.valueOf(i));
- builder.build().applyUnsafe();
- }
- cfs.forceBlockingFlush();
-
- PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
-
- // without range merger, there will be 2 batches requested: 1st batch with 1 range and 2nd batch with remaining ranges
- Iterator<ReplicaPlan.ForRangeRead> ranges = rangeIterator(command, keyspace, false);
- StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, System.nanoTime());
- verifyRangeCommandIterator(data, rows, 2, vnodeCount);
-
- // without range merger and initial cf=5, there will be 1 batches requested: 5 vnode ranges for 1st batch
- ranges = rangeIterator(command, keyspace, false);
- data = new StorageProxy.RangeCommandIterator(ranges, command, vnodeCount, 1000, vnodeCount, System.nanoTime());
- verifyRangeCommandIterator(data, rows, 1, vnodeCount);
-
- // without range merger and max cf=1, there will be 5 batches requested: 1 vnode range per batch
- ranges = rangeIterator(command, keyspace, false);
- data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, System.nanoTime());
- verifyRangeCommandIterator(data, rows, vnodeCount, vnodeCount);
-
- // with range merger, there will be only 1 batch requested, as all ranges share the same replica - localhost
- ranges = rangeIterator(command, keyspace, true);
- data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, System.nanoTime());
- verifyRangeCommandIterator(data, rows, 1, vnodeCount);
-
- // with range merger and max cf=1, there will be only 1 batch requested, as all ranges share the same replica - localhost
- ranges = rangeIterator(command, keyspace, true);
- data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, System.nanoTime());
- verifyRangeCommandIterator(data, rows, 1, vnodeCount);
- }
-
- private Iterator<ReplicaPlan.ForRangeRead> rangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, boolean withRangeMerger)
- {
- Iterator<ReplicaPlan.ForRangeRead> ranges = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
- if (withRangeMerger)
- ranges = new StorageProxy.RangeMerger(ranges, keyspace, ConsistencyLevel.ONE);
-
- return ranges;
- }
-
- private void verifyRangeCommandIterator(StorageProxy.RangeCommandIterator data, int rows, int batches, int vnodeCount)
- {
- int num = Util.size(data);
- assertEquals(rows, num);
- assertEquals(batches, data.batchesRequested());
- assertEquals(vnodeCount, data.rangesQueried());
- }
-
- private List<Token> setTokens(List<Integer> values)
- {
- IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
- List<Token> tokens = new ArrayList<>(values.size());
- for (Integer val : values)
- tokens.add(partitioner.getToken(ByteBufferUtil.bytes(val)));
-
- TokenMetadata tmd = StorageService.instance.getTokenMetadata();
- tmd.clearUnsafe();
- tmd.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort());
-
- return tokens;
+ assertEquals(ByteBufferUtil.bytes("2"), partitions.get(0).iterator().next().getCell(cDef).buffer());
+ assertEquals(ByteBufferUtil.bytes("6"), partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).buffer());
}
}
diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
deleted file mode 100644
index 590cfeb..0000000
--- a/test/unit/org/apache/cassandra/service/StorageProxyTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.service;
-
-import java.util.List;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.db.PartitionPosition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.TokenMetadata;
-
-import static org.apache.cassandra.Util.rp;
-import static org.apache.cassandra.Util.token;
-import static org.junit.Assert.assertEquals;
-
-public class StorageProxyTest
-{
- private static Range<PartitionPosition> range(PartitionPosition left, PartitionPosition right)
- {
- return new Range<PartitionPosition>(left, right);
- }
-
- private static Bounds<PartitionPosition> bounds(PartitionPosition left, PartitionPosition right)
- {
- return new Bounds<PartitionPosition>(left, right);
- }
-
- private static ExcludingBounds<PartitionPosition> exBounds(PartitionPosition left, PartitionPosition right)
- {
- return new ExcludingBounds<PartitionPosition>(left, right);
- }
-
- private static IncludingExcludingBounds<PartitionPosition> incExBounds(PartitionPosition left, PartitionPosition right)
- {
- return new IncludingExcludingBounds<PartitionPosition>(left, right);
- }
-
- private static PartitionPosition startOf(String key)
- {
- return token(key).minKeyBound();
- }
-
- private static PartitionPosition endOf(String key)
- {
- return token(key).maxKeyBound();
- }
-
- private static Range<Token> tokenRange(String left, String right)
- {
- return new Range<Token>(token(left), token(right));
- }
-
- private static Bounds<Token> tokenBounds(String left, String right)
- {
- return new Bounds<Token>(token(left), token(right));
- }
-
- @BeforeClass
- public static void beforeClass() throws Throwable
- {
- DatabaseDescriptor.daemonInitialization();
- DatabaseDescriptor.getHintsDirectory().mkdir();
- TokenMetadata tmd = StorageService.instance.getTokenMetadata();
- tmd.updateNormalToken(token("1"), InetAddressAndPort.getByName("127.0.0.1"));
- tmd.updateNormalToken(token("6"), InetAddressAndPort.getByName("127.0.0.6"));
- }
-
- // test getRestrictedRanges for token
- private void testGRR(AbstractBounds<Token> queryRange, AbstractBounds<Token>... expected)
- {
- // Testing for tokens
- List<AbstractBounds<Token>> restricted = StorageProxy.getRestrictedRanges(queryRange);
- assertEquals(restricted.toString(), expected.length, restricted.size());
- for (int i = 0; i < expected.length; i++)
- assertEquals("Mismatch for index " + i + ": " + restricted, expected[i], restricted.get(i));
- }
-
- // test getRestrictedRanges for keys
- private void testGRRKeys(AbstractBounds<PartitionPosition> queryRange, AbstractBounds<PartitionPosition>... expected)
- {
- // Testing for keys
- List<AbstractBounds<PartitionPosition>> restrictedKeys = StorageProxy.getRestrictedRanges(queryRange);
- assertEquals(restrictedKeys.toString(), expected.length, restrictedKeys.size());
- for (int i = 0; i < expected.length; i++)
- assertEquals("Mismatch for index " + i + ": " + restrictedKeys, expected[i], restrictedKeys.get(i));
-
- }
-
- @Test
- public void testGRR() throws Throwable
- {
- // no splits
- testGRR(tokenRange("2", "5"), tokenRange("2", "5"));
- testGRR(tokenBounds("2", "5"), tokenBounds("2", "5"));
- // single split
- testGRR(tokenRange("2", "7"), tokenRange("2", "6"), tokenRange("6", "7"));
- testGRR(tokenBounds("2", "7"), tokenBounds("2", "6"), tokenRange("6", "7"));
- // single split starting from min
- testGRR(tokenRange("", "2"), tokenRange("", "1"), tokenRange("1", "2"));
- testGRR(tokenBounds("", "2"), tokenBounds("", "1"), tokenRange("1", "2"));
- // single split ending with max
- testGRR(tokenRange("5", ""), tokenRange("5", "6"), tokenRange("6", ""));
- testGRR(tokenBounds("5", ""), tokenBounds("5", "6"), tokenRange("6", ""));
- // two splits
- testGRR(tokenRange("0", "7"), tokenRange("0", "1"), tokenRange("1", "6"), tokenRange("6", "7"));
- testGRR(tokenBounds("0", "7"), tokenBounds("0", "1"), tokenRange("1", "6"), tokenRange("6", "7"));
-
-
- // Keys
- // no splits
- testGRRKeys(range(rp("2"), rp("5")), range(rp("2"), rp("5")));
- testGRRKeys(bounds(rp("2"), rp("5")), bounds(rp("2"), rp("5")));
- testGRRKeys(exBounds(rp("2"), rp("5")), exBounds(rp("2"), rp("5")));
- // single split testGRRKeys(range("2", "7"), range(rp("2"), endOf("6")), range(endOf("6"), rp("7")));
- testGRRKeys(bounds(rp("2"), rp("7")), bounds(rp("2"), endOf("6")), range(endOf("6"), rp("7")));
- testGRRKeys(exBounds(rp("2"), rp("7")), range(rp("2"), endOf("6")), exBounds(endOf("6"), rp("7")));
- testGRRKeys(incExBounds(rp("2"), rp("7")), bounds(rp("2"), endOf("6")), exBounds(endOf("6"), rp("7")));
- // single split starting from min
- testGRRKeys(range(rp(""), rp("2")), range(rp(""), endOf("1")), range(endOf("1"), rp("2")));
- testGRRKeys(bounds(rp(""), rp("2")), bounds(rp(""), endOf("1")), range(endOf("1"), rp("2")));
- testGRRKeys(exBounds(rp(""), rp("2")), range(rp(""), endOf("1")), exBounds(endOf("1"), rp("2")));
- testGRRKeys(incExBounds(rp(""), rp("2")), bounds(rp(""), endOf("1")), exBounds(endOf("1"), rp("2")));
- // single split ending with max
- testGRRKeys(range(rp("5"), rp("")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")));
- testGRRKeys(bounds(rp("5"), rp("")), bounds(rp("5"), endOf("6")), range(endOf("6"), rp("")));
- testGRRKeys(exBounds(rp("5"), rp("")), range(rp("5"), endOf("6")), exBounds(endOf("6"), rp("")));
- testGRRKeys(incExBounds(rp("5"), rp("")), bounds(rp("5"), endOf("6")), exBounds(endOf("6"), rp("")));
- // two splits
- testGRRKeys(range(rp("0"), rp("7")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("7")));
- testGRRKeys(bounds(rp("0"), rp("7")), bounds(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("7")));
- testGRRKeys(exBounds(rp("0"), rp("7")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("7")));
- testGRRKeys(incExBounds(rp("0"), rp("7")), bounds(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("7")));
- }
-
- @Test
- public void testGRRExact() throws Throwable
- {
- // min
- testGRR(tokenRange("1", "5"), tokenRange("1", "5"));
- testGRR(tokenBounds("1", "5"), tokenBounds("1", "1"), tokenRange("1", "5"));
- // max
- testGRR(tokenRange("2", "6"), tokenRange("2", "6"));
- testGRR(tokenBounds("2", "6"), tokenBounds("2", "6"));
- // both
- testGRR(tokenRange("1", "6"), tokenRange("1", "6"));
- testGRR(tokenBounds("1", "6"), tokenBounds("1", "1"), tokenRange("1", "6"));
-
-
- // Keys
- // min
- testGRRKeys(range(endOf("1"), endOf("5")), range(endOf("1"), endOf("5")));
- testGRRKeys(range(rp("1"), endOf("5")), range(rp("1"), endOf("1")), range(endOf("1"), endOf("5")));
- testGRRKeys(bounds(startOf("1"), endOf("5")), bounds(startOf("1"), endOf("1")), range(endOf("1"), endOf("5")));
- testGRRKeys(exBounds(endOf("1"), rp("5")), exBounds(endOf("1"), rp("5")));
- testGRRKeys(exBounds(rp("1"), rp("5")), range(rp("1"), endOf("1")), exBounds(endOf("1"), rp("5")));
- testGRRKeys(exBounds(startOf("1"), endOf("5")), range(startOf("1"), endOf("1")), exBounds(endOf("1"), endOf("5")));
- testGRRKeys(incExBounds(rp("1"), rp("5")), bounds(rp("1"), endOf("1")), exBounds(endOf("1"), rp("5")));
- // max
- testGRRKeys(range(endOf("2"), endOf("6")), range(endOf("2"), endOf("6")));
- testGRRKeys(bounds(startOf("2"), endOf("6")), bounds(startOf("2"), endOf("6")));
- testGRRKeys(exBounds(rp("2"), rp("6")), exBounds(rp("2"), rp("6")));
- testGRRKeys(incExBounds(rp("2"), rp("6")), incExBounds(rp("2"), rp("6")));
- // bothKeys
- testGRRKeys(range(rp("1"), rp("6")), range(rp("1"), endOf("1")), range(endOf("1"), rp("6")));
- testGRRKeys(bounds(rp("1"), rp("6")), bounds(rp("1"), endOf("1")), range(endOf("1"), rp("6")));
- testGRRKeys(exBounds(rp("1"), rp("6")), range(rp("1"), endOf("1")), exBounds(endOf("1"), rp("6")));
- testGRRKeys(incExBounds(rp("1"), rp("6")), bounds(rp("1"), endOf("1")), exBounds(endOf("1"), rp("6")));
- }
-
- @Test
- public void testGRRWrapped() throws Throwable
- {
- // one token in wrapped range
- testGRR(tokenRange("7", "0"), tokenRange("7", ""), tokenRange("", "0"));
- // two tokens in wrapped range
- testGRR(tokenRange("5", "0"), tokenRange("5", "6"), tokenRange("6", ""), tokenRange("", "0"));
- testGRR(tokenRange("7", "2"), tokenRange("7", ""), tokenRange("", "1"), tokenRange("1", "2"));
- // full wraps
- testGRR(tokenRange("0", "0"), tokenRange("0", "1"), tokenRange("1", "6"), tokenRange("6", ""), tokenRange("", "0"));
- testGRR(tokenRange("", ""), tokenRange("", "1"), tokenRange("1", "6"), tokenRange("6", ""));
- // wrap on member tokens
- testGRR(tokenRange("6", "6"), tokenRange("6", ""), tokenRange("", "1"), tokenRange("1", "6"));
- testGRR(tokenRange("6", "1"), tokenRange("6", ""), tokenRange("", "1"));
- // end wrapped
- testGRR(tokenRange("5", ""), tokenRange("5", "6"), tokenRange("6", ""));
-
- // Keys
- // one token in wrapped range
- testGRRKeys(range(rp("7"), rp("0")), range(rp("7"), rp("")), range(rp(""), rp("0")));
- // two tokens in wrapped range
- testGRRKeys(range(rp("5"), rp("0")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("0")));
- testGRRKeys(range(rp("7"), rp("2")), range(rp("7"), rp("")), range(rp(""), endOf("1")), range(endOf("1"), rp("2")));
- // full wraps
- testGRRKeys(range(rp("0"), rp("0")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("0")));
- testGRRKeys(range(rp(""), rp("")), range(rp(""), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")));
- // wrap on member tokens
- testGRRKeys(range(rp("6"), rp("6")), range(rp("6"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), endOf("1")), range(endOf("1"), rp("6")));
- testGRRKeys(range(rp("6"), rp("1")), range(rp("6"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("1")));
- // end wrapped
- testGRRKeys(range(rp("5"), rp("")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")));
- }
-
- @Test
- public void testGRRExactBounds() throws Throwable
- {
- // equal tokens are special cased as non-wrapping for bounds
- testGRR(tokenBounds("0", "0"), tokenBounds("0", "0"));
- // completely empty bounds match everything
- testGRR(tokenBounds("", ""), tokenBounds("", "1"), tokenRange("1", "6"), tokenRange("6", ""));
-
- // Keys
- // equal tokens are special cased as non-wrapping for bounds
- testGRRKeys(bounds(rp("0"), rp("0")), bounds(rp("0"), rp("0")));
- // completely empty bounds match everything
- testGRRKeys(bounds(rp(""), rp("")), bounds(rp(""), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")));
- testGRRKeys(exBounds(rp(""), rp("")), range(rp(""), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("")));
- testGRRKeys(incExBounds(rp(""), rp("")), bounds(rp(""), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("")));
- }
-}
diff --git a/test/unit/org/apache/cassandra/service/reads/range/RangeCommandIteratorTest.java b/test/unit/org/apache/cassandra/service/reads/range/RangeCommandIteratorTest.java
new file mode 100644
index 0000000..d82a503
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/RangeCommandIteratorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Iterators;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.CloseableIterator;
+
+import static org.junit.Assert.assertEquals;
+
+public class RangeCommandIteratorTest
+{
+    private static final String KEYSPACE1 = "RangeCommandIteratorTest";
+    private static final String CF_STANDARD1 = "Standard1";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+    }
+
+    /**
+     * Verifies that {@link ReplicaPlanMerger} merges the contiguous range replica plans of a single-node ring
+     * into one plan whose vnode count equals the number of input ranges.
+     */
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = setTokens(100, 200, 300, 400);
+        int vnodeCount = 0;
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+            vnodeCount++;
+        }
+
+        // the merger is a CloseableIterator, so release it once the merged plan has been read
+        try (ReplicaPlanMerger merge = new ReplicaPlanMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE))
+        {
+            ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+            // all ranges are merged as test has only one node.
+            assertEquals(vnodeCount, mergedRange.vnodeCount());
+        }
+    }
+
+    /**
+     * Verifies how many batches and vnode ranges {@link RangeCommandIterator} queries for several
+     * initial/maximum concurrency factors, with and without a {@link ReplicaPlanMerger}.
+     */
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = setTokens(100, 200, 300, 400);
+        int vnodeCount = tokens.size() + 1; // n tokens divide token ring into n+1 ranges
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        AbstractBounds<PartitionPosition> keyRange = command.dataRange().keyRange();
+
+        // without range merger, there will be 2 batches requested: 1st batch with 1 range and 2nd batch with remaining ranges
+        CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlans = replicaPlanIterator(keyRange, keyspace, false);
+        RangeCommandIterator data = new RangeCommandIterator(replicaPlans, command, 1, 1000, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, 2, vnodeCount);
+
+        // without range merger and initial cf=vnodeCount (5), there will be only 1 batch requested: all 5 vnode ranges in the 1st batch
+        replicaPlans = replicaPlanIterator(keyRange, keyspace, false);
+        data = new RangeCommandIterator(replicaPlans, command, vnodeCount, 1000, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+        // without range merger and max cf=1, there will be 5 batches requested: 1 vnode range per batch
+        replicaPlans = replicaPlanIterator(keyRange, keyspace, false);
+        data = new RangeCommandIterator(replicaPlans, command, 1, 1, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, vnodeCount, vnodeCount);
+
+        // with range merger, there will be only 1 batch requested, as all ranges share the same replica - localhost
+        replicaPlans = replicaPlanIterator(keyRange, keyspace, true);
+        data = new RangeCommandIterator(replicaPlans, command, 1, 1000, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+        // with range merger and max cf=1, there will be only 1 batch requested, as all ranges share the same replica - localhost
+        replicaPlans = replicaPlanIterator(keyRange, keyspace, true);
+        data = new RangeCommandIterator(replicaPlans, command, 1, 1, vnodeCount, System.nanoTime());
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+    }
+
+    /**
+     * Checks {@link RangeCommandIterator#computeConcurrencyFactor} against hand-computed expectations,
+     * with a maximum concurrency of 32.
+     */
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConcurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 500, 0);
+        assertEquals(maxConcurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConcurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConcurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100 - 80 = 20 < maxConcurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConcurrentRangeRequest, 480, 240);
+        assertEquals(maxConcurrentRangeRequest, cf); // because 60 > maxConcurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConcurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConcurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConcurrentRangeRequest
+    }
+
+    /**
+     * Sets up the token ring with the tokens of the given integer keys, using {@link TokenUpdater},
+     * and returns those tokens.
+     */
+    private static List<Token> setTokens(int... values)
+    {
+        return new TokenUpdater().withKeys(values).update().getTokens();
+    }
+
+    /**
+     * Returns a replica plan iterator over {@code keyRange} at {@code ConsistencyLevel.ONE}, optionally
+     * wrapped in a {@link ReplicaPlanMerger}.
+     */
+    private static CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlanIterator(AbstractBounds<PartitionPosition> keyRange,
+                                                                                  Keyspace keyspace,
+                                                                                  boolean withRangeMerger)
+    {
+        CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlans = new ReplicaPlanIterator(keyRange, keyspace, ConsistencyLevel.ONE);
+        if (withRangeMerger)
+            replicaPlans = new ReplicaPlanMerger(replicaPlans, keyspace, ConsistencyLevel.ONE);
+
+        return replicaPlans;
+    }
+
+    /**
+     * Fully consumes {@code data}, asserts the number of returned rows, requested batches and queried ranges,
+     * and then closes the iterator.
+     */
+    private static void verifyRangeCommandIterator(RangeCommandIterator data, int rows, int batches, int vnodeCount)
+    {
+        // consuming the iterator does not release it; NOTE(review): closing is assumed to also release the
+        // replica plan iterator it was built from - confirm against RangeCommandIterator#close
+        try (RangeCommandIterator iterator = data)
+        {
+            assertEquals(rows, Util.size(iterator));
+            assertEquals(batches, iterator.batchesRequested());
+            assertEquals(vnodeCount, iterator.rangesQueried());
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/reads/range/RangeCommandsTest.java b/test/unit/org/apache/cassandra/service/reads/range/RangeCommandsTest.java
new file mode 100644
index 0000000..294be2a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/RangeCommandsTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.nio.ByteBuffer;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.AbstractReadCommandBuilder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.CachedPartition;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.apache.cassandra.db.ConsistencyLevel.ONE;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RangeCommands}.
+ */
+public class RangeCommandsTest extends CQLTester
+{
+ private static final int MAX_CONCURRENCY_FACTOR = 4;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ // cap the number of concurrent range requests so the tests below can assert against a known maximum
+ System.setProperty("cassandra.max_concurrent_range_requests", String.valueOf(MAX_CONCURRENCY_FACTOR));
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ System.clearProperty("cassandra.max_concurrent_range_requests");
+ }
+
+ /**
+ * Verifies how the initial concurrency factor of a range command iterator is derived from the command limit and
+ * the estimated results per range, and that it is capped by {@link #MAX_CONCURRENCY_FACTOR}.
+ */
+ @Test
+ public void testConcurrencyFactor()
+ {
+ // two nodes with two tokens each, which the assertions below expect to produce 5 ranges
+ new TokenUpdater().withTokens("127.0.0.1", 1, 2)
+ .withTokens("127.0.0.2", 3, 4)
+ .update();
+
+ String table = createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)");
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(table);
+
+ // verify that a low concurrency factor is not capped by the max concurrency factor
+ PartitionRangeReadCommand command = command(cfs, 50, 50);
+ try (RangeCommandIterator partitions = RangeCommands.rangeCommandIterator(command, ONE, System.nanoTime());
+ ReplicaPlanIterator ranges = new ReplicaPlanIterator(command.dataRange().keyRange(), keyspace, ONE))
+ {
+ assertEquals(2, partitions.concurrencyFactor());
+ assertEquals(MAX_CONCURRENCY_FACTOR, partitions.maxConcurrencyFactor());
+ assertEquals(5, ranges.size());
+ }
+
+ // verify that a high concurrency factor is capped by the max concurrency factor
+ command = command(cfs, 1000, 50);
+ try (RangeCommandIterator partitions = RangeCommands.rangeCommandIterator(command, ONE, System.nanoTime());
+ ReplicaPlanIterator ranges = new ReplicaPlanIterator(command.dataRange().keyRange(), keyspace, ONE))
+ {
+ assertEquals(MAX_CONCURRENCY_FACTOR, partitions.concurrencyFactor());
+ assertEquals(MAX_CONCURRENCY_FACTOR, partitions.maxConcurrencyFactor());
+ assertEquals(5, ranges.size());
+ }
+
+ // with 0 estimated results per range the concurrency factor should be 1
+ command = command(cfs, 1000, 0);
+ try (RangeCommandIterator partitions = RangeCommands.rangeCommandIterator(command, ONE, System.nanoTime());
+ ReplicaPlanIterator ranges = new ReplicaPlanIterator(command.dataRange().keyRange(), keyspace, ONE))
+ {
+ assertEquals(1, partitions.concurrencyFactor());
+ assertEquals(MAX_CONCURRENCY_FACTOR, partitions.maxConcurrencyFactor());
+ assertEquals(5, ranges.size());
+ }
+ }
+
+ @Test
+ public void testEstimateResultsPerRange()
+ {
+ testEstimateResultsPerRange(1);
+ testEstimateResultsPerRange(2);
+ }
+
+ /**
+ * Checks {@link RangeCommands#estimateResultsPerRange} for a keyspace with the given replication factor, using
+ * both command-level estimates and (optionally) secondary index estimates, with 1 and 5 tokens per node.
+ *
+ * The global {@code num_tokens} setting is restored afterwards so that other tests in this class don't depend on
+ * the order in which JUnit runs the test methods.
+ */
+ private void testEstimateResultsPerRange(int rf)
+ {
+ String ks = createKeyspace(String.format("CREATE KEYSPACE %%s WITH replication={'class':'SimpleStrategy', 'replication_factor':%s}", rf));
+ String table = createTable(ks, "CREATE TABLE %s (k int PRIMARY KEY, v int)");
+ createIndex(String.format("CREATE CUSTOM INDEX ON %s.%s(v) USING '%s'", ks, table, MockedIndex.class.getName()));
+ Keyspace keyspace = Keyspace.open(ks);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(table);
+
+ int originalNumTokens = DatabaseDescriptor.getRawConfig().num_tokens;
+ try
+ {
+ setNumTokens(1);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 0, null, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 1, null, 1);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 10, null, 10);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 100, null, 100);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 0, 0, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 1, 0, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 10, 0, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 100, 0, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 0, 1000, 1000);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 1, 1000, 1000);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 10, 1000, 1000);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 100, 1000, 1000);
+
+ setNumTokens(5);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 0, null, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 1, null, 0.2f);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 10, null, 2);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 100, null, 20);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 0, 0, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 1, 0, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 10, 0, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 100, 0, 0);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 0, 1000, 200);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 1, 1000, 200);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 10, 1000, 200);
+ testEstimateResultsPerRange(keyspace, cfs, rf, 100, 1000, 200);
+ }
+ finally
+ {
+ // num_tokens is global state: leaking it would change the results of the concurrency factor test
+ setNumTokens(originalNumTokens);
+ }
+ }
+
+ /**
+ * Asserts that the estimated results per range for a command with the given command-level estimate and optional
+ * index estimate equals {@code expectedEstimate / rf}.
+ *
+ * @param indexEstimate the rows reported by {@link MockedIndex}, or {@code null} to build a command without index filter
+ */
+ private static void testEstimateResultsPerRange(Keyspace keyspace,
+ ColumnFamilyStore cfs,
+ int rf,
+ int commandEstimate,
+ Integer indexEstimate,
+ float expectedEstimate)
+ {
+ PartitionRangeReadCommand command = command(cfs, Integer.MAX_VALUE, commandEstimate, indexEstimate);
+ assertEquals(expectedEstimate / rf, RangeCommands.estimateResultsPerRange(command, keyspace), 0);
+ }
+
+ private static PartitionRangeReadCommand command(ColumnFamilyStore cfs, int limit, int commandEstimate)
+ {
+ return command(cfs, limit, commandEstimate, null);
+ }
+
+ /**
+ * Builds a partition range read command over {@code cfs} with a CQL limit of {@code limit}, whose limits report
+ * {@code commandEstimate} as the estimated total results. If {@code indexEstimate} is not null, the command filters
+ * on {@code v = 0} and {@link MockedIndex} is set to report {@code indexEstimate} estimated rows.
+ */
+ private static PartitionRangeReadCommand command(ColumnFamilyStore cfs, int limit, int commandEstimate, Integer indexEstimate)
+ {
+ AbstractReadCommandBuilder.PartitionRangeBuilder commandBuilder = Util.cmd(cfs);
+ if (indexEstimate != null)
+ {
+ commandBuilder.filterOn("v", Operator.EQ, 0);
+ MockedIndex.estimatedResultRows = indexEstimate;
+ }
+ PartitionRangeReadCommand command = (PartitionRangeReadCommand) commandBuilder.build();
+ return command.withUpdatedLimit(new MockedDataLimits(DataLimits.cqlLimits(limit), commandEstimate));
+ }
+
+ /**
+ * Sets the global {@code num_tokens} raw config value.
+ */
+ private static void setNumTokens(int numTokens)
+ {
+ DatabaseDescriptor.getRawConfig().num_tokens = numTokens;
+ }
+
+ /**
+ * {@link DataLimits} that delegates everything to a wrapped instance except
+ * {@link #estimateTotalResults(ColumnFamilyStore)}, which returns a fixed value.
+ */
+ private static class MockedDataLimits extends DataLimits
+ {
+ private final DataLimits wrapped;
+ private final int estimateTotalResults;
+
+ public MockedDataLimits(DataLimits wrapped, int estimateTotalResults)
+ {
+ this.wrapped = wrapped;
+ this.estimateTotalResults = estimateTotalResults;
+ }
+
+ @Override
+ public float estimateTotalResults(ColumnFamilyStore cfs)
+ {
+ return estimateTotalResults;
+ }
+
+ @Override
+ public Kind kind()
+ {
+ return wrapped.kind();
+ }
+
+ @Override
+ public boolean isUnlimited()
+ {
+ return wrapped.isUnlimited();
+ }
+
+ @Override
+ public boolean isDistinct()
+ {
+ return wrapped.isDistinct();
+ }
+
+ @Override
+ public DataLimits forPaging(int pageSize)
+ {
+ return wrapped.forPaging(pageSize);
+ }
+
+ @Override
+ public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
+ {
+ return wrapped.forPaging(pageSize, lastReturnedKey, lastReturnedKeyRemaining);
+ }
+
+ @Override
+ public DataLimits forShortReadRetry(int toFetch)
+ {
+ return wrapped.forShortReadRetry(toFetch);
+ }
+
+ @Override
+ public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
+ {
+ return wrapped.hasEnoughLiveData(cached, nowInSec, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
+ }
+
+ @Override
+ public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
+ {
+ return wrapped.newCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
+ }
+
+ @Override
+ public int count()
+ {
+ return wrapped.count();
+ }
+
+ @Override
+ public int perPartitionCount()
+ {
+ return wrapped.perPartitionCount();
+ }
+
+ @Override
+ public DataLimits withoutState()
+ {
+ return wrapped.withoutState();
+ }
+ }
+
+ /**
+ * Index stub whose estimated result rows are controlled by the static {@code estimatedResultRows} field.
+ */
+ public static final class MockedIndex extends StubIndex
+ {
+ private static long estimatedResultRows = 0;
+
+ public MockedIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+ {
+ super(baseCfs, metadata);
+ }
+
+ @Override
+ public long getEstimatedResultRows()
+ {
+ return estimatedResultRows;
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanIteratorTest.java b/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanIteratorTest.java
new file mode 100644
index 0000000..745ad4e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanIteratorTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.SchemaConstants;
+
+import static org.apache.cassandra.Util.rp;
+import static org.apache.cassandra.Util.token;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ReplicaPlanIterator}, verifying how query ranges are split at the ring tokens.
+ */
+public class ReplicaPlanIteratorTest
+{
+ private static final String KEYSPACE = "ReplicaPlanIteratorTest";
+ private static Keyspace keyspace;
+
+ @BeforeClass
+ public static void beforeClass() throws Throwable
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
+ keyspace = Keyspace.open(KEYSPACE);
+
+ // two nodes, owning the tokens of keys "1" and "6" respectively
+ new TokenUpdater().withKeys(InetAddressAndPort.getByName("127.0.0.1"), "1")
+ .withKeys(InetAddressAndPort.getByName("127.0.0.6"), "6")
+ .update();
+ }
+
+ @Test
+ public void testRanges()
+ {
+ // no splits
+ testRanges(range(rp("2"), rp("5")), range(rp("2"), rp("5")));
+ testRanges(bounds(rp("2"), rp("5")), bounds(rp("2"), rp("5")));
+ testRanges(exBounds(rp("2"), rp("5")), exBounds(rp("2"), rp("5")));
+ // single split
+ testRanges(range(rp("2"), rp("7")), range(rp("2"), endOf("6")), range(endOf("6"), rp("7")));
+ testRanges(bounds(rp("2"), rp("7")), bounds(rp("2"), endOf("6")), range(endOf("6"), rp("7")));
+ testRanges(exBounds(rp("2"), rp("7")), range(rp("2"), endOf("6")), exBounds(endOf("6"), rp("7")));
+ testRanges(incExBounds(rp("2"), rp("7")), bounds(rp("2"), endOf("6")), exBounds(endOf("6"), rp("7")));
+ // single split starting from min
+ testRanges(range(rp(""), rp("2")), range(rp(""), endOf("1")), range(endOf("1"), rp("2")));
+ testRanges(bounds(rp(""), rp("2")), bounds(rp(""), endOf("1")), range(endOf("1"), rp("2")));
+ testRanges(exBounds(rp(""), rp("2")), range(rp(""), endOf("1")), exBounds(endOf("1"), rp("2")));
+ testRanges(incExBounds(rp(""), rp("2")), bounds(rp(""), endOf("1")), exBounds(endOf("1"), rp("2")));
+ // single split ending with max
+ testRanges(range(rp("5"), rp("")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")));
+ testRanges(bounds(rp("5"), rp("")), bounds(rp("5"), endOf("6")), range(endOf("6"), rp("")));
+ testRanges(exBounds(rp("5"), rp("")), range(rp("5"), endOf("6")), exBounds(endOf("6"), rp("")));
+ testRanges(incExBounds(rp("5"), rp("")), bounds(rp("5"), endOf("6")), exBounds(endOf("6"), rp("")));
+ // two splits
+ testRanges(range(rp("0"), rp("7")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("7")));
+ testRanges(bounds(rp("0"), rp("7")), bounds(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("7")));
+ testRanges(exBounds(rp("0"), rp("7")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("7")));
+ testRanges(incExBounds(rp("0"), rp("7")), bounds(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("7")));
+ }
+
+ @Test
+ public void testExact()
+ {
+ // min
+ testRanges(range(endOf("1"), endOf("5")), range(endOf("1"), endOf("5")));
+ testRanges(range(rp("1"), endOf("5")), range(rp("1"), endOf("1")), range(endOf("1"), endOf("5")));
+ testRanges(bounds(startOf("1"), endOf("5")), bounds(startOf("1"), endOf("1")), range(endOf("1"), endOf("5")));
+ testRanges(exBounds(endOf("1"), rp("5")), exBounds(endOf("1"), rp("5")));
+ testRanges(exBounds(rp("1"), rp("5")), range(rp("1"), endOf("1")), exBounds(endOf("1"), rp("5")));
+ testRanges(exBounds(startOf("1"), endOf("5")), range(startOf("1"), endOf("1")), exBounds(endOf("1"), endOf("5")));
+ testRanges(incExBounds(rp("1"), rp("5")), bounds(rp("1"), endOf("1")), exBounds(endOf("1"), rp("5")));
+ // max
+ testRanges(range(endOf("2"), endOf("6")), range(endOf("2"), endOf("6")));
+ testRanges(bounds(startOf("2"), endOf("6")), bounds(startOf("2"), endOf("6")));
+ testRanges(exBounds(rp("2"), rp("6")), exBounds(rp("2"), rp("6")));
+ testRanges(incExBounds(rp("2"), rp("6")), incExBounds(rp("2"), rp("6")));
+ // bothKeys
+ testRanges(range(rp("1"), rp("6")), range(rp("1"), endOf("1")), range(endOf("1"), rp("6")));
+ testRanges(bounds(rp("1"), rp("6")), bounds(rp("1"), endOf("1")), range(endOf("1"), rp("6")));
+ testRanges(exBounds(rp("1"), rp("6")), range(rp("1"), endOf("1")), exBounds(endOf("1"), rp("6")));
+ testRanges(incExBounds(rp("1"), rp("6")), bounds(rp("1"), endOf("1")), exBounds(endOf("1"), rp("6")));
+ }
+
+ @Test
+ public void testWrapped()
+ {
+ // one token in wrapped range
+ testRanges(range(rp("7"), rp("0")), range(rp("7"), rp("")), range(rp(""), rp("0")));
+ // two tokens in wrapped range
+ testRanges(range(rp("5"), rp("0")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("0")));
+ testRanges(range(rp("7"), rp("2")), range(rp("7"), rp("")), range(rp(""), endOf("1")), range(endOf("1"), rp("2")));
+ // full wraps
+ testRanges(range(rp("0"), rp("0")), range(rp("0"), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("0")));
+ testRanges(range(rp(""), rp("")), range(rp(""), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")));
+ // wrap on member tokens
+ testRanges(range(rp("6"), rp("6")), range(rp("6"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), endOf("1")), range(endOf("1"), rp("6")));
+ testRanges(range(rp("6"), rp("1")), range(rp("6"), endOf("6")), range(endOf("6"), rp("")), range(rp(""), rp("1")));
+ // end wrapped
+ testRanges(range(rp("5"), rp("")), range(rp("5"), endOf("6")), range(endOf("6"), rp("")));
+ }
+
+ @Test
+ public void testExactBounds()
+ {
+ // equal tokens are special cased as non-wrapping for bounds
+ testRanges(bounds(rp("0"), rp("0")), bounds(rp("0"), rp("0")));
+ // completely empty bounds match everything
+ testRanges(bounds(rp(""), rp("")), bounds(rp(""), endOf("1")), range(endOf("1"), endOf("6")), range(endOf("6"), rp("")));
+ testRanges(exBounds(rp(""), rp("")), range(rp(""), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("")));
+ testRanges(incExBounds(rp(""), rp("")), bounds(rp(""), endOf("1")), range(endOf("1"), endOf("6")), exBounds(endOf("6"), rp("")));
+ }
+
+ @Test
+ public void testLocalReplicationStrategy()
+ {
+ Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+
+ // ranges that would produce multiple splits with a non-local strategy, but not with the local strategy
+ testRanges(systemKeyspace, range(rp("0"), rp("7")), range(rp("0"), rp("7")));
+ testRanges(systemKeyspace, bounds(rp("0"), rp("7")), bounds(rp("0"), rp("7")));
+ testRanges(systemKeyspace, exBounds(rp("0"), rp("7")), exBounds(rp("0"), rp("7")));
+ testRanges(systemKeyspace, incExBounds(rp("0"), rp("7")), incExBounds(rp("0"), rp("7")));
+
+ // wrapping ranges that should be unwrapped but not split any further
+ testRanges(systemKeyspace, range(rp("7"), rp("0")), range(rp("7"), rp("")), range(rp(""), rp("0")));
+ testRanges(systemKeyspace, range(rp("7"), rp("2")), range(rp("7"), rp("")), range(rp(""), rp("2")));
+ }
+
+ /**
+ * Verifies that iterating {@code queryRange} over the test keyspace produces exactly the {@code expected} ranges.
+ */
+ @SafeVarargs
+ private final void testRanges(AbstractBounds<PartitionPosition> queryRange, AbstractBounds<PartitionPosition>... expected)
+ {
+ testRanges(keyspace, queryRange, expected);
+ }
+
+ /**
+ * Verifies that iterating {@code queryRange} over the given keyspace produces exactly the {@code expected} ranges,
+ * in order, and that the iterator's reported size matches.
+ */
+ @SafeVarargs
+ private final void testRanges(Keyspace keyspace, AbstractBounds<PartitionPosition> queryRange, AbstractBounds<PartitionPosition>... expected)
+ {
+ // CL.ANY, as used by the merger test, avoids errors about unavailable endpoints
+ try (ReplicaPlanIterator iterator = new ReplicaPlanIterator(queryRange, keyspace, ConsistencyLevel.ANY))
+ {
+ List<AbstractBounds<PartitionPosition>> restrictedRanges = new ArrayList<>(expected.length);
+ while (iterator.hasNext())
+ restrictedRanges.add(iterator.next().range());
+
+ // verify range counts
+ assertEquals(expected.length, restrictedRanges.size());
+ assertEquals(expected.length, iterator.size());
+
+ // verify the ranges
+ for (int i = 0; i < expected.length; i++)
+ assertEquals("Mismatch for index " + i + ": " + restrictedRanges, expected[i], restrictedRanges.get(i));
+ }
+ }
+
+ private static Range<PartitionPosition> range(PartitionPosition left, PartitionPosition right)
+ {
+ return new Range<>(left, right);
+ }
+
+ private static Bounds<PartitionPosition> bounds(PartitionPosition left, PartitionPosition right)
+ {
+ return new Bounds<>(left, right);
+ }
+
+ private static ExcludingBounds<PartitionPosition> exBounds(PartitionPosition left, PartitionPosition right)
+ {
+ return new ExcludingBounds<>(left, right);
+ }
+
+ private static IncludingExcludingBounds<PartitionPosition> incExBounds(PartitionPosition left, PartitionPosition right)
+ {
+ return new IncludingExcludingBounds<>(left, right);
+ }
+
+ /** @return the minimum key bound of the token for {@code key} */
+ private static PartitionPosition startOf(String key)
+ {
+ return token(key).minKeyBound();
+ }
+
+ /** @return the maximum key bound of the token for {@code key} */
+ private static PartitionPosition endOf(String key)
+ {
+ return token(key).maxKeyBound();
+ }
+}
diff --git a/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanMergerTest.java b/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanMergerTest.java
new file mode 100644
index 0000000..721551d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/ReplicaPlanMergerTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.Util.testPartitioner;
+import static org.apache.cassandra.db.ConsistencyLevel.ALL;
+import static org.apache.cassandra.db.ConsistencyLevel.ANY;
+import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.LOCAL_ONE;
+import static org.apache.cassandra.db.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.LOCAL_SERIAL;
+import static org.apache.cassandra.db.ConsistencyLevel.ONE;
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
+import static org.apache.cassandra.db.ConsistencyLevel.THREE;
+import static org.apache.cassandra.db.ConsistencyLevel.TWO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for {@link ReplicaPlanMerger}.
+ */
+public class ReplicaPlanMergerTest
+{
+ private static final String KEYSPACE = "ReplicaPlanMergerTest";
+ private static Keyspace keyspace;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(2));
+ keyspace = Keyspace.open(KEYSPACE);
+ }
+
+ /**
+ * Tests range merging with a single node cluster and a read consistency level that allows to merge ranges.
+ */
+ @Test
+ public void testSingleNode()
+ {
+ new TokenUpdater().withTokens(10, 20, 30, 40).update();
+
+ // with CLs requiring a single node all ranges are merged (unless they are wrapping)
+ for (ConsistencyLevel cl : Arrays.asList(ONE, LOCAL_ONE, ANY))
+ {
+ testRanges(cl, range(min(), min()));
+ testRanges(cl, range(min(), max(25)));
+ testRanges(cl, range(min(), max(40)));
+ testRanges(cl, range(min(), max(50)));
+ testRanges(cl, range(max(20), max(30)));
+ testRanges(cl, range(max(25), min()));
+ testRanges(cl, range(max(25), max(35)));
+ testRanges(cl, range(max(50), min()));
+ testRanges(cl, range(max(40), max(10)), range(max(40), min()), range(min(), max(10))); // wrapped is split
+ testRanges(cl, range(max(25), max(15)), range(max(25), min()), range(min(), max(15))); // wrapped is split
+ }
+
+ // with CLs requiring more than a single node ranges are not merged due to the RF=2
+ for (ConsistencyLevel cl : Arrays.asList(ALL, QUORUM, LOCAL_QUORUM, EACH_QUORUM, TWO, THREE, SERIAL, LOCAL_SERIAL))
+ {
+ testRanges(cl,
+ range(min(), min()),
+ range(min(), max(10)),
+ range(max(10), max(20)),
+ range(max(20), max(30)),
+ range(max(30), max(40)),
+ range(max(40), min()));
+ testRanges(cl,
+ range(min(), max(25)),
+ range(min(), max(10)), range(max(10), max(20)), range(max(20), max(25)));
+ testRanges(cl,
+ range(min(), max(40)),
+ range(min(), max(10)), range(max(10), max(20)), range(max(20), max(30)), range(max(30), max(40)));
+ testRanges(cl,
+ range(min(), max(50)),
+ range(min(), max(10)),
+ range(max(10), max(20)),
+ range(max(20), max(30)),
+ range(max(30), max(40)),
+ range(max(40), max(50)));
+ testRanges(cl,
+ range(max(20), max(30)));
+ testRanges(cl,
+ range(max(25), min()),
+ range(max(25), max(30)), range(max(30), max(40)), range(max(40), min()));
+ testRanges(cl,
+ range(max(50), min()));
+ testRanges(cl,
+ range(max(30), max(10)), // wrapped
+ range(max(30), max(40)), range(max(40), min()), range(min(), max(10)));
+ testRanges(cl,
+ range(max(25), max(15)), // wrapped
+ range(max(25), max(30)),
+ range(max(30), max(40)),
+ range(max(40), min()),
+ range(min(), max(10)),
+ range(max(10), max(15)));
+ }
+ }
+
+ /**
+ * Tests range merging with a multinode cluster when the token ranges don't overlap between replicas.
+ */
+ @Test
+ public void testMultiNodeWithContinuousRanges()
+ {
+ new TokenUpdater().withTokens("127.0.0.1", 10, 20, 30)
+ .withTokens("127.0.0.2", 40, 50, 60)
+ .withTokens("127.0.0.3", 70, 80, 90)
+ .update();
+
+ // with CL=ANY the ranges are fully merged (unless they are wrapping)
+ testMultiNodeFullMerge(ANY);
+
+ // with CL=THREE the ranges are not merged at all
+ testMultiNodeNoMerge(THREE);
+
+ // with CLs requiring a single node the ranges are merged in a per-node basis
+ for (ConsistencyLevel cl : Arrays.asList(ONE, LOCAL_ONE))
+ {
+ testRanges(cl,
+ range(min(), min()),
+ range(min(), max(60)), range(max(60), min()));
+ testRanges(cl,
+ range(min(), max(25)));
+ testRanges(cl,
+ range(min(), max(40)));
+ testRanges(cl,
+ range(min(), max(50)));
+ testRanges(cl,
+ range(max(20), max(30)));
+ testRanges(cl,
+ range(max(25), min()),
+ range(max(25), max(60)), range(max(60), min()));
+ testRanges(cl,
+ range(max(25), max(35)),
+ range(max(25), max(35)));
+ testRanges(cl,
+ range(max(50), min()),
+ range(max(50), max(90)), range(max(90), min()));
+ testRanges(cl,
+ range(max(50), max(10)), // wrapping range
+ range(max(50), max(90)), range(max(90), min()), range(min(), max(10)));
+ testRanges(cl,
+ range(max(25), max(15)), // wrapping range
+ range(max(25), max(60)), range(max(60), min()), range(min(), max(15)));
+ }
+
+ // with other CLs the ranges are merged in a similar per-node basis
+ for (ConsistencyLevel cl : Arrays.asList(ALL, QUORUM, LOCAL_QUORUM, EACH_QUORUM, TWO, SERIAL, LOCAL_SERIAL))
+ {
+ testRanges(cl,
+ range(min(), min()),
+ range(min(), max(30)), range(max(30), max(60)), range(max(60), max(90)), range(max(90), min()));
+ testRanges(cl,
+ range(min(), max(25)));
+ testRanges(cl,
+ range(min(), max(40)),
+ range(min(), max(30)), range(max(30), max(40)));
+ testRanges(cl,
+ range(min(), max(50)),
+ range(min(), max(30)), range(max(30), max(50)));
+ testRanges(cl,
+ range(max(20), max(30)));
+ testRanges(cl,
+ range(max(25), min()),
+ range(max(25), max(30)), range(max(30), max(60)), range(max(60), max(90)), range(max(90), min()));
+ testRanges(cl,
+ range(max(25), max(35)),
+ range(max(25), max(30)), range(max(30), max(35)));
+ testRanges(cl,
+ range(max(50), min()),
+ range(max(50), max(60)), range(max(60), max(90)), range(max(90), min()));
+ testRanges(cl,
+ range(max(50), max(10)), // wrapping range
+ range(max(50), max(60)), range(max(60), max(90)), range(max(90), min()), range(min(), max(10)));
+ testRanges(cl,
+ range(max(25), max(15)), // wrapping range
+ range(max(25), max(30)),
+ range(max(30), max(60)),
+ range(max(60), max(90)),
+ range(max(90), min()),
+ range(min(), max(15)));
+ }
+ }
+
+ /**
+ * Tests range merging with a multinode cluster when the token ranges overlap between replicas.
+ */
+ @Test
+ public void testMultiNodeWithDiscontinuousRanges()
+ {
+ new TokenUpdater().withTokens("127.0.0.1", 10, 40, 70)
+ .withTokens("127.0.0.2", 20, 50, 80)
+ .withTokens("127.0.0.3", 30, 60, 90)
+ .update();
+
+ // with CL=ANY the ranges are fully merged (unless they are wrapping)
+ testMultiNodeFullMerge(ANY);
+
+ // with CLs requiring a single node the ranges are merged in a per-node basis
+ for (ConsistencyLevel cl : Arrays.asList(ONE, LOCAL_ONE))
+ {
+ testRanges(cl,
+ range(min(), min()), // full range
+ range(min(), max(20)),
+ range(max(20), max(40)),
+ range(max(40), max(60)),
+ range(max(60), max(80)),
+ range(max(80), min()));
+ testRanges(cl,
+ range(min(), max(25)),
+ range(min(), max(20)), range(max(20), max(25)));
+ testRanges(cl,
+ range(min(), max(40)),
+ range(min(), max(20)), range(max(20), max(40)));
+ testRanges(cl,
+ range(min(), max(50)),
+ range(min(), max(20)), range(max(20), max(40)), range(max(40), max(50)));
+ testRanges(cl,
+ range(max(20), max(30)));
+ testRanges(cl,
+ range(max(25), min()),
+ range(max(25), max(40)), range(max(40), max(60)), range(max(60), max(80)), range(max(80), min()));
+ testRanges(cl,
+ range(max(25), max(35)));
+ testRanges(cl,
+ range(max(50), min()),
+ range(max(50), max(70)), range(max(70), max(90)), range(max(90), min()));
+ testRanges(cl,
+ range(max(50), max(10)), // wrapping range
+ range(max(50), max(70)), range(max(70), max(90)), range(max(90), min()), range(min(), max(10)));
+ testRanges(cl,
+ range(max(25), max(15)), // wrapping range
+ range(max(25), max(40)),
+ range(max(40), max(60)),
+ range(max(60), max(80)),
+ range(max(80), min()),
+ range(min(), max(15)));
+ }
+
+ // with other CLs the ranges are not merged at all
+ for (ConsistencyLevel cl : Arrays.asList(ALL, QUORUM, LOCAL_QUORUM, EACH_QUORUM, TWO, THREE, SERIAL, LOCAL_SERIAL))
+ {
+ testMultiNodeNoMerge(cl);
+ }
+ }
+
+ private void testMultiNodeFullMerge(ConsistencyLevel cl)
+ {
+ testRanges(cl, range(min(), min()));
+ testRanges(cl, range(min(), max(25)));
+ testRanges(cl, range(min(), max(40)));
+ testRanges(cl, range(min(), max(50)));
+ testRanges(cl, range(max(20), max(30)));
+ testRanges(cl, range(max(25), min()));
+ testRanges(cl, range(max(25), max(35)));
+ testRanges(cl, range(max(50), min()));
+ testRanges(cl, range(max(50), max(10)), range(max(50), min()), range(min(), max(10))); // wrapping range
+ testRanges(cl, range(max(25), max(15)), range(max(25), min()), range(min(), max(15))); // wrapping range
+ }
+
+ private void testMultiNodeNoMerge(ConsistencyLevel cl)
+ {
+ testRanges(cl,
+ range(min(), min()),
+ range(min(), max(10)),
+ range(max(10), max(20)),
+ range(max(20), max(30)),
+ range(max(30), max(40)),
+ range(max(40), max(50)),
+ range(max(50), max(60)),
+ range(max(60), max(70)),
+ range(max(70), max(80)),
+ range(max(80), max(90)),
+ range(max(90), min()));
+ testRanges(cl,
+ range(min(), max(25)),
+ range(min(), max(10)), range(max(10), max(20)), range(max(20), max(25)));
+ testRanges(cl,
+ range(min(), max(40)),
+ range(min(), max(10)), range(max(10), max(20)), range(max(20), max(30)), range(max(30), max(40)));
+ testRanges(cl,
+ range(min(), max(50)),
+ range(min(), max(10)),
+ range(max(10), max(20)),
+ range(max(20), max(30)),
+ range(max(30), max(40)),
+ range(max(40), max(50)));
+ testRanges(cl,
+ range(max(20), max(30)));
+ testRanges(cl,
+ range(max(25), min()),
+ range(max(25), max(30)),
+ range(max(30), max(40)),
+ range(max(40), max(50)),
+ range(max(50), max(60)),
+ range(max(60), max(70)),
+ range(max(70), max(80)),
+ range(max(80), max(90)),
+ range(max(90), min()));
+ testRanges(cl,
+ range(max(25), max(35)),
+ range(max(25), max(30)), range(max(30), max(35)));
+ testRanges(cl,
+ range(max(50), min()),
+ range(max(50), max(60)),
+ range(max(60), max(70)),
+ range(max(70), max(80)),
+ range(max(80), max(90)),
+ range(max(90), min()));
+ testRanges(cl,
+ range(max(50), max(10)), // wrapping range
+ range(max(50), max(60)),
+ range(max(60), max(70)),
+ range(max(70), max(80)),
+ range(max(80), max(90)),
+ range(max(90), min()),
+ range(min(), max(10)));
+ testRanges(cl,
+ range(max(25), max(15)), // wrapping range
+ range(max(25), max(30)),
+ range(max(30), max(40)),
+ range(max(40), max(50)),
+ range(max(50), max(60)),
+ range(max(60), max(70)),
+ range(max(70), max(80)),
+ range(max(80), max(90)),
+ range(max(90), min()),
+ range(min(), max(10)),
+ range(max(10), max(15)));
+ }
+
+ private static PartitionPosition min()
+ {
+ return testPartitioner().getMinimumToken().minKeyBound();
+ }
+
+ private static PartitionPosition max(int key)
+ {
+ return new Murmur3Partitioner.LongToken(key).maxKeyBound();
+ }
+
+ private static Range<PartitionPosition> range(PartitionPosition left, PartitionPosition right)
+ {
+ return new Range<>(left, right);
+ }
+
+ private void testRanges(ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> queryRange)
+ {
+ testRanges(consistencyLevel, queryRange, queryRange);
+ }
+
+ @SafeVarargs
+ private final void testRanges(ConsistencyLevel consistencyLevel,
+ AbstractBounds<PartitionPosition> queryRange,
+ AbstractBounds<PartitionPosition>... expected)
+ {
+ try (ReplicaPlanIterator originals = new ReplicaPlanIterator(queryRange, keyspace, ANY); // ANY avoids endpoint erros
+ ReplicaPlanMerger merger = new ReplicaPlanMerger(originals, keyspace, consistencyLevel))
+ {
+ // collect the merged ranges
+ List<AbstractBounds<PartitionPosition>> mergedRanges = new ArrayList<>(expected.length);
+ while (merger.hasNext())
+ mergedRanges.add(merger.next().range());
+
+ assertFalse("The number of merged ranges should never be greater than the number of original ranges",
+ mergedRanges.size() > originals.size());
+
+ // verify the merged ranges
+ assertEquals(expected.length, mergedRanges.size());
+ for (int i = 0; i < expected.length; i++)
+ assertEquals("Mismatch for index " + i + ": " + mergedRanges, expected[i], mergedRanges.get(i));
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/service/reads/range/TokenUpdater.java b/test/unit/org/apache/cassandra/service/reads/range/TokenUpdater.java
new file mode 100644
index 0000000..7419fbb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/range/TokenUpdater.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Test utility class to set the partitioning tokens in the cluster.
+ *
+ * The per-endpoint tokens to be set can be specified with the {@code withTokens} and {@code withKeys} methods.
+ * The {@link #update()} method will apply the changes, cleaning the previous token metadata info. It will also
+ * initialize Gossip in all the endpoints but the local node.
+ */
+public class TokenUpdater
+{
+ private final Multimap<InetAddressAndPort, Token> endpointTokens = HashMultimap.create();
+
+ public TokenUpdater withTokens(long... values)
+ {
+ return withTokens(localEndpoint(), values);
+ }
+
+ public TokenUpdater withTokens(String endpoint, long... values)
+ {
+ return withTokens(endpointByName(endpoint), values);
+ }
+
+ public TokenUpdater withTokens(InetAddressAndPort endpoint, long... values)
+ {
+ for (long val : values)
+ endpointTokens.put(endpoint, new Murmur3Partitioner.LongToken(val));
+ return this;
+ }
+
+ public TokenUpdater withKeys(int... keys)
+ {
+ return withKeys(localEndpoint(), keys);
+ }
+
+ public TokenUpdater withKeys(String endpoint, int... values)
+ {
+ return withKeys(endpointByName(endpoint), values);
+ }
+
+ public TokenUpdater withKeys(InetAddressAndPort endpoint, int... keys)
+ {
+ for (int key : keys)
+ endpointTokens.put(endpoint, Util.token(key));
+ return this;
+ }
+
+ public TokenUpdater withKeys(String... keys)
+ {
+ return withKeys(localEndpoint(), keys);
+ }
+
+ public TokenUpdater withKeys(InetAddressAndPort endpoint, String... keys)
+ {
+ for (String key : keys)
+ endpointTokens.put(endpoint, Util.token(key));
+ return this;
+ }
+
+ public TokenUpdater update()
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ tmd.clearUnsafe();
+ tmd.updateNormalTokens(endpointTokens);
+ endpointTokens.keySet()
+ .stream()
+ .filter(e -> !e.equals(localEndpoint()))
+ .forEach(e -> Gossiper.instance.initializeNodeUnsafe(e, UUID.randomUUID(), 1));
+ return this;
+ }
+
+ public List<Token> getTokens()
+ {
+ return getTokens(localEndpoint());
+ }
+
+ public List<Token> getTokens(InetAddressAndPort endpoint)
+ {
+ return ImmutableList.copyOf(endpointTokens.get(endpoint));
+ }
+
+ private static InetAddressAndPort localEndpoint()
+ {
+ return FBUtilities.getBroadcastAddressAndPort();
+ }
+
+ private static InetAddressAndPort endpointByName(String name)
+ {
+ try
+ {
+ return InetAddressAndPort.getByName(name);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org