Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/06/22 15:23:17 UTC

[cassandra] branch cassandra-3.0 updated: Count vnode ranges towards concurrency factor instead of merged ranges and cap max concurrency factor at cores * 10

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new abdf508  Count vnode ranges towards concurrency factor instead of merged ranges and cap max concurrency factor at cores * 10
abdf508 is described below

commit abdf5085d4381351054bc2c0976bc826f4ac82e2
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Mon Jun 22 15:34:22 2020 +0100

    Count vnode ranges towards concurrency factor instead of merged ranges and cap max concurrency factor at cores * 10
    
    patch by Zhao Yang; reviewed by Andres de la Peña, Caleb Rackliffe for CASSANDRA-15752
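    
    In short: the coordinator now counts the vnode ranges behind each
    (possibly merged) range towards the concurrency factor, and caps that
    factor at min(ranges remaining, cores * 10). As a reading aid, here is
    a minimal standalone sketch of the capped computation; it mirrors
    computeConcurrencyFactor in the patch below, and the worked numbers in
    the trailing comment are illustrative only:
    
        // Size the next batch of concurrent sub-range requests, bounded by both
        // the configured maximum and the number of ranges left to query.
        static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried,
                                            int maxConcurrencyFactor, int limit, int liveReturned)
        {
            maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
            if (liveReturned == 0)
                return maxConcurrencyFactor; // no rows yet: go as wide as the cap allows
    
            // estimate rows per range from what we have seen so far and try to
            // fetch the remaining rows in one more round, within the cap
            int remainingRows = limit - liveReturned;
            float rowsPerRange = (float) liveReturned / (float) rangesQueried;
            return Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
        }
    
        // e.g. 100 ranges total, 30 already queried, cap 32, limit 500, no rows
        // returned yet: next batch = min(32, 100 - 30) = 32 concurrent requests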
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/service/StorageProxy.java | 115 +++++++++++++----
 .../cassandra/db/PartitionRangeReadTest.java       | 143 +++++++++++++++++++++
 3 files changed, 235 insertions(+), 24 deletions(-)
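
For illustration, how the new cap plays into the coordinator's initial estimate (a sketch under assumed numbers; only the margin, the cores * 10 default, and the system property name are taken from the patch below):

    // Hypothetical first round on an 8-core coordinator: 256 vnode ranges,
    // LIMIT 100, and ~2 rows expected per range.
    int cores = 8;                                        // FBUtilities.getAvailableProcessors()
    int maxConcurrencyFactor = Math.min(256, cores * 10); // min(rangeCount, MAX_CONCURRENT_RANGE_REQUESTS) = 80
    double resultsPerRange = 2.0 - 2.0 * 0.10;            // CONCURRENT_SUBREQUESTS_MARGIN = 0.10
    int concurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor,
                                                 (int) Math.ceil(100 / resultsPerRange))); // = 56
    // Previously the only ceiling was the total range count, so a query that
    // returned no rows yet could fan out to all 256 ranges at once; the default
    // cap of cores * 10 can be overridden with -Dcassandra.max_concurrent_range_requests=<n>.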

diff --git a/CHANGES.txt b/CHANGES.txt
index d1b1416..dc50ff5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * Fix range read concurrency factor computation and cap it at 10 times the number of cores (CASSANDRA-15752)
  * Catch exception on bootstrap resume and init native transport (CASSANDRA-15863)
  * Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
  * Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 19cd901..c7888c4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Predicate;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.primitives.Ints;
@@ -99,6 +100,15 @@ public class StorageProxy implements StorageProxyMBean
 
     private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
 
+    /**
+     * Introduce a maximum number of sub-ranges that the coordinator can request in parallel for range queries. Previously,
+     * we would request up to the maximum number of ranges, but this caused problems when the number of vnodes was large.
+     * By default we pick 10 requests per core, assuming all replicas have the same number of cores. The idea is that we
+     * don't want a burst of range requests that will back up, hurting all other queries. At the same time,
+     * we want to give range queries a chance to run if resources are available.
+     */
+    private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests", FBUtilities.getAvailableProcessors() * 10));
+
     private StorageProxy()
     {
     }
@@ -1838,21 +1848,33 @@ public class StorageProxy implements StorageProxyMBean
         return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
     }
 
-    private static class RangeForQuery
+    @VisibleForTesting
+    public static class RangeForQuery
     {
         public final AbstractBounds<PartitionPosition> range;
         public final List<InetAddress> liveEndpoints;
         public final List<InetAddress> filteredEndpoints;
+        public final int vnodeCount;
 
-        public RangeForQuery(AbstractBounds<PartitionPosition> range, List<InetAddress> liveEndpoints, List<InetAddress> filteredEndpoints)
+        public RangeForQuery(AbstractBounds<PartitionPosition> range,
+                             List<InetAddress> liveEndpoints,
+                             List<InetAddress> filteredEndpoints,
+                             int vnodeCount)
         {
             this.range = range;
             this.liveEndpoints = liveEndpoints;
             this.filteredEndpoints = filteredEndpoints;
+            this.vnodeCount = vnodeCount;
+        }
+
+        public int vnodeCount()
+        {
+            return vnodeCount;
         }
     }
 
-    private static class RangeIterator extends AbstractIterator<RangeForQuery>
+    @VisibleForTesting
+    public static class RangeIterator extends AbstractIterator<RangeForQuery>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
@@ -1885,17 +1907,19 @@ public class StorageProxy implements StorageProxyMBean
             List<InetAddress> liveEndpoints = getLiveSortedEndpoints(keyspace, range.right);
             return new RangeForQuery(range,
                                      liveEndpoints,
-                                     consistency.filterForQuery(keyspace, liveEndpoints));
+                                     consistency.filterForQuery(keyspace, liveEndpoints),
+                                     1);
         }
     }
 
-    private static class RangeMerger extends AbstractIterator<RangeForQuery>
+    @VisibleForTesting
+    public static class RangeMerger extends AbstractIterator<RangeForQuery>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
         private final PeekingIterator<RangeForQuery> ranges;
 
-        private RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency)
+        public RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency)
         {
             this.keyspace = keyspace;
             this.consistency = consistency;
@@ -1937,7 +1961,8 @@ public class StorageProxy implements StorageProxyMBean
                     break;
 
                 // If we get here, merge this range and the next one
-                current = new RangeForQuery(current.range.withNewRight(next.range.right), merged, filteredMerged);
+                int vnodeCount = current.vnodeCount + next.vnodeCount;
+                current = new RangeForQuery(current.range.withNewRight(next.range.right), merged, filteredMerged, vnodeCount);
                 ranges.next(); // consume the range we just merged since we've only peeked so far
             }
             return current;
@@ -1982,7 +2007,7 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+    public static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
     {
         private final Iterator<RangeForQuery> ranges;
         private final int totalRangeCount;
@@ -1995,19 +2020,28 @@ public class StorageProxy implements StorageProxyMBean
         private DataLimits.Counter counter;
         private PartitionIterator sentQueryIterator;
 
+        private final int maxConcurrencyFactor;
         private int concurrencyFactor;
         // The two following "metrics" are maintained to improve the concurrencyFactor
         // when it was not good enough initially.
         private int liveReturned;
         private int rangesQueried;
+        private int batchesRequested = 0;
 
-        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+        public RangeCommandIterator(Iterator<RangeForQuery> ranges,
+                                    PartitionRangeReadCommand command,
+                                    int concurrencyFactor,
+                                    int maxConcurrencyFactor,
+                                    int totalRangeCount,
+                                    Keyspace keyspace,
+                                    ConsistencyLevel consistency)
         {
             this.command = command;
             this.concurrencyFactor = concurrencyFactor;
+            this.maxConcurrencyFactor = maxConcurrencyFactor;
             this.startTime = System.nanoTime();
-            this.ranges = new RangeMerger(ranges, keyspace, consistency);
-            this.totalRangeCount = ranges.rangeCount();
+            this.ranges = ranges;
+            this.totalRangeCount = totalRangeCount;
             this.consistency = consistency;
             this.keyspace = keyspace;
             this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
@@ -2026,7 +2060,6 @@ public class StorageProxy implements StorageProxyMBean
                     // else, sends the next batch of concurrent queries (after having closed the previous iterator)
                     if (sentQueryIterator != null)
                     {
-                        liveReturned += counter.counted();
                         sentQueryIterator.close();
 
                         // It's not the first batch of queries and we're not done, so we can use what has been
@@ -2057,20 +2090,31 @@ public class StorageProxy implements StorageProxyMBean
 
         private void updateConcurrencyFactor()
         {
+            liveReturned += counter.counted();
+
+            concurrencyFactor = computeConcurrencyFactor(totalRangeCount, rangesQueried, maxConcurrencyFactor, command.limits().count(), liveReturned);
+        }
+
+        @VisibleForTesting
+        public static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int maxConcurrencyFactor, int limit, int liveReturned)
+        {
+            maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
             if (liveReturned == 0)
             {
-                // we haven't actually gotten any results, so query all remaining ranges at once
-                concurrencyFactor = totalRangeCount - rangesQueried;
-                return;
+                // we haven't gotten any results yet, so query as many ranges as the cap allows
+                Tracing.trace("Didn't get any response rows; new concurrent requests: {}", maxConcurrencyFactor);
+                return maxConcurrencyFactor;
             }
 
             // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
             // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
-            int remainingRows = command.limits().count() - liveReturned;
+            int remainingRows = limit - liveReturned;
             float rowsPerRange = (float)liveReturned / (float)rangesQueried;
-            concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
+            int concurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
             logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
-                         rowsPerRange, (int) remainingRows, concurrencyFactor);
+                         rowsPerRange, remainingRows, concurrencyFactor);
+
+            return concurrencyFactor;
         }
 
         private SingleRangeResponse query(RangeForQuery toQuery)
@@ -2106,11 +2150,14 @@ public class StorageProxy implements StorageProxyMBean
         private PartitionIterator sendNextRequests()
         {
             List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
-            for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+            for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
             {
-                concurrentQueries.add(query(ranges.next()));
-                ++rangesQueried;
+                RangeForQuery range = ranges.next();
+                concurrentQueries.add(query(range));
+                rangesQueried += range.vnodeCount();
+                i += range.vnodeCount();
             }
+            batchesRequested++;
 
             Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
             // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
@@ -2133,6 +2180,18 @@ public class StorageProxy implements StorageProxyMBean
                 Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
             }
         }
+
+        @VisibleForTesting
+        public int rangesQueried()
+        {
+            return rangesQueried;
+        }
+
+        @VisibleForTesting
+        public int batchesRequested()
+        {
+            return batchesRequested;
+        }
     }
 
     @SuppressWarnings("resource")
@@ -2148,16 +2207,24 @@ public class StorageProxy implements StorageProxyMBean
         // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
         // fetch enough rows in the first round
         resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+        int maxConcurrencyFactor = Math.min(ranges.rangeCount(), MAX_CONCURRENT_RANGE_REQUESTS);
         int concurrencyFactor = resultsPerRange == 0.0
                               ? 1
-                              : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
+                              : Math.max(1, Math.min(maxConcurrencyFactor, (int) Math.ceil(command.limits().count() / resultsPerRange)));
         logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
                      resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
         Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
 
         // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
-
-        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)),
+        RangeMerger mergedRanges = new RangeMerger(ranges, keyspace, consistencyLevel);
+        RangeCommandIterator rangeCommandIterator = new RangeCommandIterator(mergedRanges,
+                                                                             command,
+                                                                             concurrencyFactor,
+                                                                             maxConcurrencyFactor,
+                                                                             ranges.rangeCount(),
+                                                                             keyspace,
+                                                                             consistencyLevel);
+        return command.limits().filter(command.postReconciliationProcessing(rangeCommandIterator),
                                        command.nowInSec(),
                                        command.selectsFullPartition(),
                                        command.metadata().enforceStrictLiveness());
diff --git a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
index 1368705..b567f72 100644
--- a/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
@@ -19,26 +19,41 @@
 package org.apache.cassandra.db;
 
 import java.math.BigInteger;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.Iterators;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.cassandra.db.ConsistencyLevel.ONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.cassandra.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class PartitionRangeReadTest
 {
@@ -48,6 +63,8 @@ public class PartitionRangeReadTest
     public static final String CF_STANDARDINT = "StandardInteger1";
     public static final String CF_COMPACT1 = "Compact1";
 
+    private static final List<InetAddress> LOCAL = Collections.singletonList(FBUtilities.getBroadcastAddress());
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
@@ -388,5 +405,131 @@ public class PartitionRangeReadTest
 //        assertColumnNames(row1, "c1", "c2");
 //        assertColumnNames(row2, "c1");
 //    }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConcurrentRangeRequest = 32;
+
+        // no live rows returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 500, 0);
+        assertEquals(maxConcurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConcurrentRangeRequest
+
+        // no live rows returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConcurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100 - 80 = 20 < maxConcurrentRangeRequest
+
+        // returned half the rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConcurrentRangeRequest, 480, 240);
+        assertEquals(maxConcurrentRangeRequest, cf); // because 60 > maxConcurrentRangeRequest
+
+        // returned half the rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConcurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConcurrentRangeRequest
+
+        // returned most of the rows, only 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConcurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConcurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
+        int vnodeCount = 0;
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<StorageProxy.RangeForQuery> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(new StorageProxy.RangeForQuery(range, LOCAL, LOCAL, 1));
+            vnodeCount++;
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ONE);
+        StorageProxy.RangeForQuery mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(vnodeCount, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = setTokens(Arrays.asList(100, 200, 300, 400));
+        int vnodeCount = tokens.size() + 1; // n tokens divide token ring into n+1 ranges
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+
+        // without range merger, there will be 2 batches requested: 1st batch with 1 range and 2nd batch with remaining ranges
+        Iterator<StorageProxy.RangeForQuery> ranges = rangeIterator(command, keyspace, false);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, 2, vnodeCount);
+
+        // without range merger and initial cf=5, there will be 1 batch requested: all 5 vnode ranges in the 1st batch
+        ranges = rangeIterator(command, keyspace, false);
+        data = new StorageProxy.RangeCommandIterator(ranges, command, vnodeCount, 1000, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+        // without range merger and max cf=1, there will be 5 batches requested: 1 vnode range per batch
+        ranges = rangeIterator(command, keyspace, false);
+        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, vnodeCount, vnodeCount);
+
+        // with range merger, there will be only 1 batch requested, as all ranges share the same replica - localhost
+        ranges = rangeIterator(command, keyspace, true);
+        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1000, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+
+        // with range merger and max cf=1, there will be only 1 batch requested, as all ranges share the same replica - localhost
+        ranges = rangeIterator(command, keyspace, true);
+        data = new StorageProxy.RangeCommandIterator(ranges, command, 1, 1, vnodeCount, keyspace, ONE);
+        verifyRangeCommandIterator(data, rows, 1, vnodeCount);
+    }
+
+    private Iterator<StorageProxy.RangeForQuery> rangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, boolean withRangeMerger)
+    {
+        Iterator<StorageProxy.RangeForQuery> ranges = new StorageProxy.RangeIterator(command, keyspace, ONE);
+        if (withRangeMerger)
+            ranges = new StorageProxy.RangeMerger(ranges, keyspace, ONE);
+
+        return ranges;
+    }
+
+    private void verifyRangeCommandIterator(StorageProxy.RangeCommandIterator data, int rows, int batches, int vnodeCount)
+    {
+        int num = Util.size(data);
+        assertEquals(rows, num);
+        assertEquals(batches, data.batchesRequested());
+        assertEquals(vnodeCount, data.rangesQueried());
+    }
+
+    private List<Token> setTokens(List<Integer> values)
+    {
+        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+        List<Token> tokens = new ArrayList<>(values.size());
+        for (Integer val : values)
+            tokens.add(partitioner.getToken(ByteBufferUtil.bytes(val)));
+
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.clearUnsafe();
+        tmd.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+
+        return tokens;
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org