Posted to commits@cassandra.apache.org by ma...@apache.org on 2019/10/24 09:25:34 UTC
[cassandra] 01/01: Merge branch 'cassandra-2.2' into cassandra-3.0
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9cba621f50536289380994593ca1e56c9337a4d8
Merge: 93815db 9081d00
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Oct 24 10:09:42 2019 +0100
Merge branch 'cassandra-2.2' into cassandra-3.0
.../cassandra/db/SinglePartitionReadCommand.java | 4 +-
.../db/partitions/AbstractBTreePartition.java | 9 +++-
.../cassandra/distributed/api/IInstance.java | 3 ++
.../impl/DelegatingInvokableInstance.java | 10 ++++
.../cassandra/distributed/impl/Instance.java | 19 +++++++
.../upgrade/MixedModeReadRepairTest.java | 59 ++++++++++++++++++++++
.../distributed/upgrade/UpgradeTestBase.java | 17 ++++++-
7 files changed, 117 insertions(+), 4 deletions(-)
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 4c8e0bc,0000000..15b74d8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -1,1299 -1,0 +1,1299 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.RTBoundValidator;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
+
+/**
+ * A read command that selects a (part of a) single partition.
+ */
+public class SinglePartitionReadCommand extends ReadCommand
+{
+ protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
+
+ private final DecoratedKey partitionKey;
+ private final ClusteringIndexFilter clusteringIndexFilter;
+
+ private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
+ private SinglePartitionReadCommand(boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter,
+ IndexMetadata index)
+ {
+ super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ assert partitionKey.getPartitioner() == metadata.partitioner;
+ this.partitionKey = partitionKey;
+ this.clusteringIndexFilter = clusteringIndexFilter;
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param isForThrift whether the query is for thrift or not.
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ * @param indexMetadata explicitly specified index to use for the query
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter,
+ IndexMetadata indexMetadata)
+ {
+ return new SinglePartitionReadCommand(false,
+ 0,
+ isForThrift,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ partitionKey,
+ clusteringIndexFilter,
+ indexMetadata);
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param isForThrift whether the query is for thrift or not.
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter to use for the query.
+ * @param rowFilter the row filter to use for the query.
+ * @param limits the limits to use for the query.
+ * @param partitionKey the partition key for the partition to query.
+ * @param clusteringIndexFilter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command.
+ */
+ public static SinglePartitionReadCommand create(boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexFilter clusteringIndexFilter)
+ {
+ return create(isForThrift,
+ metadata,
+ nowInSec,
+ columnFilter,
+ rowFilter,
+ limits,
+ partitionKey,
+ clusteringIndexFilter,
+ findIndex(metadata, rowFilter));
+ }
+
+ /**
+ * Creates a new read command on a single partition.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param columnFilter the column filter to use for the query.
+ * @param filter the clustering index filter to use for the query.
+ *
+ * @return a newly created read command. The returned command will use no row filter and have no limits.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata,
+ int nowInSec,
+ DecoratedKey key,
+ ColumnFilter columnFilter,
+ ClusteringIndexFilter filter)
+ {
+ return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new read command that queries a single partition in its entirety.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ *
+ * @return a newly created read command that queries all the rows of {@code key}.
+ */
+ public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
+ {
+ return create(metadata, nowInSec, key, Slices.ALL);
+ }
+
+ /**
+ * Creates a new read command that queries a single partition in its entirety.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ *
+ * @return a newly created read command that queries all the rows of {@code key}.
+ */
+ public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
+ {
+ return create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided single slice.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slice the slice of rows to query.
+ *
+ * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
+ {
+ return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
+ {
+ ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
+ return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices)
+ {
+ return create(metadata, nowInSec, metadata.decorateKey(key), slices);
+ }
+
+ /**
+ * Creates a new single partition name command for the provided rows.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param names the clustering for the rows to query.
+ *
+ * @return a newly created read command that queries the {@code names} in {@code key}. The returned query will
+ * query every column (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names)
+ {
+ ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false);
+ return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Creates a new single partition name command for the provided row.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param name the clustering for the row to query.
+ *
+ * @return a newly created read command that queries {@code name} in {@code key}. The returned query will
+ * query every column (without limit or row filtering).
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Clustering name)
+ {
+ return create(metadata, nowInSec, key, FBUtilities.singleton(name, metadata.comparator));
+ }
+
+ public SinglePartitionReadCommand copy()
+ {
+ return new SinglePartitionReadCommand(isDigestQuery(),
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ clusteringIndexFilter(),
+ indexMetadata());
+ }
+
+ public SinglePartitionReadCommand copyAsDigestQuery()
+ {
+ return new SinglePartitionReadCommand(true,
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ clusteringIndexFilter(),
+ indexMetadata());
+ }
+
+ public SinglePartitionReadCommand withUpdatedClusteringIndexFilter(ClusteringIndexFilter filter)
+ {
+ return new SinglePartitionReadCommand(isDigestQuery(),
+ digestVersion(),
+ isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ filter,
+ indexMetadata());
+ }
+
+ static SinglePartitionReadCommand legacySliceCommand(boolean isDigest,
+ int digestVersion,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexSliceFilter filter)
+ {
+ // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+ return new SinglePartitionReadCommand(isDigest,
+ digestVersion,
+ true,
+ metadata,
+ nowInSec,
+ columnFilter,
+ RowFilter.NONE,
+ limits,
+ partitionKey,
+ filter,
+ null);
+ }
+
+ static SinglePartitionReadCommand legacyNamesCommand(boolean isDigest,
+ int digestVersion,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ DecoratedKey partitionKey,
+ ClusteringIndexNamesFilter filter)
+ {
+ // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+ return new SinglePartitionReadCommand(isDigest, digestVersion, true, metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, partitionKey, filter, null);
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return partitionKey;
+ }
+
+ public ClusteringIndexFilter clusteringIndexFilter()
+ {
+ return clusteringIndexFilter;
+ }
+
+ public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
+ {
+ return clusteringIndexFilter;
+ }
+
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getReadRpcTimeout();
+ }
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ if (!this.partitionKey().equals(key))
+ return false;
+
+ return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return !columnFilter().fetchedColumns().statics.isEmpty();
+
+ if (!clusteringIndexFilter().selects(clustering))
+ return false;
+
+ return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+ }
+
+ /**
+ * Returns a new command suitable for paging from the last returned row.
+ *
+ * @param lastReturned the last row returned by the previous page. The newly created command
+ * will only query rows that come after this (in query order). This can be {@code null} if this
+ * is the first page.
+ * @param pageSize the size to use for the page to query.
+ *
+ * @return the newly created command.
+ */
+ public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize)
+ {
+ // We shouldn't have set digest yet when reaching that point
+ assert !isDigestQuery();
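+ // for the first page we keep the original filter; on later pages it is narrowed to rows
+ // strictly after 'lastReturned' (the 'false' below presumably being the inclusivity flag)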
+ return create(isForThrift(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits().forPaging(pageSize),
+ partitionKey(),
+ lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+ }
+
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+ {
+ return StorageProxy.read(Group.one(this), consistency, clientState);
+ }
+
+ public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
+ {
+ return getPager(this, pagingState, protocolVersion);
+ }
+
+ private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, int protocolVersion)
+ {
+ return new SinglePartitionPager(command, pagingState, protocolVersion);
+ }
+
+ protected void recordLatency(TableMetrics metric, long latencyNanos)
+ {
+ metric.readLatency.addNano(latencyNanos);
+ }
+
+ @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
+ protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+ {
+ UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+ ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup())
+ : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup());
+ return new SingletonUnfilteredPartitionIterator(partition, isForThrift());
+ }
+
+ /**
+ * Fetch the requested rows from the cache if present; if not, read them from disk and cache the result.
+ * <p>
+ * If the partition is cached, and the filter given is within its bounds, we return
+ * from cache, otherwise from disk.
+ * <p>
+ * If the partition is not cached, we figure out what filter is "biggest", read
+ * that from disk, then filter the result and either cache that or return it.
+ */
+ private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
+ {
+ assert !cfs.isIndex(); // CASSANDRA-5732
+ assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
+
+ RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, partitionKey());
+
+ // Attempt a sentinel-read-cache sequence. If a write invalidates our sentinel, we'll return our
+ // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
+ // TODO: don't evict entire partitions on writes (#2864)
+ IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
+ if (cached != null)
+ {
+ if (cached instanceof RowCacheSentinel)
+ {
+ // Some other read is trying to cache the value, just do a normal non-caching read
+ Tracing.trace("Row cache miss (race)");
+ cfs.metric.rowCacheMiss.inc();
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ CachedPartition cachedPartition = (CachedPartition)cached;
+ if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec()))
+ {
+ cfs.metric.rowCacheHit.inc();
+ Tracing.trace("Row cache hit");
+ UnfilteredRowIterator unfilteredRowIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition);
+ cfs.metric.updateSSTableIterated(0);
+ return unfilteredRowIterator;
+ }
+
+ cfs.metric.rowCacheHitOutOfRange.inc();
+ Tracing.trace("Ignoring row cache as cached value could not satisfy query");
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ cfs.metric.rowCacheMiss.inc();
+ Tracing.trace("Row cache miss");
+
+ // Note that on tables with no clustering keys, any positive value of
+ // rowsToCache implies caching the full partition
+ boolean cacheFullPartitions = metadata().clusteringColumns().size() > 0 ?
+ metadata().params.caching.cacheAllRows() :
+ metadata().params.caching.cacheRows();
+
+ // To be able to cache what we read, what we read must at least cover what the cache holds, that
+ // is the 'rowsToCache' first rows of the partition. We could read those 'rowsToCache' first rows
+ // systematically, but we'd have to "extend" that to whatever the user query needs beyond those
+ // first rows, which is not trivial with our existing filters. So currently we settle for caching
+ // what we read only if the user query does query the head of the partition, since that's the
+ // common case where we'll be able to use the cache anyway. One exception is if we cache full
+ // partitions, in which case we just always read it all and cache it.
+ if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter())
+ {
+ RowCacheSentinel sentinel = new RowCacheSentinel();
+ boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
+ boolean sentinelReplaced = false;
+
+ try
+ {
+ final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
+ final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();
+
+ @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
+ UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
+ try
+ {
+ // Use a custom iterator instead of DataLimits to avoid stopping the original iterator
+ UnfilteredRowIterator toCacheIterator = new WrappingUnfilteredRowIterator(iter)
+ {
+ private int rowsCounted = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return rowsCounted < rowsToCache && super.hasNext();
+ }
+
+ @Override
+ public Unfiltered next()
+ {
+ Unfiltered unfiltered = super.next();
+ if (unfiltered.isRow())
+ {
+ Row row = (Row) unfiltered;
+ if (row.hasLiveData(nowInSec(), enforceStrictLiveness))
+ rowsCounted++;
+ }
+ return unfiltered;
+ }
+ };
+
+ // We want to cache only rowsToCache rows
+ CachedPartition toCache = CachedBTreePartition.create(toCacheIterator, nowInSec());
+ if (sentinelSuccess && !toCache.isEmpty())
+ {
+ Tracing.trace("Caching {} rows", toCache.rowCount());
+ CacheService.instance.rowCache.replace(key, sentinel, toCache);
+ // Whether or not the previous replace has worked, our sentinel is not in the cache anymore
+ sentinelReplaced = true;
+ }
+
+ // We then filter the result back down to what this query actually wants.
+ // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more
+ // than what we've cached, so we can't just use toCache.
+ UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache);
+ if (cacheFullPartitions)
+ {
+ // Everything is guaranteed to be in 'toCache', we're done with 'iter'
+ assert !iter.hasNext();
+ iter.close();
+ return cacheIterator;
+ }
+ return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter));
+ }
+ catch (RuntimeException | Error e)
+ {
+ iter.close();
+ throw e;
+ }
+ }
+ finally
+ {
+ if (sentinelSuccess && !sentinelReplaced)
+ cfs.invalidateCachedPartition(key);
+ }
+ }
+
+ Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
+ return queryMemtableAndDisk(cfs, readOp);
+ }
+
+ /**
+ * Queries both memtable and sstables to fetch the result of this query.
+ * <p>
+ * Please note that this method:
+ * 1) does not check the row cache.
+ * 2) does not apply the query limit, nor the row filter (and so ignores secondary indexes).
+ * Those are applied in {@link ReadCommand#executeLocally}.
+ * 3) does not record some of the read metrics (latency, scanned cells histograms) nor
+ * throw TombstoneOverwhelmingException.
+ * It is publicly exposed because there are a few places where that is exactly what we want,
+ * but it should be used only where you know you don't need those things.
+ * <p>
+ * Also note that one must have "started" an {@code OpOrder.Group} on the queried table; that is
+ * why it is required as a parameter, even though it is not explicitly used by the method.
+ */
+ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp)
+ {
+ Tracing.trace("Executing single-partition query on {}", cfs.name);
+
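+ // when memtables allocate off-heap, the data read is presumably cloned on-heap (copyOnHeap) so the returned iterator stays valid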
+ boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
+ return queryMemtableAndDiskInternal(cfs, copyOnHeap);
+ }
+
+ @Override
+ protected int oldestUnrepairedTombstone()
+ {
+ return oldestUnrepairedTombstone;
+ }
+
+ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+ {
+ /*
+ * We have 2 main strategies:
+ * 1) We query memtables and sstables simultaneously. This is our most generic strategy and the one we use
+ * unless we have a names filter that we know we can optimize further.
+ * 2) If we have a names filter (so we query specific rows), we can make a bet: that all columns for all queried rows
+ * will have data in the most recent sstable(s), thus saving us from reading older ones. This does imply we
+ * have a way to guarantee we have all the data for what is queried, which is only possible for name queries
+ * and if we have neither non-frozen collections/UDTs nor counters (indeed, for a non-frozen collection or UDT,
+ * we can't guarantee an older sstable won't have some elements that weren't in the most recent sstables,
+ * and counters are intrinsically a collection of shards and so have the same problem).
+ */
+ if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType())
+ return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
+
+ Tracing.trace("Acquiring sstable references");
+ ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+ List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+ ClusteringIndexFilter filter = clusteringIndexFilter();
+
+ try
+ {
+ for (Memtable memtable : view.memtables)
+ {
+ Partition partition = memtable.getPartition(partitionKey());
+ if (partition == null)
+ continue;
+
+ // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ @SuppressWarnings("resource")
+ UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+
+ if (copyOnHeap)
+ iter = UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance);
+
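+ // memtable data is never repaired, so it always counts toward the oldest unrepaired tombstone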
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
+
+ if (isForThrift())
+ iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
+
+ iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));
+ }
+ /*
+ * We can't eliminate full sstables based on the timestamp of what we've already read like
+ * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp < mostRecentTombstone
+ * we've read. We still rely on the sstable ordering by maxTimestamp since if
+ * maxTimestamp_s1 > maxTimestamp_s0,
+ * we're guaranteed that s1 cannot have a row tombstone such that
+ * timestamp(tombstone) > maxTimestamp_s0
+ * since we necessarily have
+ * timestamp(tombstone) <= maxTimestamp_s1
+ * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone elimination
+ * in one pass, and minimize the number of sstables for which we read a partition tombstone.
+ */
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ List<SSTableReader> skippedSSTables = null;
+ long mostRecentPartitionTombstone = Long.MIN_VALUE;
+ long minTimestamp = Long.MAX_VALUE;
+ int nonIntersectingSSTables = 0;
+ SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
+
+ for (SSTableReader sstable : view.sstables)
+ {
+ minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+ // if we've already seen a partition tombstone with a timestamp greater
+ // than the most recent update to this sstable, we can skip it
+ if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+ break;
+
+ if (!shouldInclude(sstable))
+ {
+ nonIntersectingSSTables++;
+ // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+ if (sstable.hasTombstones())
+ {
+ if (skippedSSTables == null)
+ skippedSSTables = new ArrayList<>();
+ skippedSSTables.add(sstable);
+ }
+ continue;
+ }
+
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+
+ // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ @SuppressWarnings("resource")
+ UnfilteredRowIterator iter = filter.filter(
+ sstable.iterator(partitionKey(),
+ columnFilter(),
+ filter.isReversed(),
+ isForThrift(),
+ metricsCollector)
+ );
+
+ if (isForThrift())
+ iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
+
+ iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));
+
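+ // remember the newest partition-level tombstone seen so far; sstables entirely older than it are skipped by the break above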
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
+ }
+
+ int includedDueToTombstones = 0;
+ // Check for partition tombstones in the skipped sstables
+ if (skippedSSTables != null)
+ {
+ for (SSTableReader sstable : skippedSSTables)
+ {
+ if (sstable.getMaxTimestamp() <= minTimestamp)
+ continue;
+
+ @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(),
+ columnFilter(),
+ filter.isReversed(),
+ isForThrift(),
+ metricsCollector));
+
+ if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
+ {
+ iterators.add(iter);
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+ includedDueToTombstones++;
+ }
+ else
+ {
+ iter.close();
+ }
+ }
+ }
+ if (Tracing.isTracing())
+ Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
+ nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
+
+ cfs.metric.updateSSTableIterated(metricsCollector.getMergedSSTables());
+
+ if (iterators.isEmpty())
+ return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
+
+ Tracing.trace("Merging data from memtables and {} sstables", metricsCollector.getMergedSSTables());
+
+ @SuppressWarnings("resource") // Closed through the closing of the result of that method.
+ UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
+ if (!merged.isEmpty())
+ {
+ DecoratedKey key = merged.partitionKey();
+ cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+ }
+
+ return merged;
+ }
+ catch (RuntimeException | Error e)
+ {
+ try
+ {
+ FBUtilities.closeAll(iterators);
+ }
+ catch (Exception suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+ throw e;
+ }
+ }
+
+ private boolean shouldInclude(SSTableReader sstable)
+ {
+ // If some static columns are queried, we should always include the sstable: the clustering values stats of the sstable
+ // don't tell us if the sstable contains static values in particular.
+ // TODO: we could record if a sstable contains any static value at all.
+ if (!columnFilter().fetchedColumns().statics.isEmpty())
+ return true;
+
+ return clusteringIndexFilter().shouldInclude(sstable);
+ }
+
+ private boolean queriesMulticellType()
+ {
+ for (ColumnDefinition column : columnFilter().fetchedColumns())
+ {
+ if (column.type.isMultiCell() || column.type.isCounter())
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Do a read by querying the memtable(s) first, and then each relevant sstable sequentially, in order of sstable
+ * max timestamp.
+ *
+ * This is used for names queries in the hope of only having to query the 1 or 2 most recent sstables and then knowing nothing
+ * more recent could be in the older sstables (which we can only guarantee if we know exactly which rows we query, and if
+ * no collection or counters are included).
+ * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
+ */
+ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
+ {
+ Tracing.trace("Acquiring sstable references");
+ ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+ ImmutableBTreePartition result = null;
+
+ Tracing.trace("Merging memtable contents");
+ for (Memtable memtable : view.memtables)
+ {
+ Partition partition = memtable.getPartition(partitionKey());
+ if (partition == null)
+ continue;
+
+ try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
+ {
+ if (iter.isEmpty())
+ continue;
+
- UnfilteredRowIterator clonedFilter = copyOnHeap
++ UnfilteredRowIterator clonedIter = copyOnHeap
+ ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
+ : iter;
+ result = add(
- RTBoundValidator.validate(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, RTBoundValidator.Stage.MEMTABLE, false),
++ RTBoundValidator.validate(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedIter, nowInSec()) : clonedIter, RTBoundValidator.Stage.MEMTABLE, false),
+ result,
+ filter,
+ false
+ );
+ }
+ }
+
+ /* add the SSTables on disk */
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ boolean onlyUnrepaired = true;
+ // read sorted sstables
+ SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
+ for (SSTableReader sstable : view.sstables)
+ {
+ // if we've already seen a partition tombstone with a timestamp greater
+ // than the most recent update to this sstable, we're done, since the rest of the sstables
+ // will also be older
+ if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt())
+ break;
+
+ long currentMaxTs = sstable.getMaxTimestamp();
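+ // narrow the filter to the rows (and static row) for which this sstable could still hold newer data;
+ // a null filter means everything requested has already been found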
+ filter = reduceFilter(filter, result, currentMaxTs);
+ if (filter == null)
+ break;
+
+ if (!shouldInclude(sstable))
+ {
+ // This means that nothing queried by the filter can be in the sstable. One exception is the top-level partition deletion
+ // however: if it is set, it impacts everything and must be included. Getting that top-level partition deletion costs us
+ // some seek in general however (unless the partition is indexed and is in the key cache), so we first check if the sstable
+ // has any tombstone at all as a shortcut.
+ if (!sstable.hasTombstones())
+ continue; // Means no tombstone at all, we can skip that sstable
+
+ // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
+ try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(),
+ columnFilter(),
+ filter.isReversed(),
+ isForThrift(),
+ metricsCollector)))
+ {
+ if (!iter.partitionLevelDeletion().isLive())
+ {
+ result = add(
+ UnfilteredRowIterators.noRowsIterator(iter.metadata(),
+ iter.partitionKey(),
+ Rows.EMPTY_STATIC_ROW,
+ iter.partitionLevelDeletion(),
+ filter.isReversed()),
+ result,
+ filter,
+ sstable.isRepaired()
+ );
+ }
+ else
+ {
+ result = add(
+ RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false),
+ result,
+ filter,
+ sstable.isRepaired()
+ );
+ }
+ }
+
+ continue;
+ }
+
+ Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
+ try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(),
+ columnFilter(),
+ filter.isReversed(),
+ isForThrift(),
+ metricsCollector)))
+ {
+ if (iter.isEmpty())
+ continue;
+
+ if (sstable.isRepaired())
+ onlyUnrepaired = false;
+
+ result = add(
+ RTBoundValidator.validate(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, RTBoundValidator.Stage.SSTABLE, false),
+ result,
+ filter,
+ sstable.isRepaired()
+ );
+ }
+ }
+
+ cfs.metric.updateSSTableIterated(metricsCollector.getMergedSSTables());
+
+ if (result == null || result.isEmpty())
+ return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
+
+ DecoratedKey key = result.partitionKey();
+ cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+
+ // "hoist up" the requested data into a more recent sstable
+ if (metricsCollector.getMergedSSTables() > cfs.getMinimumCompactionThreshold()
+ && onlyUnrepaired
+ && !cfs.isAutoCompactionDisabled()
+ && cfs.getCompactionStrategyManager().shouldDefragment())
+ {
+ // !!WARNING!! if we stop copying our data to a heap-managed object,
+ // we will need to track the lifetime of this mutation as well
+ Tracing.trace("Defragmenting requested data");
+
+ try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
+ {
+ final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
+ StageManager.getStage(Stage.MUTATION).execute(new Runnable()
+ {
+ public void run()
+ {
+ // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
+ Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
+ }
+ });
+ }
+ }
+
+ return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
+ }
+
+ private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired)
+ {
+ if (!isRepaired)
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.stats().minLocalDeletionTime);
+
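+ // the names filter may request no rows at all (presumably when only static columns are queried), so keep a capacity of at least 1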
+ int maxRows = Math.max(filter.requestedRows().size(), 1);
+ if (result == null)
+ return ImmutableBTreePartition.create(iter, maxRows);
+
+ try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, filter.isReversed())), nowInSec()))
+ {
+ return ImmutableBTreePartition.create(merged, maxRows);
+ }
+ }
+
+ private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp)
+ {
+ if (result == null)
+ return filter;
+
+ SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false);
+
+ PartitionColumns columns = columnFilter().fetchedColumns();
+ NavigableSet<Clustering> clusterings = filter.requestedRows();
+
+ // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows.
+ // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave
+ // that for later.
+
+ boolean removeStatic = false;
+ if (!columns.statics.isEmpty())
+ {
+ Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
+ removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
+ }
+
+ NavigableSet<Clustering> toRemove = null;
+ for (Clustering clustering : clusterings)
+ {
+ Row row = searchIter.next(clustering);
+ if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
+ continue;
+
+ if (toRemove == null)
+ toRemove = new TreeSet<>(result.metadata().comparator);
+ toRemove.add(clustering);
+ }
+
+ if (!removeStatic && toRemove == null)
+ return filter;
+
+ // Check if we have everything we need
+ boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic;
+ boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size());
+ if (hasNoMoreStatic && hasNoMoreClusterings)
+ return null;
+
+ if (toRemove != null)
+ {
+ BTreeSet.Builder<Clustering> newClusterings = BTreeSet.builder(result.metadata().comparator);
+ newClusterings.addAll(Sets.difference(clusterings, toRemove));
+ clusterings = newClusterings.build();
+ }
+ return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
+ }
+
+ private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp)
+ {
+ // We can remove a row if it has data that is more recent than the next sstable to consider for the data that the query
+ // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not)
+ // and 2) the requested columns.
+ if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
+ return false;
+
+ for (ColumnDefinition column : requestedColumns)
+ {
+ Cell cell = row.getCell(column);
+ if (cell == null || cell.timestamp() <= sstableTimestamp)
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean selectsFullPartition()
+ {
+ return metadata().isStaticCompactTable() ||
+ (clusteringIndexFilter.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns());
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)",
+ metadata().ksName,
+ metadata().cfName,
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ metadata().getKeyValidator().getString(partitionKey().getKey()),
+ clusteringIndexFilter.toString(metadata()),
+ nowInSec());
+ }
+
+ public MessageOut<ReadCommand> createMessage(int version)
+ {
+ return new MessageOut<>(MessagingService.Verb.READ, this, readSerializer);
+ }
+
+ protected void appendCQLWhereClause(StringBuilder sb)
+ {
+ sb.append(" WHERE ");
+
+ sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = ");
+ DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey());
+
+ // We put the row filter first because the clustering index filter can end with "ORDER BY"
+ if (!rowFilter().isEmpty())
+ sb.append(" AND ").append(rowFilter());
+
+ String filterString = clusteringIndexFilter().toCQLString(metadata());
+ if (!filterString.isEmpty())
+ sb.append(" AND ").append(filterString);
+ }
+
+ protected void serializeSelection(DataOutputPlus out, int version) throws IOException
+ {
+ metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
+ ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version);
+ }
+
+ protected long selectionSerializedSize(int version)
+ {
+ return metadata().getKeyValidator().writtenLength(partitionKey().getKey())
+ + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
+ }
+
+ public boolean isLimitedToOnePartition()
+ {
+ return true;
+ }
+
+ /**
+ * Groups multiple single partition read commands.
+ */
+ public static class Group implements ReadQuery
+ {
+ public final List<SinglePartitionReadCommand> commands;
+ private final DataLimits limits;
+ private final int nowInSec;
+ private final boolean selectsFullPartitions;
+
+ public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
+ {
+ assert !commands.isEmpty();
+ this.commands = commands;
+ this.limits = limits;
+ SinglePartitionReadCommand firstCommand = commands.get(0);
+ this.nowInSec = firstCommand.nowInSec();
+ this.selectsFullPartitions = firstCommand.selectsFullPartition();
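+ // all commands in a group must share the same nowInSec, which the assert below enforces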
+ for (int i = 1; i < commands.size(); i++)
+ assert commands.get(i).nowInSec() == nowInSec;
+ }
+
+ public static Group one(SinglePartitionReadCommand command)
+ {
+ return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits());
+ }
+
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+ {
+ return StorageProxy.read(this, consistency, clientState);
+ }
+
+ public int nowInSec()
+ {
+ return nowInSec;
+ }
+
+ public DataLimits limits()
+ {
+ return limits;
+ }
+
+ public CFMetaData metadata()
+ {
+ return commands.get(0).metadata();
+ }
+
+ @Override
+ public boolean selectsFullPartition()
+ {
+ return selectsFullPartitions;
+ }
+
+ public ReadOrderGroup startOrderGroup()
+ {
+ // Note that the only difference between the commands in a group must be the partition key to which
+ // they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
+ return commands.get(0).startOrderGroup();
+ }
+
+ public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+ {
+ List<PartitionIterator> partitions = new ArrayList<>(commands.size());
+ for (SinglePartitionReadCommand cmd : commands)
+ partitions.add(cmd.executeInternal(orderGroup));
+
+ // Note that the only difference between the commands in a group must be the partition key to which
+ // they apply.
+ boolean enforceStrictLiveness = commands.get(0).metadata().enforceStrictLiveness();
+ // Because the limit is only enforced per command, we need to enforce it globally here.
+ return limits.filter(PartitionIterators.concat(partitions),
+ nowInSec,
+ selectsFullPartitions,
+ enforceStrictLiveness);
+ }
+
+ public QueryPager getPager(PagingState pagingState, int protocolVersion)
+ {
+ if (commands.size() == 1)
+ return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
+
+ return new MultiPartitionPager(this, pagingState, protocolVersion);
+ }
+
+ public boolean selectsKey(DecoratedKey key)
+ {
+ return Iterables.any(commands, c -> c.selectsKey(key));
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering)
+ {
+ return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
+ }
+
+ @Override
+ public String toString()
+ {
+ return commands.toString();
+ }
+ }
+
+ private static class Deserializer extends SelectionDeserializer
+ {
+ public ReadCommand deserialize(DataInputPlus in,
+ int version,
+ boolean isDigest,
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ IndexMetadata index)
+ throws IOException
+ {
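+ // mirror of serializeSelection above: read the partition key bytes, then the clustering index filter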
+ DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in, DatabaseDescriptor.getMaxValueSize()));
+ ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
+ return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
+ }
+ }
+
+ /**
+ * {@code SSTableReaderListener} used to collect metrics about SSTable read access.
+ */
+ private static final class SSTableReadMetricsCollector implements SSTableReadsListener
+ {
+ /**
+ * The number of SSTables that need to be merged. This counter is only updated for single partition queries
+ * since this has been the behavior so far.
+ */
+ private int mergedSSTables;
+
+ @Override
+ public void onSSTableSelected(SSTableReader sstable, RowIndexEntry<?> indexEntry, SelectionReason reason)
+ {
+ sstable.incrementReadCount();
+ mergedSSTables++;
+ }
+
+ /**
+ * Returns the number of SSTables that need to be merged.
+ * @return the number of SSTables that need to be merged.
+ */
+ public int getMergedSSTables()
+ {
+ return mergedSSTables;
+ }
+ }
+}
diff --cc src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 1f3dbd0,0000000..12dbb39
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@@ -1,412 -1,0 +1,419 @@@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.partitions;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+
+import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
+
+public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
+{
+ protected static final Holder EMPTY = new Holder(PartitionColumns.NONE, BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+
+ protected final CFMetaData metadata;
+ protected final DecoratedKey partitionKey;
+
+ protected abstract Holder holder();
+ protected abstract boolean canHaveShadowedData();
+
+ protected AbstractBTreePartition(CFMetaData metadata, DecoratedKey partitionKey)
+ {
+ this.metadata = metadata;
+ this.partitionKey = partitionKey;
+ }
+
+ protected static final class Holder
+ {
+ final PartitionColumns columns;
+ final DeletionInfo deletionInfo;
+ // the btree of rows
+ final Object[] tree;
+ final Row staticRow;
+ final EncodingStats stats;
+
+ Holder(PartitionColumns columns, Object[] tree, DeletionInfo deletionInfo, Row staticRow, EncodingStats stats)
+ {
+ this.columns = columns;
+ this.tree = tree;
+ this.deletionInfo = deletionInfo;
+ this.staticRow = staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
+ this.stats = stats;
+ }
+ }
+
+ public DeletionInfo deletionInfo()
+ {
+ return holder().deletionInfo;
+ }
+
+ public Row staticRow()
+ {
+ return holder().staticRow;
+ }
+
+ public boolean isEmpty()
+ {
+ Holder holder = holder();
+ return holder.deletionInfo.isLive() && BTree.isEmpty(holder.tree) && holder.staticRow.isEmpty();
+ }
+
+ public boolean hasRows()
+ {
+ Holder holder = holder();
+ return !BTree.isEmpty(holder.tree);
+ }
+
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return partitionKey;
+ }
+
+ public DeletionTime partitionLevelDeletion()
+ {
+ return deletionInfo().getPartitionDeletion();
+ }
+
+ public PartitionColumns columns()
+ {
+ return holder().columns;
+ }
+
+ public EncodingStats stats()
+ {
+ return holder().stats;
+ }
+
+ public Row getRow(Clustering clustering)
+ {
+ Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
+ // Note that for statics, this will never return null but rather an empty row. However,
+ // it's more consistent for this method to return null if we don't really have a static row.
+ return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
+ }
+
+ private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow)
+ {
+ DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
+ if (columns.fetchedColumns().statics.isEmpty() || (current.staticRow.isEmpty() && partitionDeletion.isLive()))
+ return Rows.EMPTY_STATIC_ROW;
+
+ Row row = current.staticRow.filter(columns, partitionDeletion, setActiveDeletionToRow, metadata);
+ return row == null ? Rows.EMPTY_STATIC_ROW : row;
+ }
+
+ public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
+ {
+ // TODO: we could optimize comparison for "NativeRow" à la #6755
+ final Holder current = holder();
+ return new SearchIterator<Clustering, Row>()
+ {
+ private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed));
+ private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
+
+ public Row next(Clustering clustering)
+ {
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return staticRow(current, columns, true);
+
+ Row row = rawIter.next(clustering);
+ RangeTombstone rt = current.deletionInfo.rangeCovering(clustering);
+
+ // A search iterator only returns a row, so it doesn't allow us to directly account for deletions that should apply to the row
+ // (the partition deletion or the deletion of a range tombstone that covers it). So if need be, reuse the row deletion
+ // to carry the proper deletion on the row.
+ DeletionTime activeDeletion = partitionDeletion;
+ if (rt != null && rt.deletionTime().supersedes(activeDeletion))
+ activeDeletion = rt.deletionTime();
+
+ if (row == null)
- return activeDeletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
++ {
++ // this means our partition level deletion supersedes all other deletions and we don't have to keep the row deletions
++ if (activeDeletion == partitionDeletion)
++ return null;
++ // no need to check activeDeletion.isLive here - if anything supersedes the partitionDeletion
++ // it must be non-live
++ return BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
++ }
+
+ return row.filter(columns, activeDeletion, true, metadata);
+ }
+ };
+ }
+
+ public UnfilteredRowIterator unfilteredIterator()
+ {
+ return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false);
+ }
+
+ public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
+ {
+ return unfilteredIterator(holder(), selection, slices, reversed);
+ }
+
+ public UnfilteredRowIterator unfilteredIterator(Holder current, ColumnFilter selection, Slices slices, boolean reversed)
+ {
+ Row staticRow = staticRow(current, selection, false);
+ if (slices.size() == 0)
+ {
+ DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
+ return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow, partitionDeletion, reversed);
+ }
+
+ return slices.size() == 1
+ ? sliceIterator(selection, slices.get(0), reversed, current, staticRow)
+ : new SlicesIterator(selection, slices, reversed, current, staticRow);
+ }
+
+ private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice slice, boolean reversed, Holder current, Row staticRow)
+ {
+ Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
+ Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
+ Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed));
+ Iterator<RangeTombstone> deleteIter = current.deletionInfo.rangeIterator(slice, reversed);
+
+ return merge(rowIter, deleteIter, selection, reversed, current, staticRow);
+ }
+
+ private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, Iterator<RangeTombstone> deleteIter,
+ ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
+ {
+ return new RowAndDeletionMergeIterator(metadata, partitionKey, current.deletionInfo.getPartitionDeletion(),
+ selection, staticRow, reversed, current.stats,
+ rowIter, deleteIter,
+ canHaveShadowedData());
+ }
+
+ private abstract class AbstractIterator extends AbstractUnfilteredRowIterator
+ {
+ final Holder current;
+ final ColumnFilter selection;
+
+ private AbstractIterator(ColumnFilter selection, boolean isReversed)
+ {
+ this(AbstractBTreePartition.this.holder(), selection, isReversed);
+ }
+
+ private AbstractIterator(Holder current, ColumnFilter selection, boolean isReversed)
+ {
+ this(current,
+ AbstractBTreePartition.this.staticRow(current, selection, false),
+ selection, isReversed);
+ }
+
+ private AbstractIterator(Holder current, Row staticRow, ColumnFilter selection, boolean isReversed)
+ {
+ super(AbstractBTreePartition.this.metadata,
+ AbstractBTreePartition.this.partitionKey,
+ current.deletionInfo.getPartitionDeletion(),
+ selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator
+ // it would also be more precise to return the intersection of the selection and current.columns,
+ // but it's probably not worth spending time on computing that.
+ staticRow,
+ isReversed,
+ current.stats);
+ this.current = current;
+ this.selection = selection;
+ }
+ }
+
+ public class SlicesIterator extends AbstractIterator
+ {
+ private final Slices slices;
+
+ private int idx;
+ private Iterator<Unfiltered> currentSlice;
+
+ private SlicesIterator(ColumnFilter selection,
+ Slices slices,
+ boolean isReversed,
+ Holder current,
+ Row staticRow)
+ {
+ super(current, staticRow, selection, isReversed);
+ this.slices = slices;
+ }
+
+ protected Unfiltered computeNext()
+ {
+ while (true)
+ {
+ if (currentSlice == null)
+ {
+ if (idx >= slices.size())
+ return endOfData();
+
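+ // when iterating in reverse order, the slices themselves are also consumed last-to-first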
+ int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx;
+ currentSlice = sliceIterator(selection, slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW);
+ idx++;
+ }
+
+ if (currentSlice.hasNext())
+ return currentSlice.next();
+
+ currentSlice = null;
+ }
+ }
+ }
+
+ public class SliceableIterator extends AbstractIterator implements SliceableUnfilteredRowIterator
+ {
+ private Iterator<Unfiltered> iterator;
+
+ protected SliceableIterator(ColumnFilter selection, boolean isReversed)
+ {
+ super(selection, isReversed);
+ }
+
+ protected Unfiltered computeNext()
+ {
+ if (iterator == null)
+ iterator = unfilteredIterator(selection, Slices.ALL, isReverseOrder);
+ if (!iterator.hasNext())
+ return endOfData();
+ return iterator.next();
+ }
+
+ public Iterator<Unfiltered> slice(Slice slice)
+ {
+ return sliceIterator(selection, slice, isReverseOrder, current, staticRow);
+ }
+ }
+
+ public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
+ {
+ return new SliceableIterator(columns, reversed);
+ }
+
+ protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
+ {
+ return sliceableUnfilteredIterator(ColumnFilter.all(metadata), false);
+ }
+
+ protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity)
+ {
+ return build(iterator, initialRowCapacity, true);
+ }
+
+ protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
+ {
+ CFMetaData metadata = iterator.metadata();
+ PartitionColumns columns = iterator.columns();
+ boolean reversed = iterator.isReverseOrder();
+
+ BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
+ builder.auto(!ordered);
+ MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed);
+
+ while (iterator.hasNext())
+ {
+ Unfiltered unfiltered = iterator.next();
+ if (unfiltered.kind() == Unfiltered.Kind.ROW)
+ builder.add((Row)unfiltered);
+ else
+ deletionBuilder.add((RangeTombstoneMarker)unfiltered);
+ }
+
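+ // rows were delivered in reverse clustering order; flip them so the BTree is built in comparator order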
+ if (reversed)
+ builder.reverse();
+
+ return new Holder(columns, builder.build(), deletionBuilder.build(), iterator.staticRow(), iterator.stats());
+ }
+
+ // Note that when building with a RowIterator, deletion will generally be LIVE, but we allow passing it nonetheless because PartitionUpdate
+ // passes a MutableDeletionInfo that it mutates later.
+ protected static Holder build(RowIterator rows, DeletionInfo deletion, boolean buildEncodingStats, int initialRowCapacity)
+ {
+ CFMetaData metadata = rows.metadata();
+ PartitionColumns columns = rows.columns();
+ boolean reversed = rows.isReverseOrder();
+
+ BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
+ builder.auto(false);
+ while (rows.hasNext())
+ {
+ Row row = rows.next();
+ builder.add(row);
+ }
+
+ if (reversed)
+ builder.reverse();
+
+ Row staticRow = rows.staticRow();
+ Object[] tree = builder.build();
+ EncodingStats stats = buildEncodingStats ? EncodingStats.Collector.collect(staticRow, BTree.iterator(tree), deletion)
+ : EncodingStats.NO_STATS;
+ return new Holder(columns, tree, deletion, staticRow, stats);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(String.format("[%s.%s] key=%s partition_deletion=%s columns=%s",
+ metadata.ksName,
+ metadata.cfName,
+ metadata.getKeyValidator().getString(partitionKey().getKey()),
+ partitionLevelDeletion(),
+ columns()));
+
+ if (staticRow() != Rows.EMPTY_STATIC_ROW)
+ sb.append("\n ").append(staticRow().toString(metadata, true));
+
+ try (UnfilteredRowIterator iter = unfilteredIterator())
+ {
+ while (iter.hasNext())
+ sb.append("\n ").append(iter.next().toString(metadata, true));
+ }
+
+ return sb.toString();
+ }
+
+ public int rowCount()
+ {
+ return BTree.size(holder().tree);
+ }
+
+ public Iterator<Row> iterator()
+ {
+ return BTree.<Row>iterator(holder().tree);
+ }
+
+ public Row lastRow()
+ {
+ Object[] tree = holder().tree;
+ if (BTree.isEmpty(tree))
+ return null;
+
+ return BTree.findByIndex(tree, BTree.size(tree) - 1);
+ }
+}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 4403767,27094d8..bf28843
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@@ -20,8 -20,10 +20,11 @@@ package org.apache.cassandra.distribute
import java.util.ArrayList;
import java.util.Arrays;
+ import java.util.HashSet;
import java.util.List;
+ import java.util.Set;
++import org.apache.cassandra.db.MutableDeletionInfo;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.impl.Instance;
import org.apache.cassandra.distributed.impl.Versions;