You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/06/22 15:23:17 UTC
[cassandra] branch cassandra-3.0 updated: Count vnode ranges
towards concurrency factor instead merged ranges and cap max concurrency
factor by core * 10
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new abdf508 Count vnode ranges towards concurrency factor instead merged ranges and cap max concurrency factor by core * 10
abdf508 is described below
commit abdf5085d4381351054bc2c0976bc826f4ac82e2
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Mon Jun 22 15:34:22 2020 +0100
Count vnode ranges towards concurrency factor instead merged ranges and cap max concurrency factor by core * 10
patch by Zhao Yang; reviewed by Andres de la Peña, Caleb Rackliffe for CASSANDRA-15752
---
CHANGES.txt | 1 +
.../org/apache/cassandra/service/StorageProxy.java | 115 +++++++++++++----
.../cassandra/db/PartitionRangeReadTest.java | 143 +++++++++++++++++++++
3 files changed, 235 insertions(+), 24 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d1b1416..dc50ff5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.21
+ * Fixed range read concurrency factor computation and capped it at 10 times the number of available processors (CASSANDRA-15752)
* Catch exception on bootstrap resume and init native transport (CASSANDRA-15863)
* Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
* Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 19cd901..c7888c4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Predicate;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.*;
import com.google.common.primitives.Ints;
@@ -99,6 +100,15 @@ public class StorageProxy implements StorageProxyMBean
private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
+ /**
+ * Introduce a maximum number of sub-ranges that the coordinator can request in parallel for range queries. Previously
+ * we would request up to the maximum number of ranges but this causes problems if the number of vnodes is large.
+ * By default we pick 10 requests per core, assuming all replicas have the same number of cores. The idea is that we
+ * don't want a burst of range requests that will back up, hurting all other queries. At the same time,
+ * we want to give range queries a chance to run if resources are available.
+ */
+ private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests", FBUtilities.getAvailableProcessors() * 10));
+
private StorageProxy()
{
}
@@ -1838,21 +1848,33 @@ public class StorageProxy implements StorageProxyMBean
return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
}
- private static class RangeForQuery
+ @VisibleForTesting
+ public static class RangeForQuery
{
public final AbstractBounds<PartitionPosition> range;
public final List<InetAddress> liveEndpoints;
public final List<InetAddress> filteredEndpoints;
+ public final int vnodeCount;
- public RangeForQuery(AbstractBounds<PartitionPosition> range, List<InetAddress> liveEndpoints, List<InetAddress> filteredEndpoints)
+ public RangeForQuery(AbstractBounds<PartitionPosition> range,
+ List<InetAddress> liveEndpoints,
+ List<InetAddress> filteredEndpoints,
+ int vnodeCount)
{
this.range = range;
this.liveEndpoints = liveEndpoints;
this.filteredEndpoints = filteredEndpoints;
+ this.vnodeCount = vnodeCount;
+ }
+
+ public int vnodeCount()
+ {
+ return vnodeCount;
}
}
- private static class RangeIterator extends AbstractIterator<RangeForQuery>
+ @VisibleForTesting
+ public static class RangeIterator extends AbstractIterator<RangeForQuery>
{
private final Keyspace keyspace;
private final ConsistencyLevel consistency;
@@ -1885,17 +1907,19 @@ public class StorageProxy implements StorageProxyMBean
List<InetAddress> liveEndpoints = getLiveSortedEndpoints(keyspace, range.right);
return new RangeForQuery(range,
liveEndpoints,
- consistency.filterForQuery(keyspace, liveEndpoints));
+ consistency.filterForQuery(keyspace, liveEndpoints),
+ 1);
}
}
- private static class RangeMerger extends AbstractIterator<RangeForQuery>
+ @VisibleForTesting
+ public static class RangeMerger extends AbstractIterator<RangeForQuery>
{
private final Keyspace keyspace;
private final ConsistencyLevel consistency;
private final PeekingIterator<RangeForQuery> ranges;
- private RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency)
+ public RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency)
{
this.keyspace = keyspace;
this.consistency = consistency;
@@ -1937,7 +1961,8 @@ public class StorageProxy implements StorageProxyMBean
break;
// If we get there, merge this range and the next one
- current = new RangeForQuery(current.range.withNewRight(next.range.right), merged, filteredMerged);
+ int vnodeCount = current.vnodeCount + next.vnodeCount;
+ current = new RangeForQuery(current.range.withNewRight(next.range.right), merged, filteredMerged, vnodeCount);
ranges.next(); // consume the range we just merged since we've only peeked so far
}
return current;
@@ -1982,7 +2007,7 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+ public static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
{
private final Iterator<RangeForQuery> ranges;
private final int totalRangeCount;
@@ -1995,19 +2020,28 @@ public class StorageProxy implements StorageProxyMBean
private DataLimits.Counter counter;
private PartitionIterator sentQueryIterator;
+ private final int maxConcurrencyFactor;
private int concurrencyFactor;
// The two following "metric" are maintained to improve the concurrencyFactor
// when it was not good enough initially.
private int liveReturned;
private int rangesQueried;
+ private int batchesRequested = 0;
- public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+ public RangeCommandIterator(Iterator<RangeForQuery> ranges,
+ PartitionRangeReadCommand command,
+ int concurrencyFactor,
+ int maxConcurrencyFactor,
+ int totalRangeCount,
+ Keyspace keyspace,
+ ConsistencyLevel consistency)
{
this.command = command;
this.concurrencyFactor = concurrencyFactor;
+ this.maxConcurrencyFactor = maxConcurrencyFactor;
this.startTime = System.nanoTime();
- this.ranges = new RangeMerger(ranges, keyspace, consistency);
- this.totalRangeCount = ranges.rangeCount();
+ this.ranges = ranges;
+ this.totalRangeCount = totalRangeCount;
this.consistency = consistency;
this.keyspace = keyspace;
this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
@@ -2026,7 +2060,6 @@ public class StorageProxy implements StorageProxyMBean
// else, sends the next batch of concurrent queries (after having close the previous iterator)
if (sentQueryIterator != null)
{
- liveReturned += counter.counted();
sentQueryIterator.close();
// It's not the first batch of queries and we're not done, so we we can use what has been
@@ -2057,20 +2090,31 @@ public class StorageProxy implements StorageProxyMBean
private void updateConcurrencyFactor()
{
+ liveReturned += counter.counted();
+
+ concurrencyFactor = computeConcurrencyFactor(totalRangeCount, rangesQueried, maxConcurrencyFactor, command.limits().count(), liveReturned);
+ }
+
+ @VisibleForTesting
+ public static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int maxConcurrencyFactor, int limit, int liveReturned)
+ {
+ maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
if (liveReturned == 0)
{
- // we haven't actually gotten any results, so query all remaining ranges at once
- concurrencyFactor = totalRangeCount - rangesQueried;
- return;
+ // we haven't actually gotten any results, so query all remaining ranges, capped at the max concurrency factor
+ Tracing.trace("Didn't get any response rows; new concurrent requests: {}", maxConcurrencyFactor);
+ return maxConcurrencyFactor;
}
// Otherwise, compute how many rows per range we got on average and pick a concurrency factor
// that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
- int remainingRows = command.limits().count() - liveReturned;
+ int remainingRows = limit - liveReturned;
float rowsPerRange = (float)liveReturned / (float)rangesQueried;
- concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
+ int concurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
- rowsPerRange, (int) remainingRows, concurrencyFactor);
+ rowsPerRange, remainingRows, concurrencyFactor);
+
+ return concurrencyFactor;
}
private SingleRangeResponse query(RangeForQuery toQuery)
@@ -2106,11 +2150,14 @@ public class StorageProxy implements StorageProxyMBean
private PartitionIterator sendNextRequests()
{
List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
- for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+ for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
{
- concurrentQueries.add(query(ranges.next()));
- ++rangesQueried;
+ RangeForQuery range = ranges.next();
+ concurrentQueries.add(query(range));
+ rangesQueried += range.vnodeCount();
+ i += range.vnodeCount();
}
+ batchesRequested++;
Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
// We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
@@ -2133,6 +2180,18 @@ public class StorageProxy implements StorageProxyMBean
Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
}
}
+
+ @VisibleForTesting
+ public int rangesQueried()
+ {
+ return rangesQueried;
+ }
+
+ @VisibleForTesting
+ public int batchesRequested()
+ {
+ return batchesRequested;
+ }
}
@SuppressWarnings("resource")
@@ -2148,16 +2207,24 @@ public class StorageProxy implements StorageProxyMBean
// underestimate how many rows we will get per-range in order to increase the likelihood that we'll
// fetch enough rows in the first round
resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+ int maxConcurrencyFactor = Math.min(ranges.rangeCount(), MAX_CONCURRENT_RANGE_REQUESTS);
int concurrencyFactor = resultsPerRange == 0.0
? 1
- : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
+ : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
// Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
-
- return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)),
+ RangeMerger mergedRanges = new RangeMerger(ranges, keyspace, consistencyLevel);
+ RangeCommandIterator rangeCommandIterator = new RangeCommandIterator(mergedRanges,
+ command,
+ concurrencyFactor,
+ maxConcurrencyFactor,
+ ranges.rangeCount(),
+ keyspace,
+ consistencyLevel);
+ return command.limits().filter(command.postReconciliationProcessing(rangeCommandIterator),
command.nowInSec(),
command.selectsFullPartition(),
command.metadata().enforceStrictLiveness());
diff --git a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
index 1368705..b567f72 100644
--- a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
@@ -19,26 +19,41 @@
package org.apache.cassandra.db;
import java.math.BigInteger;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import com.google.common.collect.Iterators;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.cassandra.db.ConsistencyLevel.ONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.cassandra.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
public class PartitionRangeReadTest
{
@@ -48,6 +63,8 @@ public class PartitionRangeReadTest
public static final String CF_STANDARDINT = "StandardInteger1";
public static final String CF_COMPACT1 = "Compact1";
+ private static final List<InetAddress> LOCAL = Collections.singletonList(FBUtilities.getBroadcastAddress());
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
@@ -388,5 +405,131 @@ public class PartitionRangeReadTest
// assertColumnNames(row1, "c1", "c2");
// assertColumnNames(row2, "c1");
// }
+
+ @Test
+ public void testComputeConcurrencyFactor()
+ {
+ int maxConcurrentRangeRequest = 32;
+
+ // no live row returned, fetch all remaining ranges but hit the max instead
+ int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 500, 0);
+ assertEquals(maxConcurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+ // no live row returned, fetch all remaining ranges
+ cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConcurrentRangeRequest, 500, 0);
+ assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+ // returned half rows, fetch rangesQueried again but hit the max instead
+ cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConcurrentRangeRequest, 480, 240);
+ assertEquals(maxConcurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+ // returned half rows, fetch rangesQueried again
+ cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 480, 240);
+ assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+ // returned most of rows, 1 more range to fetch
+ cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConcurrentRangeRequest, 480, 479);
+ assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+ }
+
+ @Test
+ public void testRangeCountWithRangeMerge()
+ {
+ List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
+ int vnodeCount = 0;
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ List<StorageProxy.RangeForQuery> ranges = new ArrayList<>();
+ for (int i = 0; i + 1 < tokens.size(); i++)
+ {
+ Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+ ranges.add(new StorageProxy.RangeForQuery(range, LOCAL, LOCAL, 1));
+ vnodeCount++;
+ }
+
+ StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ONE);
+ StorageProxy.RangeForQuery mergedRange = Iterators.getOnlyElement(merge);
+ // all ranges are merged as test has only one node.
+ assertEquals(vnodeCount, mergedRange.vnodeCount());
+ }
+
+ @Test
+ public void testRangeQueried()
+ {
+ List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
+ int vnodeCount = tokens.size() + 1; // n tokens divide token ring into n+1 ranges
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ cfs.clearUnsafe();
+
+ int rows = 100;
+ for (int i = 0; i < rows; ++i)
+ {
+ RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, 10, String.valueOf(i));
+ builder.clustering("c");
+ builder.add("val", String.valueOf(i));
+ builder.build().applyUnsafe();
+ }
+ cfs.forceBlockingFlush();
+
+ PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+
+ // without range merger, there will be 2 batches requested: 1st batch with 1 range and 2nd batch with remaining ranges
+ Iterator<StorageProxy.RangeForQuery> ranges = rangeIterator(command, keyspace, false);
+ StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, keyspace, ONE);
+ verifyRangeCommandIterator(data, rows, 2, vnodeCount);
+
+ // without range merger and initial cf=5, there will be 1 batch requested: all 5 vnode ranges in the 1st batch
+ ranges = rangeIterator(command, keyspace, false);
+ data = new StorageProxy.RangeCommandIterator(ranges, command, vnodeCount, 1000, vnodeCount, keyspace, ONE);
+ verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+ // without range merger and max cf=1, there will be 5 batches requested: 1 vnode range per batch
+ ranges = rangeIterator(command, keyspace, false);
+ data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, keyspace, ONE);
+ verifyRangeCommandIterator(data, rows, vnodeCount, vnodeCount);
+
+ // with range merger, there will be only 1 batch requested, as all ranges share the same replica - localhost
+ ranges = rangeIterator(command, keyspace, true);
+ data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, keyspace, ONE);
+ verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+ // with range merger and max cf=1, there will be only 1 batch requested, as all ranges share the same replica - localhost
+ ranges = rangeIterator(command, keyspace, true);
+ data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, keyspace, ONE);
+ verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+ }
+
+ private Iterator<StorageProxy.RangeForQuery> rangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, boolean withRangeMerger)
+ {
+ Iterator<StorageProxy.RangeForQuery> ranges = new StorageProxy.RangeIterator(command, keyspace, ONE);
+ if (withRangeMerger)
+ ranges = new StorageProxy.RangeMerger(ranges, keyspace, ONE);
+
+ return ranges;
+ }
+
+ private void verifyRangeCommandIterator(StorageProxy.RangeCommandIterator data, int rows, int batches, int vnodeCount)
+ {
+ int num = Util.size(data);
+ assertEquals(rows, num);
+ assertEquals(batches, data.batchesRequested());
+ assertEquals(vnodeCount, data.rangesQueried());
+ }
+
+ private List<Token> setTokens(List<Integer> values)
+ {
+ IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+ List<Token> tokens = new ArrayList<>(values.size());
+ for (Integer val : values)
+ tokens.add(partitioner.getToken(ByteBufferUtil.bytes(val)));
+
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ tmd.clearUnsafe();
+ tmd.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+
+ return tokens;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org