Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/05/31 16:12:48 UTC

[GitHub] [cassandra] jasonstack opened a new pull request #606: Cassandra 15752 trunk

jasonstack opened a new pull request #606:
URL: https://github.com/apache/cassandra/pull/606


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on pull request #606:
URL: https://github.com/apache/cassandra/pull/606#issuecomment-643114950


   rebased with latest trunk



[GitHub] [cassandra] adelapena commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
adelapena commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439404645



##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2173,13 +2200,16 @@ private PartitionIterator sendNextRequests()
 
             try
             {
-                for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+                for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
                 {
+                    ReplicaPlan.ForRangeRead range = ranges.next();
+
                     @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below
-                    SingleRangeResponse response = query(ranges.next(), i == 0);
+                    SingleRangeResponse response = query(range, i == 0);
                     concurrentQueries.add(response);
                     readRepairs.add(response.readRepair);
-                    ++rangesQueried;
+                    rangesQueried += range.vnodeCount();
+                    i += range.vnodeCount();

Review comment:
    It doesn't seem that over-fetching is going to be a problem, but we might add a comment about it.
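
    (For illustration, one possible wording for such a comment, anchored to the loop from the diff above; this is a sketch only, not the exact patch text.)

```java
for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
{
    ReplicaPlan.ForRangeRead range = ranges.next();

    @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below
    SingleRangeResponse response = query(range, i == 0);
    concurrentQueries.add(response);
    readRepairs.add(response.readRepair);
    // A merged range may represent several vnode ranges, so a single batch can query slightly
    // more vnode ranges than the current concurrency factor. The overshoot is bounded by the
    // size of the last merged range and is cheaper than issuing an extra round of requests.
    rangesQueried += range.vnodeCount();
    i += range.vnodeCount();
}
```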





[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439082617



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.

Review comment:
       `liveReturned` appears to be zero if you assert on it here, which makes sense, given we don't get past the first batch.





[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439519699



##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2210,6 +2241,18 @@ public void close()
                 Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
             }
         }
+
+        @VisibleForTesting
+        public int rangesQueried()
+        {
+            return rangesQueried;
+        }
+
+        @VisibleForTesting
+        public int batchRequested()

Review comment:
       ```suggestion
           public int batchesRequested()
   ```





[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439062577



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
+        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());
+
+        int num = Util.size(data);
+        assertEquals(rows, num);
+        assertEquals(tokens.size() + 1, data.rangesQueried());

Review comment:
       It might be helpful to remind the future reader why there's a +1 here ;)





[GitHub] [cassandra] adelapena commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
adelapena commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r437334389



##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2100,22 +2112,33 @@ public RowIterator computeNext()
             }
         }
 
-        private void updateConcurrencyFactor()
+        @VisibleForTesting
+        public void handleBatchCompleted()

Review comment:
    +1 to placing the update of `liveReturned` here. AFAIK this method doesn't do anything other than update the concurrency factor, and we don't have any alternative implementations, so I think I'd prefer the old name for this method, `updateConcurrencyFactor`. As for the visibility change, I don't see where it is used in testing.

##########
File path: src/java/org/apache/cassandra/locator/ReplicaPlan.java
##########
@@ -106,18 +106,25 @@ ForTokenRead withContact(EndpointsForToken newContact)
     public static class ForRangeRead extends ForRead<EndpointsForRange>
     {
         final AbstractBounds<PartitionPosition> range;
+        final int rangeCount;
 
-        public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact)
+        public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact, int rangeCount)

Review comment:
       Nit: We could break this line
   ```suggestion
           public ForRangeRead(Keyspace keyspace,
                               ConsistencyLevel consistencyLevel,
                               AbstractBounds<PartitionPosition> range,
                               EndpointsForRange candidates,
                               EndpointsForRange contact,
                               int rangeCount)
   ```

##########
File path: src/java/org/apache/cassandra/locator/ReplicaPlan.java
##########
@@ -106,18 +106,25 @@ ForTokenRead withContact(EndpointsForToken newContact)
     public static class ForRangeRead extends ForRead<EndpointsForRange>
     {
         final AbstractBounds<PartitionPosition> range;
+        final int rangeCount;
 
-        public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact)
+        public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact, int rangeCount)
         {
             super(keyspace, consistencyLevel, candidates, contact);
             this.range = range;
+            this.rangeCount = rangeCount;
         }
 
         public AbstractBounds<PartitionPosition> range() { return range; }
 
+        /**
+         * @return number of vnode ranges

Review comment:
       Perhaps we could extend this to `number of vnode ranges intersected by the range`, or rename the method to `vnodesCount`/`subrangeCount`, or something like that. I think that having a singular `range` and a range count at the same time might be a bit confusing to unaware readers.

##########
File path: test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
##########
@@ -294,7 +294,7 @@ public void setUp()
     static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas, EndpointsForRange targets)
     {
         return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel,
-                ReplicaUtils.FULL_BOUNDS, replicas, targets);
+                ReplicaUtils.FULL_BOUNDS, replicas, targets, 1);

Review comment:
       Nit: I think we don't need to break this line

##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest

Review comment:
       I really like the comments here 👍 
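
    (For context, here is a sketch of concurrency-factor logic that is consistent with the assertions in this test. It is an assumed implementation for illustration; the actual method in `StorageProxy.RangeCommandIterator` may differ in detail.)

```java
// Sketch: size the next batch of range requests. Consistent with the test assertions above,
// but written here for illustration only.
static int computeConcurrencyFactor(int totalRangeCount,
                                    int rangesQueried,
                                    int maxConcurrentRangeRequests,
                                    int rowLimit,
                                    int liveReturned)
{
    // Never ask for more ranges than remain, and never exceed the configured maximum.
    int maxFactor = Math.max(1, Math.min(maxConcurrentRangeRequests, totalRangeCount - rangesQueried));

    // No live rows seen yet: fetch as many of the remaining ranges as allowed.
    if (liveReturned == 0)
        return maxFactor;

    // Otherwise, estimate rows-per-range from what has been returned so far and size the
    // next batch to cover roughly the remaining rows.
    int remainingRows = rowLimit - liveReturned;
    float rowsPerRange = (float) liveReturned / (float) rangesQueried;
    return Math.max(1, Math.min(maxFactor, Math.round(remainingRows / rowsPerRange)));
}
```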

##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2041,16 +2052,18 @@ public void close()
         private DataLimits.Counter counter;
         private PartitionIterator sentQueryIterator;
 
+        private int maxConcurrencyFactor;
         private int concurrencyFactor;
         // The two following "metric" are maintained to improve the concurrencyFactor
         // when it was not good enough initially.
         private int liveReturned;
         private int rangesQueried;
 
-        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)
+        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, int maxConcurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)

Review comment:
       Nit: we could break this line
   ```suggestion
           public RangeCommandIterator(RangeIterator ranges,
                                       PartitionRangeReadCommand command,
                                       int concurrencyFactor,
                                       int maxConcurrencyFactor,
                                       Keyspace keyspace,
                                       ConsistencyLevel consistency,
                                       long queryStartNanoTime)
   ```





[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439050524



##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2100,22 +2112,33 @@ public RowIterator computeNext()
             }
         }
 
-        private void updateConcurrencyFactor()
+        @VisibleForTesting
+        public void handleBatchCompleted()

Review comment:
    I mulled over the idea of having a class whose only responsibility was tracking the command's completion state. In that case, you could imagine a method updating it in a general sense, taking the number of live rows returned and ranges queried in a round and automatically updating the concurrency factor (which you would just access to make the next batch). It would be pretty easy to test and would make `RangeCommandIterator` a little more focused, but the static `computeConcurrencyFactor()` basically gets us the same thing.
   
   tl;dr I have no problems with the current structure.
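
    (As a rough illustration of that idea; the class and method names below are purely hypothetical and do not exist in the patch, which keeps this state inside `RangeCommandIterator`.)

```java
// Hypothetical completion-state tracker, as described above. All names are invented for
// illustration; it delegates to the static computeConcurrencyFactor() shown in the tests.
static final class RangeQueryProgress
{
    private final int totalRangeCount;
    private final int maxConcurrencyFactor;
    private final int rowLimit;

    private int rangesQueried;
    private int liveReturned;
    private int concurrencyFactor;

    RangeQueryProgress(int totalRangeCount, int maxConcurrencyFactor, int rowLimit, int initialConcurrencyFactor)
    {
        this.totalRangeCount = totalRangeCount;
        this.maxConcurrencyFactor = maxConcurrencyFactor;
        this.rowLimit = rowLimit;
        this.concurrencyFactor = initialConcurrencyFactor;
    }

    // Called once per completed batch; recomputes the factor used to size the next batch.
    void onBatchCompleted(int vnodeRangesQueriedInBatch, int liveRowsReturnedInBatch)
    {
        rangesQueried += vnodeRangesQueriedInBatch;
        liveReturned += liveRowsReturnedInBatch;
        concurrencyFactor = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(totalRangeCount,
                                                                                       rangesQueried,
                                                                                       maxConcurrencyFactor,
                                                                                       rowLimit,
                                                                                       liveReturned);
    }

    int concurrencyFactor()
    {
        return concurrencyFactor;
    }
}
```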





[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r437370196



##########
File path: src/java/org/apache/cassandra/locator/ReplicaPlan.java
##########
@@ -106,18 +106,25 @@ ForTokenRead withContact(EndpointsForToken newContact)
     public static class ForRangeRead extends ForRead<EndpointsForRange>
     {
         final AbstractBounds<PartitionPosition> range;
+        final int rangeCount;
 
-        public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact)
+        public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact, int rangeCount)

Review comment:
       +1

##########
File path: src/java/org/apache/cassandra/locator/ReplicaPlan.java
##########
@@ -106,18 +106,25 @@ ForTokenRead withContact(EndpointsForToken newContact)
     public static class ForRangeRead extends ForRead<EndpointsForRange>
     {
         final AbstractBounds<PartitionPosition> range;
+        final int rangeCount;
 
-        public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact)
+        public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact, int rangeCount)
         {
             super(keyspace, consistencyLevel, candidates, contact);
             this.range = range;
+            this.rangeCount = rangeCount;
         }
 
         public AbstractBounds<PartitionPosition> range() { return range; }
 
+        /**
+         * @return number of vnode ranges

Review comment:
    Good idea. Updated the javadoc and renamed the method to `vnodeCount`.
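
    (For reference, the renamed accessor and extended javadoc might read roughly like the following; this is an assumed form, not necessarily the exact patch text.)

```java
/**
 * @return the number of vnode ranges covered by this (possibly merged) range
 */
public int vnodeCount()
{
    return rangeCount;
}
```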

##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2100,22 +2112,33 @@ public RowIterator computeNext()
             }
         }
 
-        private void updateConcurrencyFactor()
+        @VisibleForTesting
+        public void handleBatchCompleted()

Review comment:
    Renamed back to `updateConcurrencyFactor`.

##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2041,16 +2052,18 @@ public void close()
         private DataLimits.Counter counter;
         private PartitionIterator sentQueryIterator;
 
+        private int maxConcurrencyFactor;
         private int concurrencyFactor;
         // The two following "metric" are maintained to improve the concurrencyFactor
         // when it was not good enough initially.
         private int liveReturned;
         private int rangesQueried;
 
-        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)
+        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, int maxConcurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)

Review comment:
       +1

##########
File path: test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
##########
@@ -294,7 +294,7 @@ public void setUp()
     static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas, EndpointsForRange targets)
     {
         return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel,
-                ReplicaUtils.FULL_BOUNDS, replicas, targets);
+                ReplicaUtils.FULL_BOUNDS, replicas, targets, 1);

Review comment:
       +1





[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439250979



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.

Review comment:
    Right, it won't update the concurrency factor once iteration ends.





[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439056911



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()

Review comment:
       nit: This could probably be the first resident of a new `RangeMergerTest`.





[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439057659



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
+        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());
+
+        int num = Util.size(data);
+        assertEquals(rows, num);
+        assertEquals(tokens.size() + 1, data.rangesQueried());
+    }
+
+    private List<Token> updateTokens(List<Integer> values)

Review comment:
       nit: Maybe `setTokens()`, given it clears the existing stuff?





[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439236008



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
+        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());

Review comment:
       > the reason we still get 5 from data.rangesQueried() is just that we allow overflow from the last ForRangeRead in sendNextRequests()
   
    Correct, the coordinator needs to track the number of vnodes queried. The test will merge ranges.

    I will add some more tests and fix the test comments.





[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439227613



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
+        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());
+
+        int num = Util.size(data);
+        assertEquals(rows, num);
+        assertEquals(tokens.size() + 1, data.rangesQueried());

Review comment:
       +1





[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439232621



##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2173,13 +2200,16 @@ private PartitionIterator sendNextRequests()
 
             try
             {
-                for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+                for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
                 {
+                    ReplicaPlan.ForRangeRead range = ranges.next();
+
                     @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below
-                    SingleRangeResponse response = query(ranges.next(), i == 0);
+                    SingleRangeResponse response = query(range, i == 0);
                     concurrentQueries.add(response);
                     readRepairs.add(response.readRepair);
-                    ++rangesQueried;
+                    rangesQueried += range.vnodeCount();
+                    i += range.vnodeCount();

Review comment:
       > Say we have a concurrency factor of 2, and the next range actually is a merged range representing 3 vnodes. We'll actually exceed the concurrency factor by 1
   
    This is inevitable unless the coordinator defers range merging until it knows how many ranges it needs for the next batch. I think fetching more ranges in one replica read command isn't going to be very costly, but the cost of under-fetching is significantly higher.





[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439080599



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
+        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());

Review comment:
       @jasonstack I traced through this test and it actually looks like we *are* merging all 5 ranges into one `ForRangeRead`, and the reason we still get 5 from `data.rangesQueried()` is just that we allow overflow from the _last_ `ForRangeRead` in `sendNextRequests()`. It feels like perhaps a couple more tests around `sendNextRequests()` would be helpful? If we parameterize `RangeCommandIterator` to _optionally_ merge ranges, this would be easier. (It could even make the signature of the `RangeCommandIterator` constructor less busy, given `keyspace` and `consistency` are actually only used by the `RangeMerger` created inside it.)
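
    (A rough sketch of what that could look like on the caller side if merging were optional; today the `RangeMerger` is created inside the `RangeCommandIterator` constructor, so the reshaped signature below is an assumption, not the actual API.)

```java
// Hypothetical reshaping: the caller decides whether to merge, so RangeCommandIterator no
// longer needs keyspace/consistency just to build a RangeMerger internally. The constructor
// signature shown here is illustrative only.
Iterator<ReplicaPlan.ForRangeRead> ranges = new StorageProxy.RangeIterator(command, keyspace, consistency);
if (mergeRanges)
    ranges = new StorageProxy.RangeMerger(ranges, keyspace, consistency);

StorageProxy.RangeCommandIterator data =
    new StorageProxy.RangeCommandIterator(ranges, command, concurrencyFactor, maxConcurrencyFactor, queryStartNanoTime);
```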




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439068403



##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2173,13 +2200,16 @@ private PartitionIterator sendNextRequests()
 
             try
             {
-                for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+                for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
                 {
+                    ReplicaPlan.ForRangeRead range = ranges.next();
+
                     @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below
-                    SingleRangeResponse response = query(ranges.next(), i == 0);
+                    SingleRangeResponse response = query(range, i == 0);
                     concurrentQueries.add(response);
                     readRepairs.add(response.readRepair);
-                    ++rangesQueried;
+                    rangesQueried += range.vnodeCount();
+                    i += range.vnodeCount();

Review comment:
       @jasonstack @adelapena Say we have a concurrency factor of 2 and the next range is a merged range representing 3 vnodes. We'll exceed the concurrency factor by 1; is the idea that we would otherwise not be able to make progress? It probably doesn't matter much either way, since the point of this mechanism is to limit the number of queries sent to replicas, which it still does, but I want to make sure we're on the same page about the intent.
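
    To make the arithmetic concrete, here is a toy, self-contained illustration of the loop semantics in question (not Cassandra code; the vnode counts are made up): with a concurrency factor of 2 and a first plan that is a merged range covering 3 vnodes, only one replica request goes out, and the vnode counter overshoots the factor by 1.

    ```java
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    final class ConcurrencyFactorOvershoot
    {
        public static void main(String[] args)
        {
            int concurrencyFactor = 2;
            // first plan is a merged range of 3 vnodes, followed by two single-vnode plans
            List<Integer> vnodeCounts = Arrays.asList(3, 1, 1);
            Iterator<Integer> ranges = vnodeCounts.iterator();

            int requestsSent = 0;
            int vnodesQueried = 0;
            for (int i = 0; i < concurrencyFactor && ranges.hasNext();)
            {
                int vnodeCount = ranges.next();
                requestsSent++;               // one replica query per (possibly merged) range
                vnodesQueried += vnodeCount;  // progress is tracked in vnodes, so it can overshoot
                i += vnodeCount;
            }

            // prints "1 request(s), 3 vnode(s) queried"
            System.out.println(requestsSent + " request(s), " + vnodesQueried + " vnode(s) queried");
        }
    }
    ```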




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] adelapena commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
adelapena commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439400748



##########
File path: test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
##########
@@ -1321,9 +1322,10 @@ private void assertRepairMetadata(Mutation mutation)
         assertEquals(update.metadata().name, cfm.name);
     }
 
+    @VisibleForTesting

Review comment:
       Do we need this annotation?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439445854



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()

Review comment:
       This method needs `setTokens()`. We can probably move it when we refactor `RangeReadExecutor` out of `StorageProxy`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439225758



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;

Review comment:
       +1
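
    As an aside for readers following the thread: the expected values in `testComputeConcurrencyFactor` are consistent with roughly the logic below. This is a back-of-the-envelope reconstruction from the asserts, not the actual trunk implementation, and the parameter names are guesses.

    ```java
    final class ConcurrencyFactorReconstruction
    {
        // Reconstruction of the behaviour implied by the asserts; only the arithmetic
        // is meant to match the test's expectations.
        static int computeConcurrencyFactorSketch(int totalRangeCount, int rangesQueried,
                                                  int maxConcurrentRangeRequests,
                                                  int rowsToFetch, int liveReturned)
        {
            if (liveReturned == 0)
                // nothing returned yet: fetch every remaining range, capped at the configured max
                return Math.min(totalRangeCount - rangesQueried, maxConcurrentRangeRequests);

            // otherwise estimate how many more ranges are needed from the observed rows-per-range rate
            double rowsPerRange = (double) liveReturned / rangesQueried;
            int remainingRows = rowsToFetch - liveReturned;
            int neededRanges = (int) Math.ceil(remainingRows / rowsPerRange);
            return Math.max(1, Math.min(neededRanges, maxConcurrentRangeRequests));
        }

        public static void main(String[] args)
        {
            // The five cases from the test, in order; expected output: 32, 20, 32, 30, 1
            System.out.println(computeConcurrencyFactorSketch(100, 30, 32, 500, 0));
            System.out.println(computeConcurrencyFactorSketch(100, 80, 32, 500, 0));
            System.out.println(computeConcurrencyFactorSketch(100, 60, 32, 480, 240));
            System.out.println(computeConcurrencyFactorSketch(100, 30, 32, 480, 240));
            System.out.println(computeConcurrencyFactorSketch(100, 1, 32, 480, 479));
        }
    }
    ```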




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439445263



##########
File path: test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
##########
@@ -1321,9 +1322,10 @@ private void assertRepairMetadata(Mutation mutation)
         assertEquals(update.metadata().name, cfm.name);
     }
 
+    @VisibleForTesting

Review comment:
       Removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439226691



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
+        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());
+
+        int num = Util.size(data);
+        assertEquals(rows, num);
+        assertEquals(tokens.size() + 1, data.rangesQueried());
+    }
+
+    private List<Token> updateTokens(List<Integer> values)

Review comment:
       +1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439236008



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
+        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());

Review comment:
       > the reason we still get 5 from data.rangesQueried() is just that we allow overflow from the last ForRangeRead in sendNextRequests()

   Correct. The coordinator needs to track the number of vnode ranges queried.

   Renamed `data.rangesQueried()` to `data.vnodeRangesQueried()`. I will add some more tests and fix the test comments.
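
    A rough sketch of what one such follow-up test could look like, assuming the renamed `vnodeRangesQueried()` accessor and reusing the helpers and constructor signature from the quoted diff (a sketch of intent, not a final test): with a single node, `RangeMerger` collapses everything into one plan, but the counter should still account for all `tokens.size() + 1` vnode ranges of the full-ring scan.

    ```java
    @Test
    public void testVnodeRangesQueriedWithSingleNode()
    {
        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));

        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
        cfs.clearUnsafe();

        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());

        Util.size(data); // drain the iterator so every batch is sent
        assertEquals(tokens.size() + 1, data.vnodeRangesQueried());
    }
    ```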




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439236008



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
+        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());

Review comment:
       Renamed `data.rangesQueried()` to `data.vnodeRangesQueried()`. I will add some more tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439236008



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;
+
+        // no live row returned, fetch all remaining ranges but hit the max instead
+        int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
+
+        // no live row returned, fetch all remaining ranges
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 80, maxConccurrentRangeRequest, 500, 0);
+        assertEquals(20, cf); // because 100-80 = 20 < maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again but hit the max instead
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 60, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(maxConccurrentRangeRequest, cf); // because 60 > maxConccurrentRangeRequest
+
+        // returned half rows, fetch rangesQueried again
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 480, 240);
+        assertEquals(30, cf); // because 30 < maxConccurrentRangeRequest
+
+        // returned most of rows, 1 more range to fetch
+        cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 1, maxConccurrentRangeRequest, 480, 479);
+        assertEquals(1, cf); // because 1 < maxConccurrentRangeRequest
+    }
+
+    @Test
+    public void testRangeCountWithRangeMerge()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        List<ReplicaPlan.ForRangeRead> ranges = new ArrayList<>();
+        for (int i = 0; i + 1 < tokens.size(); i++)
+        {
+            Range<PartitionPosition> range = Range.makeRowRange(tokens.get(i), tokens.get(i + 1));
+            ranges.add(ReplicaPlans.forRangeRead(keyspace, ConsistencyLevel.ONE, range, 1));
+        }
+
+        StorageProxy.RangeMerger merge = new StorageProxy.RangeMerger(ranges.iterator(), keyspace, ConsistencyLevel.ONE);
+        ReplicaPlan.ForRangeRead mergedRange = Iterators.getOnlyElement(merge);
+        // all ranges are merged as test has only one node.
+        assertEquals(tokens.size() - 1, mergedRange.vnodeCount());
+    }
+
+    @Test
+    public void testRangeQueried()
+    {
+        List<Token> tokens = updateTokens(Arrays.asList(100, 200, 300, 400));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.clearUnsafe();
+
+        int rows = 100;
+        for (int i = 0; i < rows; ++i)
+        {
+            RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata(), 10, String.valueOf(i));
+            builder.clustering("c");
+            builder.add("val", String.valueOf(i));
+            builder.build().applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+
+        PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
+        // avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
+        StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
+        StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());

Review comment:
       > the reason we still get 5 from data.rangesQueried() is just that we allow overflow from the last ForRangeRead in sendNextRequests()

   Correct. The coordinator needs to track the number of vnode ranges queried.

   Renamed `data.rangesQueried()` to `data.vnodeRangesQueried()`. I will add some more tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack closed pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack closed pull request #606:
URL: https://github.com/apache/cassandra/pull/606


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r442155081



##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2210,6 +2241,18 @@ public void close()
                 Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
             }
         }
+
+        @VisibleForTesting
+        public int rangesQueried()
+        {
+            return rangesQueried;
+        }
+
+        @VisibleForTesting
+        public int batchRequested()

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r438918585



##########
File path: test/unit/org/apache/cassandra/db/PartitionRangeReadTest.java
##########
@@ -188,5 +201,94 @@ public void testRangeSliceInclusionExclusion() throws Throwable
         assertTrue(partitions.get(0).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("2")));
         assertTrue(partitions.get(partitions.size() - 1).iterator().next().getCell(cDef).value().equals(ByteBufferUtil.bytes("6")));
     }
+
+    @Test
+    public void testComputeConcurrencyFactor()
+    {
+        int maxConccurrentRangeRequest = 32;

Review comment:
       ```suggestion
           int maxConcurrentRangeRequest = 32;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] maedhroz commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
maedhroz commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439070042



##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2041,16 +2051,24 @@ public void close()
         private DataLimits.Counter counter;
         private PartitionIterator sentQueryIterator;
 
+        private int maxConcurrencyFactor;

Review comment:
       Should be `final`?
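
    A minimal illustration of the suggestion (illustrative class name, not the trunk code): the field is assigned once in the constructor, so `final` lets the compiler reject any later reassignment.

    ```java
    final class FinalFieldSketch
    {
        private final int maxConcurrencyFactor;

        FinalFieldSketch(int maxConcurrencyFactor)
        {
            this.maxConcurrencyFactor = maxConcurrencyFactor;
        }

        int maxConcurrencyFactor()
        {
            return maxConcurrencyFactor;
        }
    }
    ```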




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #606: Cassandra 15752 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #606:
URL: https://github.com/apache/cassandra/pull/606#discussion_r439232760



##########
File path: src/java/org/apache/cassandra/service/StorageProxy.java
##########
@@ -2041,16 +2051,24 @@ public void close()
         private DataLimits.Counter counter;
         private PartitionIterator sentQueryIterator;
 
+        private int maxConcurrencyFactor;

Review comment:
       +1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org