You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/08/06 12:27:38 UTC

[cassandra] branch trunk updated (528e3ad -> ae51f0f)

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 528e3ad  jvm-dtests crash on java 11
     new 86a9261  Fix short read protection for GROUP BY queries
     new ae51f0f  Merge branch 'cassandra-3.11' into trunk

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/filter/DataLimits.java |  66 ++++++------
 .../reads/ShortReadPartitionsProtection.java       |  10 +-
 .../service/reads/ShortReadRowsProtection.java     |  14 +--
 .../distributed/test/ShortReadProtectionTest.java  | 116 +++++++++++++++++++++
 5 files changed, 154 insertions(+), 53 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit ae51f0fd12820707927803fdbe63581f33111d4b
Merge: 528e3ad 86a9261
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Thu Aug 6 13:26:39 2020 +0100

    Merge branch 'cassandra-3.11' into trunk
    
    # Conflicts:
    #	CHANGES.txt
    #	src/java/org/apache/cassandra/db/filter/DataLimits.java
    #	src/java/org/apache/cassandra/service/DataResolver.java

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/filter/DataLimits.java |  66 ++++++------
 .../reads/ShortReadPartitionsProtection.java       |  10 +-
 .../service/reads/ShortReadRowsProtection.java     |  14 +--
 .../distributed/test/ShortReadProtectionTest.java  | 116 +++++++++++++++++++++
 5 files changed, 154 insertions(+), 53 deletions(-)

diff --cc CHANGES.txt
index dcefef0,5168acb..7b4cc98
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,5 +1,14 @@@
 -3.11.8
 +4.0-beta2
 + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425)
 + * Improve logging for socket connection/disconnection (CASSANDRA-15980)
 + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928)
 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
 + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
 + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
 + * Verify sstable components on startup (CASSANDRA-15945)
 +Merged from 3.11:
+  * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
   * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
  Merged from 3.0:
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java
index 3a766e0,2759932..e6495e8
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@@ -161,7 -191,7 +161,7 @@@ public abstract class DataLimit
       * @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be counted
       * as 1 valid row.
       * @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
--     *                              normally retrieved from {@link CFMetaData#enforceStrictLiveness()}
++     * normally retrieved from {@link org.apache.cassandra.schema.TableMetadata#enforceStrictLiveness()}
       * @return a new {@code Counter} for this limits.
       */
      public abstract Counter newCounter(int nowInSec,
diff --cc src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index 42676b6,0000000..6c4dc68
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@@ -1,207 -1,0 +1,199 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.locator.ReplicaPlans;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.PartitionRangeReadCommand;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.transform.MorePartitions;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.ExcludingBounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class);
 +    private final ReadCommand command;
 +    private final Replica source;
 +
 +    private final Runnable preFetchCallback; // called immediately before fetching more contents
 +
 +    private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
 +    private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 +
 +    private DecoratedKey lastPartitionKey; // key of the last observed partition
 +
 +    private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
 +
 +    private final long queryStartNanoTime;
 +
 +    public ShortReadPartitionsProtection(ReadCommand command,
 +                                         Replica source,
 +                                         Runnable preFetchCallback,
 +                                         DataLimits.Counter singleResultCounter,
 +                                         DataLimits.Counter mergedResultCounter,
 +                                         long queryStartNanoTime)
 +    {
 +        this.command = command;
 +        this.source = source;
 +        this.preFetchCallback = preFetchCallback;
 +        this.singleResultCounter = singleResultCounter;
 +        this.mergedResultCounter = mergedResultCounter;
 +        this.queryStartNanoTime = queryStartNanoTime;
 +    }
 +
 +    @Override
 +    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
 +    {
 +        partitionsFetched = true;
 +
 +        lastPartitionKey = partition.partitionKey();
 +
 +        /*
 +         * Extend for moreContents() then apply protection to track lastClustering by applyToRow().
 +         *
 +         * If we don't apply the transformation *after* extending the partition with MoreRows,
 +         * applyToRow() method of protection will not be called on the first row of the new extension iterator.
 +         */
 +        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
 +        ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
 +        ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
 +                                                                         command, source,
 +                                                                         (cmd) -> executeReadCommand(cmd, sharedReplicaPlan),
 +                                                                         singleResultCounter,
 +                                                                         mergedResultCounter);
 +        return Transformation.apply(MoreRows.extend(partition, protection), protection);
 +    }
 +
 +    /*
 +     * We only get here once all the rows and partitions in this iterator have been iterated over, and so
 +     * if the node had returned the requested number of rows but we still get here, then some results were
 +     * skipped during reconciliation.
 +     */
 +    public UnfilteredPartitionIterator moreContents()
 +    {
 +        // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
 +        assert !mergedResultCounter.isDone();
 +
 +        // we do not apply short read protection when we have no limits at all
 +        assert !command.limits().isUnlimited();
 +
 +        /*
 +         * If this is a single partition read command or an (indexed) partition range read command with
 +         * a partition key specified, then we can't and shouldn't try fetch more partitions.
 +         */
 +        assert !command.isLimitedToOnePartition();
 +
 +        /*
 +         * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
 +         *
 +         * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
 +         * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
 +         */
 +        if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
 +            return null;
 +
 +        /*
 +         * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
 +         * There is no point to ask the replica for more rows - it has no more in the requested range.
 +         */
 +        if (!partitionsFetched)
 +            return null;
 +        partitionsFetched = false;
 +
 +        /*
 +         * We are going to fetch one partition at a time for thrift and potentially more for CQL.
 +         * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
 +         * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
 +         * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
 +         */
 +        int toQuery = command.limits().count() != DataLimits.NO_LIMIT
-                       ? command.limits().count() - counted(mergedResultCounter)
++                      ? command.limits().count() - mergedResultCounter.rowsCounted()
 +                      : command.limits().perPartitionCount();
 +
 +        ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
 +        Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
 +        logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source);
 +
 +        // If we've arrived here, all responses have been consumed, and we're about to request more.
 +        preFetchCallback.run();
 +
 +        return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery);
 +    }
 +
-     // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
-     private int counted(DataLimits.Counter counter)
-     {
-         return command.limits().isGroupByLimit()
-                ? counter.rowCounted()
-                : counter.counted();
-     }
- 
 +    private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery)
 +    {
 +        PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
 +
 +        DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
 +
 +        AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
 +        AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
 +                                                      ? new Range<>(lastPartitionKey, bounds.right)
 +                                                      : new ExcludingBounds<>(lastPartitionKey, bounds.right);
 +        DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
 +
 +        ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source, 1);
 +        return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan));
 +    }
 +
 +    private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
 +    UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan)
 +    {
 +        DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
 +        ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
 +
 +        if (source.isSelf())
 +        {
 +            Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
 +        }
 +        else
 +        {
 +            if (source.isTransient())
 +                cmd = cmd.copyAsTransientQuery(source);
 +            MessagingService.instance().sendWithCallback(cmd.createMessage(false), source.endpoint(), handler);
 +        }
 +
 +        // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
 +        handler.awaitResults();
 +        assert resolver.getMessages().size() == 1;
 +        return resolver.getMessages().get(0).payload.makeIterator(command);
 +    }
 +}
diff --cc src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
index 8dc7fc7,0000000..f32502d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
@@@ -1,197 -1,0 +1,189 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.function.Function;
 +
 +import org.apache.cassandra.db.Clustering;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
 +import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
 +{
 +    private final ReadCommand command;
 +    private final Replica source;
 +    private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
 +    private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 +    private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor;
 +    private final TableMetadata metadata;
 +    private final DecoratedKey partitionKey;
 +
 +    private Clustering lastClustering; // clustering of the last observed row
 +
 +    private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
 +    private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
 +    private int lastQueried = 0; // # extra rows requested from the replica last time
 +
 +    ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, Replica source,
 +                            Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor,
 +                            DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
 +    {
 +        this.command = command;
 +        this.source = source;
 +        this.commandExecutor = commandExecutor;
 +        this.singleResultCounter = singleResultCounter;
 +        this.mergedResultCounter = mergedResultCounter;
 +        this.metadata = command.metadata();
 +        this.partitionKey = partitionKey;
 +    }
 +
 +    @Override
 +    public Row applyToRow(Row row)
 +    {
 +        lastClustering = row.clustering();
 +        return row;
 +    }
 +
 +    /*
 +     * We only get here once all the rows in this iterator have been iterated over, and so if the node
 +     * had returned the requested number of rows but we still get here, then some results were skipped
 +     * during reconciliation.
 +     */
 +    public UnfilteredRowIterator moreContents()
 +    {
 +        // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
 +        assert !mergedResultCounter.isDoneForPartition();
 +
 +        // we do not apply short read protection when we have no limits at all
 +        assert !command.limits().isUnlimited();
 +
 +        /*
 +         * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more.
 +         *
 +         * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
 +         * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
 +         */
 +        if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
 +            return null;
 +
 +        /*
 +         * If the replica has no live rows in the partition, don't try to fetch more.
 +         *
 +         * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
 +         * always cover this scenario:
 +         * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
 +         * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
 +         *
 +         * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
 +         * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
 +         * have tombstones in the current partition.
 +         *
 +         * One other way we can hit this condition is when the partition only has a live static row and no regular
 +         * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
 +         * the moreContents() call.
 +         */
-         if (countedInCurrentPartition(singleResultCounter) == 0)
++        if (singleResultCounter.rowsCountedInCurrentPartition() == 0)
 +            return null;
 +
 +        /*
 +         * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
 +         * We already have the row, so there is no point in asking for more from the partition.
 +         */
 +        if (Clustering.EMPTY == lastClustering)
 +            return null;
 +
-         lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
-         lastCounted = countedInCurrentPartition(singleResultCounter);
++        lastFetched = singleResultCounter.rowsCountedInCurrentPartition() - lastCounted;
++        lastCounted = singleResultCounter.rowsCountedInCurrentPartition();
 +
 +        // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
 +        if (lastQueried > 0 && lastFetched < lastQueried)
 +            return null;
 +
 +        /*
 +         * At this point we know that:
 +         *     1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
 +         *        rows in the partition
 +         *     2. at least one of those returned rows was shadowed by a tombstone returned from another
 +         *        replica
 +         *     3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
 +         *        avoid a short read
 +         *
 +         * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
 +         * are defined as follows:
 +         *     [a] limits.count() - mergedResultCounter.counted()
 +         *     [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
 +         *
 +         * It would be naive to query for exactly that many rows, as it's possible and not unlikely
 +         * that some of the returned rows would also be shadowed by tombstones from other hosts.
 +         *
 +         * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
 +         * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
 +         *
 +         * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
 +         * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
 +         * marginal cost for each extra row requested.
 +         *
 +         * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
 +         * time we want to respect paging limits and not blow up spectacularly.
 +         *
 +         * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
 +         * counts.
 +         *
 +         * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
 +         *
 +         * See CASSANDRA-13794 for more details.
 +         */
 +        lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
 +
 +        ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
 +        Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
 +
 +        SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
 +        return UnfilteredPartitionIterators.getOnlyElement(commandExecutor.apply(cmd), cmd);
 +    }
 +
-     // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
-     private int countedInCurrentPartition(DataLimits.Counter counter)
-     {
-         return command.limits().isGroupByLimit()
-                ? counter.rowCountedInCurrentPartition()
-                : counter.countedInCurrentPartition();
-     }
- 
 +    private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
 +    {
 +        ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
 +        if (null != lastClustering)
 +            filter = filter.forPaging(metadata.comparator, lastClustering, false);
 +
 +        return SinglePartitionReadCommand.create(command.metadata(),
 +                                                 command.nowInSec(),
 +                                                 command.columnFilter(),
 +                                                 command.rowFilter(),
 +                                                 command.limits().forShortReadRetry(toQuery),
 +                                                 partitionKey,
 +                                                 filter,
 +                                                 command.indexMetadata());
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
index 0000000,27390d6..23b5c14
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
@@@ -1,0 -1,158 +1,116 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.List;
 -import java.util.concurrent.Callable;
 -import java.util.concurrent.atomic.AtomicBoolean;
+ 
+ import org.junit.Test;
+ 
 -import net.bytebuddy.ByteBuddy;
 -import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 -import net.bytebuddy.implementation.MethodDelegation;
 -import net.bytebuddy.implementation.bind.annotation.SuperCall;
 -import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ICoordinator;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
 -import org.apache.cassandra.transport.messages.ResultMessage;
+ 
+ import static java.lang.String.format;
+ import static java.util.Arrays.asList;
 -import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ 
+ /**
+  * Tests short read protection, the mechanism that ensures distributed queries at read consistency levels > ONE/LOCAL_ONE
+  * avoid short reads that might happen when a limit is used and reconciliation accepts fewer rows than that limit.
+  */
+ public class ShortReadProtectionTest extends TestBaseImpl
+ {
+     /**
+      * Test GROUP BY with short read protection, particularly when there is a limit and regular row deletions.
+      * <p>
+      * See CASSANDRA-15459
+      */
+     @Test
+     public void testGroupBySRPRegularRow() throws Throwable
+     {
+         testGroupBySRP("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))",
+                        asList("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP 0",
+                               "DELETE FROM %s WHERE pk=0 AND ck=0",
+                               "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0"),
+                        asList("DELETE FROM %s WHERE pk=1 AND ck=1",
+                               "INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
+                               "DELETE FROM %s WHERE pk=2 AND ck=2"),
+                        asList("SELECT * FROM %s LIMIT 1",
+                               "SELECT * FROM %s LIMIT 10",
+                               "SELECT * FROM %s GROUP BY pk LIMIT 1",
+                               "SELECT * FROM %s GROUP BY pk LIMIT 10",
+                               "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
+                               "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+     }
+ 
+     /**
+      * Test GROUP BY with short read protection, particularly when there is a limit and static row deletions.
+      * <p>
+      * See CASSANDRA-15459
+      */
+     @Test
+     public void testGroupBySRPStaticRow() throws Throwable
+     {
+         testGroupBySRP("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY KEY (pk, ck))",
+                        asList("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP 0",
+                               "INSERT INTO %s (pk, s) VALUES (0, null)",
+                               "INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP 0"),
+                        asList("INSERT INTO %s (pk, s) VALUES (1, null)",
+                               "INSERT INTO %s (pk, s) VALUES (0, 0) USING TIMESTAMP 0",
+                               "INSERT INTO %s (pk, s) VALUES (2, null)"),
+                        asList("SELECT * FROM %s LIMIT 1",
+                               "SELECT * FROM %s LIMIT 10",
+                               "SELECT * FROM %s GROUP BY pk LIMIT 1",
+                               "SELECT * FROM %s GROUP BY pk LIMIT 10",
+                               "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
+                               "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+     }
+ 
+     private void testGroupBySRP(String createTable,
+                                 List<String> node1Queries,
+                                 List<String> node2Queries,
+                                 List<String> coordinatorQueries) throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.build()
+                                            .withNodes(2)
+                                            .withConfig(config -> config.set("hinted_handoff_enabled", false))
 -                                           .withInstanceInitializer(BBDropMutationsHelper::install)
+                                            .start()))
+         {
++            // create the table with read repair disabled
+             String table = withKeyspace("%s.t");
 -            cluster.schemaChange(format(createTable, table));
++            cluster.schemaChange(format(createTable + " WITH read_repair='NONE'", table));
+ 
+             // populate data on node1
+             IInvokableInstance node1 = cluster.get(1);
+             for (String query : node1Queries)
+                 node1.executeInternal(format(query, table));
+ 
+             // populate data on node2
+             IInvokableInstance node2 = cluster.get(2);
+             for (String query : node2Queries)
+                 node2.executeInternal(format(query, table));
+ 
 -            // ignore read repair writes
 -            node1.runOnInstance(BBDropMutationsHelper::enable);
 -            node2.runOnInstance(BBDropMutationsHelper::enable);
 -
+             // verify the behaviour of SRP with GROUP BY queries
+             ICoordinator coordinator = cluster.coordinator(1);
+             for (String query : coordinatorQueries)
+                 assertRows(coordinator.execute(format(query, table), ALL));
+         }
+     }
 -
 -    /**
 -     * Byte Buddy helper to silently drop mutations.
 -     */
 -    public static class BBDropMutationsHelper
 -    {
 -        private static final AtomicBoolean enabled = new AtomicBoolean(false);
 -
 -        static void enable()
 -        {
 -            enabled.set(true);
 -        }
 -
 -        static void install(ClassLoader cl, int nodeNumber)
 -        {
 -            new ByteBuddy().rebase(Mutation.class)
 -                           .method(named("apply"))
 -                           .intercept(MethodDelegation.to(BBDropMutationsHelper.class))
 -                           .make()
 -                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
 -        }
 -
 -        public static void execute(@SuperCall Callable<ResultMessage.Rows> r) throws Exception
 -        {
 -            if (enabled.get())
 -                return;
 -            r.call();
 -        }
 -    }
 -}
++}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org