Posted to commits@cassandra.apache.org by al...@apache.org on 2017/09/20 16:48:55 UTC

[2/6] cassandra git commit: Fix short read protection performance

Fix short read protection performance

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-13794


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f93e6e34
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f93e6e34
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f93e6e34

Branch: refs/heads/cassandra-3.11
Commit: f93e6e3401c343dec74687d8b079b5697813ab28
Parents: ab5084a
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Thu Aug 31 20:51:08 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Wed Sep 20 16:11:18 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +
 .../apache/cassandra/service/DataResolver.java  | 272 ++++++++++++-------
 3 files changed, 181 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2d11a3e..07742ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Improve short read protection performance (CASSANDRA-13794)
  * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)
  * Fix short read protection for tables with no clustering columns (CASSANDRA-13880)
  * Make isBuilt volatile in PartitionUpdate (CASSANDRA-13619)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 983d6b1..e6e46b2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2470,4 +2470,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         return keyspace.getColumnFamilyStore(id);
     }
+
+    public static TableMetrics metricsFor(UUID tableId)
+    {
+        return getIfExists(tableId).metric;
+    }
 }
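
For illustration (not part of the patch): the new metricsFor() helper is a thin static wrapper over the existing getIfExists() lookup. It assumes the id resolves to an existing table; if getIfExists() returned null, the .metric dereference would throw a NullPointerException. A minimal hypothetical call site, mirroring how the DataResolver changes below use it (cfId is assumed to be the UUID of a table known to the local schema):

    // mark the short-read-protection request counter for the table with id cfId
    ColumnFamilyStore.metricsFor(cfId).shortReadProtectionRequests.mark();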

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 99399a3..9a98ee5 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,7 +44,7 @@ public class DataResolver extends ResponseResolver
     @VisibleForTesting
     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
-    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
     {
         super(keyspace, command, consistency, maxResponseCount);
     }
@@ -55,6 +55,20 @@ public class DataResolver extends ResponseResolver
         return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
     }
 
+    public boolean isDataPresent()
+    {
+        return !responses.isEmpty();
+    }
+
+    public void compareResponses()
+    {
+        // We need to fully consume the results to trigger read repairs if appropriate
+        try (PartitionIterator iterator = resolve())
+        {
+            PartitionIterators.consume(iterator);
+        }
+    }
+
     public PartitionIterator resolve()
     {
         // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
@@ -83,54 +97,56 @@ public class DataResolver extends ResponseResolver
          * See CASSANDRA-13747 for more details.
          */
 
-        DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
+        DataLimits.Counter mergedResultCounter =
+            command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
 
-        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
-        FilteredPartitions filtered = FilteredPartitions.filter(merged,
-                                                                new Filter(command.nowInSec(),
-                                                                           command.metadata().enforceStrictLiveness()));
-        PartitionIterator counted = counter.applyTo(filtered);
+        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+        FilteredPartitions filtered =
+            FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
+        PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
 
         return command.isForThrift()
              ? counted
              : Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
-    public void compareResponses()
-    {
-        // We need to fully consume the results to trigger read repairs if appropriate
-        try (PartitionIterator iterator = resolve())
-        {
-            PartitionIterators.consume(iterator);
-        }
-    }
-
     private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
                                                                      InetAddress[] sources,
-                                                                     DataLimits.Counter resultCounter)
+                                                                     DataLimits.Counter mergedResultCounter)
     {
         // If we have only one result, there is no read repair to do and we can't get short reads
         if (results.size() == 1)
             return results.get(0);
 
-        UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
-
         // So-called "short reads" stem from nodes returning only a subset of the results they have for a partition due to the limit,
         // but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
         if (!command.limits().isUnlimited())
         {
             for (int i = 0; i < results.size(); i++)
-                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
+            {
+                DataLimits.Counter singleResultCounter =
+                    command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
+
+                ShortReadResponseProtection protection =
+                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter);
+
+                /*
+                 * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition()
+                 * comments for details. We want singleResultCounter.applyToPartition() to be called after SRRP applies
+                 * its transformations, so that this order is preserved when calling applyToRow() too.
+                 */
+                results.set(i, Transformation.apply(Transformation.apply(results.get(i), protection), singleResultCounter));
+            }
         }
 
-        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
+        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
     }
 
     private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
     {
         private final InetAddress[] sources;
 
-        public RepairMergeListener(InetAddress[] sources)
+        private RepairMergeListener(InetAddress[] sources)
         {
             this.sources = sources;
         }
@@ -209,7 +225,7 @@ public class DataResolver extends ResponseResolver
             // For each source, record if there is an open range to send as repair, and from where.
             private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
 
-            public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
+            private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
             {
                 this.partitionKey = partitionKey;
                 this.columns = columns;
@@ -457,17 +473,18 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private class ShortReadProtection extends Transformation<UnfilteredRowIterator>
+    private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator>
     {
         private final InetAddress source;
-        private final DataLimits.Counter counter;
-        private final DataLimits.Counter postReconciliationCounter;
 
-        private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter)
+        private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
+        private final DataLimits.Counter mergedResultCounter; // merged end-result counter
+
+        private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
         {
             this.source = source;
-            this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
-            this.postReconciliationCounter = postReconciliationCounter;
+            this.singleResultCounter = singleResultCounter;
+            this.mergedResultCounter = mergedResultCounter;
         }
 
         @Override
@@ -475,24 +492,25 @@ public class DataResolver extends ResponseResolver
         {
             ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey());
 
-            partition = MoreRows.extend(partition, protection); // enable moreContents()
-
             /*
-             * if we don't apply these transformations *after* extending the partition with MoreRows,
-             * their applyToRow() method will not be called on the first row of the new extension iterator
+             * Extend for moreContents(), then apply protection to track lastClustering.
+             *
+             * If we don't apply the transformation *after* extending the partition with MoreRows,
+             * the applyToRow() method of protection will not be called on the first row of the new extension iterator.
              */
-            partition = Transformation.apply(partition, protection); // track lastClustering
-            partition = Transformation.apply(partition, counter);    // do the counting
-
-            return partition;
+            return Transformation.apply(MoreRows.extend(partition, protection), protection);
         }
 
         private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
         {
-            final CFMetaData metadata;
-            final DecoratedKey partitionKey;
-            Clustering lastClustering;
-            int lastCount = 0;
+            private final CFMetaData metadata;
+            private final DecoratedKey partitionKey;
+
+            private Clustering lastClustering;
+
+            private int lastCounted = 0; // last recorded row count before attempting to fetch more rows
+            private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
+            private int lastQueried = 0; // # extra rows requested from the replica last time
 
             private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
             {
@@ -507,79 +525,139 @@ public class DataResolver extends ResponseResolver
                 return row;
             }
 
-            @Override
+            /*
+             * We have a potential short read if the result from a given node contains the requested number of rows
+             * for that partition (i.e. it has stopped returning results due to the limit), but some of them haven't
+             * made it into the final post-reconciliation result due to other nodes' tombstones.
+             *
+             * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+             * ultimately return fewer rows than required. Also, those additional rows may contain tombstones,
+             * which we also need to fetch as they may shadow rows from other replicas' results, which we would
+             * otherwise return incorrectly.
+             *
+             * Also note that we only get here once all the rows for this partition have been iterated over, and so
+             * if the node had returned the requested number of rows but we still get here, then some results were
+             * skipped during reconciliation.
+             */
             public UnfilteredRowIterator moreContents()
             {
-                assert !postReconciliationCounter.isDoneForPartition();
-
-                // We have a short read if the node this is the result of has returned the requested number of
-                // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
-                // those results haven't made it in the final result post-reconciliation due to other nodes
-                // tombstones. If that is the case, then the node might have more results that we should fetch
-                // as otherwise we might return less results than required, or results that shouldn't be returned
-                // (because the node has tombstone that hides future results from other nodes but that haven't
-                // been returned due to the limit).
-                // Also note that we only get here once all the results for this node have been returned, and so
-                // if the node had returned the requested number but we still get there, it imply some results were
-                // skipped during reconciliation.
-                if (lastCount == counter.counted() || !counter.isDoneForPartition())
+                // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
+                assert !mergedResultCounter.isDoneForPartition();
+
+                // we do not apply short read protection when we have no limits at all
+                assert !command.limits().isUnlimited();
+
+                // if the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more
+                if (!singleResultCounter.isDoneForPartition())
+                    return null;
+
+                /*
+                 * If the replica has no live rows in the partition, don't try to fetch more.
+                 *
+                 * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
+                 * always cover this scenario:
+                 * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
+                 * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
+                 *
+                 * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
+                 * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
+                 * have tombstones in the current partition.
+                 *
+                 * One other way we can hit this condition is when the partition only has a live static row and no regular
+                 * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
+                 * the moreContents() call.
+                 */
+                if (singleResultCounter.countedInCurrentPartition() == 0)
                     return null;
 
-                // clustering of the last row returned is empty, meaning that there is only one row per partition,
-                // and we already have it.
-                if (lastClustering == Clustering.EMPTY)
+                /*
+                 * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
+                 * We already have the row, so there is no point in asking for more from the partition.
+                 */
+                if (Clustering.EMPTY == lastClustering)
                     return null;
 
-                lastCount = counter.counted();
-
-                // We need to try to query enough additional results to fulfill our query, but because we could still
-                // get short reads on that additional query, just querying the number of results we miss may not be
-                // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only
-                // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result.
-                // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that
-                // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
-                // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
-                // counting iterator.
-                int n = postReconciliationCounter.countedInCurrentPartition();
-                int x = counter.countedInCurrentPartition();
-                int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
-
-                DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
+                lastFetched = singleResultCounter.countedInCurrentPartition() - lastCounted;
+                lastCounted = singleResultCounter.countedInCurrentPartition();
+
+                // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
+                if (lastQueried > 0 && lastFetched < lastQueried)
+                    return null;
+
+                /*
+                 * At this point we know that:
+                 *     1. the replica returned (possibly repeatedly) as many rows as we asked for and potentially has more
+                 *        rows in the partition
+                 *     2. at least one of those returned rows was shadowed by a tombstone returned from another
+                 *        replica
+                 *     3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
+                 *        avoid a short read
+                 *
+                 * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
+                 * are defined as follows:
+                 *     [a] limits.count() - mergedResultCounter.counted()
+                 *     [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+                 *
+                 * It would be naive to query for exactly that many rows, as it's possible and not unlikely
+                 * that some of the returned rows would also be shadowed by tombstones from other hosts.
+                 *
+                 * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
+                 * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
+                 *
+                 * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
+                 * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
+                 * marginal cost for each extra row requested.
+                 *
+                 * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
+                 * time we want to respect paging limits and not blow up spectacularly.
+                 *
+                 * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
+                 * counts.
+                 *
+                 * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits,
+                 * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row
+                 * for SELECT DISTINCT queries (that set per partition limit to 1).
+                 *
+                 * See CASSANDRA-13794 for more details.
+                 */
+                lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8);
+
+                ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
+                Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
+
+                return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
+            }
+
+            private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+            {
                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
-                ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false);
-                SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
-                                                                                   command.nowInSec(),
-                                                                                   command.columnFilter(),
-                                                                                   command.rowFilter(),
-                                                                                   retryLimits,
-                                                                                   partitionKey,
-                                                                                   retryFilter);
-
-                Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
-                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
-
-                return doShortReadRetry(cmd);
+                if (null != lastClustering)
+                    filter = filter.forPaging(metadata.comparator, lastClustering, false);
+
+                return SinglePartitionReadCommand.create(command.metadata(),
+                                                         command.nowInSec(),
+                                                         command.columnFilter(),
+                                                         command.rowFilter(),
+                                                         command.limits().forShortReadRetry(toQuery),
+                                                         partitionKey,
+                                                         filter);
             }
 
-            private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
+            private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
             {
-                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
-                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+                DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
+                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
+
                 if (StorageProxy.canDoLocalRequest(source))
-                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
                 else
-                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
+                    MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
 
                 // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                 handler.awaitResults();
                 assert resolver.responses.size() == 1;
-                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), retryCommand);
+                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), cmd);
             }
         }
     }
-
-    public boolean isDataPresent()
-    {
-        return !responses.isEmpty();
-    }
 }
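
----------------------------------------------------------------------

Illustrative sketch (not part of the patch): the performance problem addressed by this commit lies in the retry-size heuristic removed above. The old code estimated the number of extra rows to request as m = n^2/x - n (solving m * x/n = n - x, with n the rows a replica returned and x the rows that survived reconciliation, per the removed comment), which degenerates to requesting a single extra row per round trip whenever nearly all returned rows survive. The new code deliberately overfetches instead: min(count(), perPartitionCount()), floored at 8. A self-contained Java sketch of the arithmetic; the class name and sample numbers are illustrative assumptions, the two formulas are taken verbatim from the diff:

    // Standalone illustration of the two retry-size heuristics; not Cassandra code.
    public class ShortReadRetrySizes
    {
        // Old heuristic: m = n^2/x - n, where n rows were returned by the
        // replica and x of them made it into the reconciled result.
        static int oldToQuery(int n, int x)
        {
            return Math.max(((n * n) / Math.max(x, 1)) - n, 1);
        }

        // New heuristic: request min(count, perPartitionCount), but never
        // fewer than 8 rows, to bound the number of round trips.
        static int newToQuery(int count, int perPartitionCount)
        {
            return Math.max(Math.min(count, perPartitionCount), 8);
        }

        public static void main(String[] args)
        {
            // 100 rows returned, 99 reconciled: the old formula asks for
            // max(10000/99 - 100, 1) = 1 extra row per round trip, i.e.
            // potentially one network request per missing row.
            System.out.println(oldToQuery(100, 99));                 // 1
            // With LIMIT 100 and no per-partition limit, the new
            // heuristic asks for all 100 potentially missing rows at once.
            System.out.println(newToQuery(100, Integer.MAX_VALUE));  // 100
            // SELECT DISTINCT sets the per-partition limit to 1; the
            // floor of 8 avoids fetching row by row.
            System.out.println(newToQuery(100, 1));                  // 8
        }
    }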

