Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/11/12 13:33:02 UTC

[GitHub] [cassandra] adelapena commented on a change in pull request #801: CASSANDRA-16180 Coordination tests

adelapena commented on a change in pull request #801:
URL: https://github.com/apache/cassandra/pull/801#discussion_r522107443



##########
File path: src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.range;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.reads.DataResolver;
+import org.apache.cassandra.service.reads.ReadCallback;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.AbstractIterator;
+
+class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+{
+    private static final Logger logger = LoggerFactory.getLogger(RangeCommandIterator.class);
+
+    private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
+
+    private final Iterator<ReplicaPlan.ForRangeRead> ranges;
+    private final int totalRangeCount;
+    private final PartitionRangeReadCommand command;
+    private final boolean enforceStrictLiveness;
+
+    private final long startTime;
+    private final long queryStartNanoTime;
+    private DataLimits.Counter counter;
+    private PartitionIterator sentQueryIterator;
+
+    private final int maxConcurrencyFactor;
+    private int concurrencyFactor;
+    // The following two "metrics" are maintained to improve the concurrencyFactor
+    // when the initial estimate turns out not to be good enough.
+    private int liveReturned;
+    private int rangesQueried;
+    private int batchesRequested = 0;
+
+    RangeCommandIterator(Iterator<ReplicaPlan.ForRangeRead> ranges,
+                         PartitionRangeReadCommand command,
+                         int concurrencyFactor,
+                         int maxConcurrencyFactor,
+                         int totalRangeCount,
+                         long queryStartNanoTime)
+    {
+        this.ranges = ranges;
+        this.command = command;
+        this.concurrencyFactor = concurrencyFactor;
+        this.maxConcurrencyFactor = maxConcurrencyFactor;
+        this.totalRangeCount = totalRangeCount;
+        this.queryStartNanoTime = queryStartNanoTime;
+
+        startTime = System.nanoTime();
+        enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+    }
+
+    @Override
+    protected RowIterator computeNext()
+    {
+        try
+        {
+            while (sentQueryIterator == null || !sentQueryIterator.hasNext())
+            {
+                // If we don't have any more ranges to handle, we're done
+                if (!ranges.hasNext())
+                    return endOfData();
+
+                // otherwise, send the next batch of concurrent queries (after having closed the previous iterator)
+                if (sentQueryIterator != null)
+                {
+                    liveReturned += counter.counted();
+                    sentQueryIterator.close();
+
+                    // It's not the first batch of queries and we're not done, so we can use what has been
+                    // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
+                    updateConcurrencyFactor();
+                }
+                sentQueryIterator = sendNextRequests();
+            }
+
+            return sentQueryIterator.next();
+        }
+        catch (UnavailableException e)
+        {
+            rangeMetrics.unavailables.mark();
+            throw e;
+        }
+        catch (ReadTimeoutException e)
+        {
+            rangeMetrics.timeouts.mark();
+            throw e;
+        }
+        catch (ReadFailureException e)
+        {
+            rangeMetrics.failures.mark();
+            throw e;
+        }
+    }
+
+    private void updateConcurrencyFactor()
+    {
+        liveReturned += counter.counted();
+
+        concurrencyFactor = computeConcurrencyFactor(totalRangeCount, rangesQueried, maxConcurrencyFactor, command.limits().count(), liveReturned);
+    }
+
+    @VisibleForTesting
+    static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried, int maxConcurrencyFactor, int limit, int liveReturned)
+    {
+        maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
+        if (liveReturned == 0)
+        {
+            // we haven't gotten any results yet, so use the maximum allowed concurrency factor for the next batch
+            Tracing.trace("Didn't get any response rows; new concurrent requests: {}", maxConcurrencyFactor);
+            return maxConcurrencyFactor;
+        }
+
+        // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
+        // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
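+        // For example (illustrative numbers): with limit = 100, liveReturned = 20 and rangesQueried = 10,
+        // rowsPerRange is 2.0 and remainingRows is 80, so the next batch targets min(maxConcurrencyFactor, 40) ranges.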
+        int remainingRows = limit - liveReturned;
+        float rowsPerRange = (float) liveReturned / (float) rangesQueried;
+        int concurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
+        logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
+                     rowsPerRange, remainingRows, concurrencyFactor);
+        return concurrencyFactor;
+    }
+
+    /**
+     * Queries the provided sub-range.
+     *
+     * @param replicaPlan the replica plan for the sub-range to query.
+     * @param isFirst in the case where multiple queries are sent in parallel, whether this is the first query of
+     * the batch or not. The reason it matters is that when paging queries, the command (more specifically the
+     * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
+     * that it's the query that "continues" whatever we previously queried).
+     */
+    private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
+    {
+        PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
+        // If enabled, request repaired data tracking info from full replicas but
+        // only if there are multiple full replicas to compare results from
+        if (DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
+            && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
+        {
+            command.trackRepairedStatus();
+            rangeCommand.trackRepairedStatus();
+        }
+
+        ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
+        ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
+        = ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
+        DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
+        = new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
+        ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
+        = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
+
+        if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isSelf())
+        {
+            Stage.READ.execute(new StorageProxy.LocalReadRunnable(rangeCommand, handler));
+        }
+        else
+        {
+            for (Replica replica : replicaPlan.contacts())
+            {
+                Tracing.trace("Enqueuing request to {}", replica);
+                ReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
+                Message<ReadCommand> message = command.createMessage(command.isTrackingRepairedStatus() && replica.isFull());
+                MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
+            }
+        }
+
+        return new SingleRangeResponse(resolver, handler, readRepair);
+    }
+
+    private PartitionIterator sendNextRequests()
+    {
+        List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
+        List<ReadRepair<?, ?>> readRepairs = new ArrayList<>(concurrencyFactor);
+
+        try
+        {
+            for (int i = 0; i < concurrencyFactor && ranges.hasNext(); )
+            {
+                ReplicaPlan.ForRangeRead range = ranges.next();
+
+                @SuppressWarnings("resource") // response will be closed by concatAndBlockOnRepair, or in the catch block below

Review comment:
       I'm afraid this is needed to avoid a failure in `ant eclipse-warnings`, which is run by CI. Indeed, I was missing another similar `@SuppressWarnings` annotation [here](https://github.com/apache/cassandra/pull/801/commits/f0562873ee676dabca467f2a2651a3a74b7d725d), and its absence was producing a CI failure in `ant eclipse-warnings`.
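
       For context, here is a minimal sketch (hypothetical class and method names, not the project code) of the kind of pattern Eclipse's resource-leak analysis in `ant eclipse-warnings` flags, and how the annotation suppresses it: a `Closeable` is created in one method but ownership is handed off to a consumer that closes it later.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

class ResourceHandoffSketch
{
    // Stand-in for a response object that somebody else will close.
    static class Response implements Closeable
    {
        @Override
        public void close() { /* release underlying resources */ }
    }

    static void sendNextRequests(int concurrencyFactor)
    {
        List<Response> responses = new ArrayList<>(concurrencyFactor);
        for (int i = 0; i < concurrencyFactor; i++)
        {
            // Without the annotation, Eclipse reports a potential resource leak here,
            // because the Response is not closed within this method.
            @SuppressWarnings("resource") // closed by closeAll below
            Response response = new Response();
            responses.add(response);
        }
        closeAll(responses);
    }

    static void closeAll(List<Response> responses)
    {
        // The consumer takes ownership and closes every response.
        for (Response response : responses)
            response.close();
    }
}
```

       The annotation only silences the static analysis; responsibility for closing still lies with the consumer (here `closeAll`, in the patch `concatAndBlockOnRepair` or the catch block mentioned in the suppressed line).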




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org