You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/02/15 11:55:31 UTC

[cassandra] branch trunk updated (2121767 -> 7cddbd4)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 2121767  Merge branch 'cassandra-3.11' into trunk
     add f258ae6  Fix Group By pager interaction with SRP
     new 7cddbd4  Merge branch 'cassandra-3.11' into trunk

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/cassandra/db/filter/DataLimits.java | 28 ++++++++++----------
 .../reads/ShortReadPartitionsProtection.java       |  2 +-
 .../service/reads/ShortReadRowsProtection.java     |  2 +-
 .../cassandra/distributed/test/GroupByTest.java    | 30 ++++++++++++++++++++--
 4 files changed, 44 insertions(+), 18 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 7cddbd40ce6b326df533fd6d3c4131ef70b3b068
Merge: 2121767 f258ae6
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Mon Feb 15 12:50:38 2021 +0100

    Merge branch 'cassandra-3.11' into trunk

 .../org/apache/cassandra/db/filter/DataLimits.java | 28 ++++++++++----------
 .../reads/ShortReadPartitionsProtection.java       |  2 +-
 .../service/reads/ShortReadRowsProtection.java     |  2 +-
 .../cassandra/distributed/test/GroupByTest.java    | 30 ++++++++++++++++++++--
 4 files changed, 44 insertions(+), 18 deletions(-)

diff --cc src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index 6c4dc68,0000000..51043c3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@@ -1,199 -1,0 +1,199 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.locator.ReplicaPlans;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.PartitionRangeReadCommand;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.transform.MorePartitions;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.ExcludingBounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class);
 +    private final ReadCommand command;
 +    private final Replica source;
 +
 +    private final Runnable preFetchCallback; // called immediately before fetching more contents
 +
 +    private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
 +    private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 +
 +    private DecoratedKey lastPartitionKey; // key of the last observed partition
 +
 +    private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
 +
 +    private final long queryStartNanoTime;
 +
 +    public ShortReadPartitionsProtection(ReadCommand command,
 +                                         Replica source,
 +                                         Runnable preFetchCallback,
 +                                         DataLimits.Counter singleResultCounter,
 +                                         DataLimits.Counter mergedResultCounter,
 +                                         long queryStartNanoTime)
 +    {
 +        this.command = command;
 +        this.source = source;
 +        this.preFetchCallback = preFetchCallback;
 +        this.singleResultCounter = singleResultCounter;
 +        this.mergedResultCounter = mergedResultCounter;
 +        this.queryStartNanoTime = queryStartNanoTime;
 +    }
 +
 +    @Override
 +    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
 +    {
 +        partitionsFetched = true;
 +
 +        lastPartitionKey = partition.partitionKey();
 +
 +        /*
 +         * Extend for moreContents() then apply protection to track lastClustering by applyToRow().
 +         *
 +         * If we don't apply the transformation *after* extending the partition with MoreRows,
 +         * applyToRow() method of protection will not be called on the first row of the new extension iterator.
 +         */
 +        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
 +        ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
 +        ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
 +                                                                         command, source,
 +                                                                         (cmd) -> executeReadCommand(cmd, sharedReplicaPlan),
 +                                                                         singleResultCounter,
 +                                                                         mergedResultCounter);
 +        return Transformation.apply(MoreRows.extend(partition, protection), protection);
 +    }
 +
 +    /*
 +     * We only get here once all the rows and partitions in this iterator have been iterated over, and so
 +     * if the node had returned the requested number of rows but we still get here, then some results were
 +     * skipped during reconciliation.
 +     */
 +    public UnfilteredPartitionIterator moreContents()
 +    {
 +        // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
 +        assert !mergedResultCounter.isDone();
 +
 +        // we do not apply short read protection when we have no limits at all
 +        assert !command.limits().isUnlimited();
 +
 +        /*
 +         * If this is a single partition read command or an (indexed) partition range read command with
 +         * a partition key specified, then we can't and shouldn't try fetch more partitions.
 +         */
 +        assert !command.isLimitedToOnePartition();
 +
 +        /*
 +         * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
 +         *
 +         * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
 +         * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911).
 +         */
-         if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
++        if (command.limits().isExhausted(singleResultCounter) && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
 +            return null;
 +
 +        /*
 +         * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
 +         * There is no point to ask the replica for more rows - it has no more in the requested range.
 +         */
 +        if (!partitionsFetched)
 +            return null;
 +        partitionsFetched = false;
 +
 +        /*
 +         * We are going to fetch one partition at a time for thrift and potentially more for CQL.
 +         * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
 +         * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
 +         * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
 +         */
 +        int toQuery = command.limits().count() != DataLimits.NO_LIMIT
 +                      ? command.limits().count() - mergedResultCounter.rowsCounted()
 +                      : command.limits().perPartitionCount();
 +
 +        ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
 +        Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
 +        logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source);
 +
 +        // If we've arrived here, all responses have been consumed, and we're about to request more.
 +        preFetchCallback.run();
 +
 +        return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery);
 +    }
 +
 +    private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery)
 +    {
 +        PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
 +
 +        DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
 +
 +        AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
 +        AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
 +                                                      ? new Range<>(lastPartitionKey, bounds.right)
 +                                                      : new ExcludingBounds<>(lastPartitionKey, bounds.right);
 +        DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
 +
 +        ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source, 1);
 +        return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan));
 +    }
 +
 +    private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
 +    UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan)
 +    {
 +        DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
 +        ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
 +
 +        if (source.isSelf())
 +        {
 +            Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
 +        }
 +        else
 +        {
 +            if (source.isTransient())
 +                cmd = cmd.copyAsTransientQuery(source);
 +            MessagingService.instance().sendWithCallback(cmd.createMessage(false), source.endpoint(), handler);
 +        }
 +
 +        // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
 +        handler.awaitResults();
 +        assert resolver.getMessages().size() == 1;
 +        return resolver.getMessages().get(0).payload.makeIterator(command);
 +    }
 +}
diff --cc src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
index 9ba074d,0000000..8061f0a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
@@@ -1,189 -1,0 +1,189 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.function.Function;
 +
 +import org.apache.cassandra.db.Clustering;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
 +import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
 +{
 +    private final ReadCommand command;
 +    private final Replica source;
 +    private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
 +    private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 +    private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor;
 +    private final TableMetadata metadata;
 +    private final DecoratedKey partitionKey;
 +
 +    private Clustering<?> lastClustering; // clustering of the last observed row
 +
 +    private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
 +    private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
 +    private int lastQueried = 0; // # extra rows requested from the replica last time
 +
 +    ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, Replica source,
 +                            Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor,
 +                            DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
 +    {
 +        this.command = command;
 +        this.source = source;
 +        this.commandExecutor = commandExecutor;
 +        this.singleResultCounter = singleResultCounter;
 +        this.mergedResultCounter = mergedResultCounter;
 +        this.metadata = command.metadata();
 +        this.partitionKey = partitionKey;
 +    }
 +
 +    @Override
 +    public Row applyToRow(Row row)
 +    {
 +        lastClustering = row.clustering();
 +        return row;
 +    }
 +
 +    /*
 +     * We only get here once all the rows in this iterator have been iterated over, and so if the node
 +     * had returned the requested number of rows but we still get here, then some results were skipped
 +     * during reconciliation.
 +     */
 +    public UnfilteredRowIterator moreContents()
 +    {
 +        // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
 +        assert !mergedResultCounter.isDoneForPartition();
 +
 +        // we do not apply short read protection when we have no limits at all
 +        assert !command.limits().isUnlimited();
 +
 +        /*
 +         * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more.
 +         *
 +         * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
 +         * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911).
 +         */
-         if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
++        if (command.limits().isExhausted(singleResultCounter) && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
 +            return null;
 +
 +        /*
 +         * If the replica has no live rows in the partition, don't try to fetch more.
 +         *
 +         * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
 +         * always cover this scenario:
 +         * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
 +         * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
 +         *
 +         * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
 +         * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
 +         * have tombstones in the current partition.
 +         *
 +         * One other way we can hit this condition is when the partition only has a live static row and no regular
 +         * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
 +         * the moreContents() call.
 +         */
 +        if (singleResultCounter.rowsCountedInCurrentPartition() == 0)
 +            return null;
 +
 +        /*
 +         * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
 +         * We already have the row, so there is no point in asking for more from the partition.
 +         */
 +        if (lastClustering != null && lastClustering.isEmpty())
 +            return null;
 +
 +        lastFetched = singleResultCounter.rowsCountedInCurrentPartition() - lastCounted;
 +        lastCounted = singleResultCounter.rowsCountedInCurrentPartition();
 +
 +        // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
 +        if (lastQueried > 0 && lastFetched < lastQueried)
 +            return null;
 +
 +        /*
 +         * At this point we know that:
 +         *     1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
 +         *        rows in the partition
 +         *     2. at least one of those returned rows was shadowed by a tombstone returned from another
 +         *        replica
 +         *     3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
 +         *        avoid a short read
 +         *
 +         * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
 +         * are defined as follows:
 +         *     [a] limits.count() - mergedResultCounter.counted()
 +         *     [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
 +         *
 +         * It would be naive to query for exactly that many rows, as it's possible and not unlikely
 +         * that some of the returned rows would also be shadowed by tombstones from other hosts.
 +         *
 +         * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
 +         * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
 +         *
 +         * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
 +         * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
 +         * marginal cost for each extra row requested.
 +         *
 +         * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
 +         * time we want to respect paging limits and not blow up spectacularly.
 +         *
 +         * Note: it's ok to retrieve more rows that necessary since singleResultCounter is not stopping and only
 +         * counts.
 +         *
 +         * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
 +         *
 +         * See CASSANDRA-13794 for more details.
 +         */
 +        lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
 +
 +        ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
 +        Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
 +
 +        SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
 +        return UnfilteredPartitionIterators.getOnlyElement(commandExecutor.apply(cmd), cmd);
 +    }
 +
 +    private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
 +    {
 +        ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
 +        if (null != lastClustering)
 +            filter = filter.forPaging(metadata.comparator, lastClustering, false);
 +
 +        return SinglePartitionReadCommand.create(command.metadata(),
 +                                                 command.nowInSec(),
 +                                                 command.columnFilter(),
 +                                                 command.rowFilter(),
 +                                                 command.limits().forShortReadRetry(toQuery),
 +                                                 partitionKey,
 +                                                 filter,
 +                                                 command.indexMetadata());
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org