You are viewing a plain text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text version.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/09/20 16:48:55 UTC
[2/6] cassandra git commit: Fix short read protection performance
Fix short read protection performance
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-13794
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f93e6e34
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f93e6e34
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f93e6e34
Branch: refs/heads/cassandra-3.11
Commit: f93e6e3401c343dec74687d8b079b5697813ab28
Parents: ab5084a
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Thu Aug 31 20:51:08 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Wed Sep 20 16:11:18 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 5 +
.../apache/cassandra/service/DataResolver.java | 272 ++++++++++++-------
3 files changed, 181 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2d11a3e..07742ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.15
+ * Improve short read protection performance (CASSANDRA-13794)
* Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)
* Fix short read protection for tables with no clustering columns (CASSANDRA-13880)
* Make isBuilt volatile in PartitionUpdate (CASSANDRA-13619)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 983d6b1..e6e46b2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2470,4 +2470,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return keyspace.getColumnFamilyStore(id);
}
+
+ public static TableMetrics metricsFor(UUID tableId)
+ {
+ return getIfExists(tableId).metric;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 99399a3..9a98ee5 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,7 +44,7 @@ public class DataResolver extends ResponseResolver
@VisibleForTesting
final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
- public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+ DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
{
super(keyspace, command, consistency, maxResponseCount);
}
@@ -55,6 +55,20 @@ public class DataResolver extends ResponseResolver
return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
}
+ public boolean isDataPresent()
+ {
+ return !responses.isEmpty();
+ }
+
+ public void compareResponses()
+ {
+ // We need to fully consume the results to trigger read repairs if appropriate
+ try (PartitionIterator iterator = resolve())
+ {
+ PartitionIterators.consume(iterator);
+ }
+ }
+
public PartitionIterator resolve()
{
// We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
@@ -83,54 +97,56 @@ public class DataResolver extends ResponseResolver
* See CASSANDRA-13747 for more details.
*/
- DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
+ DataLimits.Counter mergedResultCounter =
+ command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
- UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
- FilteredPartitions filtered = FilteredPartitions.filter(merged,
- new Filter(command.nowInSec(),
- command.metadata().enforceStrictLiveness()));
- PartitionIterator counted = counter.applyTo(filtered);
+ UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+ FilteredPartitions filtered =
+ FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
+ PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
return command.isForThrift()
? counted
: Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
- public void compareResponses()
- {
- // We need to fully consume the results to trigger read repairs if appropriate
- try (PartitionIterator iterator = resolve())
- {
- PartitionIterators.consume(iterator);
- }
- }
-
private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
InetAddress[] sources,
- DataLimits.Counter resultCounter)
+ DataLimits.Counter mergedResultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
if (results.size() == 1)
return results.get(0);
- UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
-
// So-called "short reads" stems from nodes returning only a subset of the results they have for a partition due to the limit,
// but that subset not being enough post-reconciliation. So if we don't have limit, don't bother.
if (!command.limits().isUnlimited())
{
for (int i = 0; i < results.size(); i++)
- results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
+ {
+ DataLimits.Counter singleResultCounter =
+ command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
+
+ ShortReadResponseProtection protection =
+ new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter);
+
+ /*
+ * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition()
+ * comments for details. We want singleResultCounter.applyToPartition() to be called after SRRP applies
+ * its transformations, so that this order is preserved when calling applyToRows() too.
+ */
+ results.set(i, Transformation.apply(Transformation.apply(results.get(i), protection), singleResultCounter));
+ }
}
- return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
+ return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
}
private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
{
private final InetAddress[] sources;
- public RepairMergeListener(InetAddress[] sources)
+ private RepairMergeListener(InetAddress[] sources)
{
this.sources = sources;
}
@@ -209,7 +225,7 @@ public class DataResolver extends ResponseResolver
// For each source, record if there is an open range to send as repair, and from where.
private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
- public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
+ private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
{
this.partitionKey = partitionKey;
this.columns = columns;
@@ -457,17 +473,18 @@ public class DataResolver extends ResponseResolver
}
}
- private class ShortReadProtection extends Transformation<UnfilteredRowIterator>
+ private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator>
{
private final InetAddress source;
- private final DataLimits.Counter counter;
- private final DataLimits.Counter postReconciliationCounter;
- private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter)
+ private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
+ private final DataLimits.Counter mergedResultCounter; // merged end-result counter
+
+ private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
{
this.source = source;
- this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
- this.postReconciliationCounter = postReconciliationCounter;
+ this.singleResultCounter = singleResultCounter;
+ this.mergedResultCounter = mergedResultCounter;
}
@Override
@@ -475,24 +492,25 @@ public class DataResolver extends ResponseResolver
{
ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey());
- partition = MoreRows.extend(partition, protection); // enable moreContents()
-
/*
- * if we don't apply these transformations *after* extending the partition with MoreRows,
- * their applyToRow() method will not be called on the first row of the new extension iterator
+ * Extend for moreContents() then apply protection to track lastClustering.
+ *
+ * If we don't apply the transformation *after* extending the partition with MoreRows,
+ * the protection's applyToRow() method will not be called on the first row of the new extension iterator.
*/
- partition = Transformation.apply(partition, protection); // track lastClustering
- partition = Transformation.apply(partition, counter); // do the counting
-
- return partition;
+ return Transformation.apply(MoreRows.extend(partition, protection), protection);
}
private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
{
- final CFMetaData metadata;
- final DecoratedKey partitionKey;
- Clustering lastClustering;
- int lastCount = 0;
+ private final CFMetaData metadata;
+ private final DecoratedKey partitionKey;
+
+ private Clustering lastClustering;
+
+ private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
+ private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
+ private int lastQueried = 0; // # extra rows requested from the replica last time
private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
{
@@ -507,79 +525,139 @@ public class DataResolver extends ResponseResolver
return row;
}
- @Override
+ /*
+ * We have a potential short read if the result from a given node contains the requested number of rows
+ * for that partition (i.e. it has stopped returning results due to the limit), but some of them haven't
+ * made it into the final post-reconciliation result due to other nodes' tombstones.
+ *
+ * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+ * ultimately return fewer rows than required. Also, those additional rows may contain tombstones which
+ * we also need to fetch, as they may shadow rows from other replicas' results, which we would
+ * otherwise return incorrectly.
+ *
+ * Also note that we only get here once all the rows for this partition have been iterated over, and so
+ * if the node had returned the requested number of rows but we still get here, then some results were
+ * skipped during reconciliation.
+ */
public UnfilteredRowIterator moreContents()
{
- assert !postReconciliationCounter.isDoneForPartition();
-
- // We have a short read if the node this is the result of has returned the requested number of
- // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
- // those results haven't made it in the final result post-reconciliation due to other nodes
- // tombstones. If that is the case, then the node might have more results that we should fetch
- // as otherwise we might return less results than required, or results that shouldn't be returned
- // (because the node has tombstone that hides future results from other nodes but that haven't
- // been returned due to the limit).
- // Also note that we only get here once all the results for this node have been returned, and so
- // if the node had returned the requested number but we still get there, it imply some results were
- // skipped during reconciliation.
- if (lastCount == counter.counted() || !counter.isDoneForPartition())
+ // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
+ assert !mergedResultCounter.isDoneForPartition();
+
+ // we do not apply short read protection when we have no limits at all
+ assert !command.limits().isUnlimited();
+
+ // if the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more
+ if (!singleResultCounter.isDoneForPartition())
+ return null;
+
+ /*
+ * If the replica has no live rows in the partition, don't try to fetch more.
+ *
+ * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
+ * always cover this scenario:
+ * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
+ * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
+ *
+ * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
+ * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
+ * have tombstones in the current partition.
+ *
+ * One other way we can hit this condition is when the partition only has a live static row and no regular
+ * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
+ * the moreContents() call.
+ */
+ if (singleResultCounter.countedInCurrentPartition() == 0)
return null;
- // clustering of the last row returned is empty, meaning that there is only one row per partition,
- // and we already have it.
- if (lastClustering == Clustering.EMPTY)
+ /*
+ * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
+ * We already have the row, so there is no point in asking for more from the partition.
+ */
+ if (Clustering.EMPTY == lastClustering)
return null;
- lastCount = counter.counted();
-
- // We need to try to query enough additional results to fulfill our query, but because we could still
- // get short reads on that additional query, just querying the number of results we miss may not be
- // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only
- // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result.
- // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that
- // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
- // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
- // counting iterator.
- int n = postReconciliationCounter.countedInCurrentPartition();
- int x = counter.countedInCurrentPartition();
- int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
-
- DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
+ lastFetched = singleResultCounter.countedInCurrentPartition() - lastCounted;
+ lastCounted = singleResultCounter.countedInCurrentPartition();
+
+ // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
+ if (lastQueried > 0 && lastFetched < lastQueried)
+ return null;
+
+ /*
+ * At this point we know that:
+ * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
+ * rows in the partition
+ * 2. at least one of those returned rows was shadowed by a tombstone returned from another
+ * replica
+ * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
+ * avoid a short read
+ *
+ * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
+ * are defined as follows:
+ * [a] limits.count() - mergedResultCounter.counted()
+ * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+ *
+ * It would be naive to query for exactly that many rows, as it's possible and not unlikely
+ * that some of the returned rows would also be shadowed by tombstones from other hosts.
+ *
+ * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
+ * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
+ *
+ * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
+ * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
+ * marginal cost for each extra row requested.
+ *
+ * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
+ * time we want to respect paging limits and not blow up spectacularly.
+ *
+ * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
+ * counts.
+ *
+ * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits,
+ * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row
+ * for SELECT DISTINCT queries (that set per partition limit to 1).
+ *
+ * See CASSANDRA-13794 for more details.
+ */
+ lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8);
+
+ ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
+
+ return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
+ }
+
+ private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+ {
ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
- ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false);
- SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
- command.nowInSec(),
- command.columnFilter(),
- command.rowFilter(),
- retryLimits,
- partitionKey,
- retryFilter);
-
- Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
- Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
-
- return doShortReadRetry(cmd);
+ if (null != lastClustering)
+ filter = filter.forPaging(metadata.comparator, lastClustering, false);
+
+ return SinglePartitionReadCommand.create(command.metadata(),
+ command.nowInSec(),
+ command.columnFilter(),
+ command.rowFilter(),
+ command.limits().forShortReadRetry(toQuery),
+ partitionKey,
+ filter);
}
- private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
+ private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
{
- DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
- ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+ DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
+ ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
+
if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
else
- MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
+ MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
// We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
handler.awaitResults();
assert resolver.responses.size() == 1;
- return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), retryCommand);
+ return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), cmd);
}
}
}
-
- public boolean isDataPresent()
- {
- return !responses.isEmpty();
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org