Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/05/20 14:40:54 UTC

[GitHub] [cassandra] blerer commented on a change in pull request #493: CASSANDRA-8272 trunk Fix replica-side filtering

blerer commented on a change in pull request #493:
URL: https://github.com/apache/cassandra/pull/493#discussion_r428067806



##########
File path: src/java/org/apache/cassandra/service/reads/DataResolver.java
##########
@@ -103,6 +103,156 @@ public PartitionIterator resolve()
             });
         }
 
+        if (!needsReplicaFilteringProtection())
+        {
+            ResolveContext context = new ResolveContext(replicas);
+            return resolveWithReadRepair(context,
+                                         i -> shortReadProtectedResponse(i, context),
+                                         UnaryOperator.identity(),
+                                         repairedDataTracker);
+        }
+
+        return resolveWithReplicaFilteringProtection(replicas, repairedDataTracker);
+    }
+
+    private boolean needsReplicaFilteringProtection()
+    {
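+        // Replica-side filtering is only a concern for queries that filter rows at all: a stale replica can
+        // return a row version that matches the filter while an up-to-date replica silently filters out the
+        // newer version. Reads without a row filter reconcile correctly and need no extra protection.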
+        if (command.rowFilter().isEmpty())
+            return false;
+
+        IndexMetadata indexDef = command.indexMetadata();
+        if (indexDef != null && indexDef.isCustom())
+        {
+            String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
+            return !SASIIndex.class.getName().equals(className);
+        }
+
+        return true;
+    }
+
+    private class ResolveContext
+    {
+        private final E replicas;
+        private final DataLimits.Counter mergedResultCounter;
+
+        private ResolveContext(E replicas)
+        {
+            this.replicas = replicas;
+            this.mergedResultCounter = command.limits().newCounter(command.nowInSec(),
+                                                                   true,
+                                                                   command.selectsFullPartition(),
+                                                                   enforceStrictLiveness);
+        }
+
+        private boolean needsReadRepair()
+        {
+            return replicas.size() > 1;
+        }
+
+        private boolean needShortReadProtection()
+        {
+            // If we have only one response, there is no reconciliation to do, so we can't get short reads.
+            // Also, so-called "short reads" stem from nodes returning only a subset of the results they have for
+            // a partition due to the limit, with that subset not being enough post-reconciliation. So if there is
+            // no limit, don't bother protecting against short reads.
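+            // For example, with LIMIT 2: if replica A returns its first two rows {r1, r2} and replica B returns
+            // a tombstone covering both, the merged result is empty even though A may hold further matching rows
+            // beyond r2; short read protection then re-queries A for additional rows.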
+            return replicas.size() > 1 && !command.limits().isUnlimited();
+        }
+    }
+
+    @FunctionalInterface
+    private interface ResponseProvider
+    {
+        UnfilteredPartitionIterator getResponse(int i);
+    }
+
+    private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context)
+    {
+        UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command);
+
+        return context.needShortReadProtection()
+               ? ShortReadProtection.extend(context.replicas.get(i),
+                                            originalResponse,
+                                            command,
+                                            context.mergedResultCounter,
+                                            queryStartNanoTime,
+                                            enforceStrictLiveness)
+               : originalResponse;
+    }
+
+    private PartitionIterator resolveWithReadRepair(ResolveContext context,
+                                                    ResponseProvider responseProvider,
+                                                    UnaryOperator<PartitionIterator> preCountFilter,
+                                                    RepairedDataTracker repairedDataTracker)
+    {
+        UnfilteredPartitionIterators.MergeListener listener = null;
+        if (context.needsReadRepair())
+        {
+            P sources = replicaPlan.getWithContacts(context.replicas);
+            listener = wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker);
+        }
+
+        return resolveInternal(context, listener, responseProvider, preCountFilter);
+    }
+
+    @SuppressWarnings("resource")
+    private PartitionIterator resolveWithReplicaFilteringProtection(E replicas, RepairedDataTracker repairedDataTracker)
+    {
+        // Protecting against inconsistent replica filtering (some replicas returning a row that is outdated, but
+        // that wouldn't be removed by normal reconciliation because the up-to-date replicas have filtered out the
+        // up-to-date version of that row) works in 3 steps:
+        //   1) we read the full response just to collect the rows that may be outdated (the ones we got from some
+        //      replicas but for which the others returned nothing; those other replicas may have filtered out a
+        //      more up-to-date result). In doing so, we do not count any such "potentially outdated" row towards
+        //      the query limit. This simulates the worst-case scenario where all the "potentially outdated" rows
+        //      are indeed outdated, and thus makes sure we are guaranteed to read enough results (thanks to short
+        //      read protection).
+        //   2) we query all the replicas/rows we need to determine whether those "potentially outdated" rows are
+        //      outdated or not.
+        //   3) we re-read cached copies of each replica response using the "normal" read path merge with
+        //      read-repair, but where for each replica we use its original response _plus_ the additional rows
+        //      queried in the previous step (and apply the command#rowFilter() on the full result). Since the
+        //      first phase has pessimistically collected enough results for the case where all potentially
+        //      outdated results are indeed outdated, we shouldn't need further short-read protection requests
+        //      during this phase.
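+        //
+        // Concretely: at QUORUM with two replicas, for a query filtering on v = 1, replica A may hold a stale
+        // row (k=0, v=1) while replica B holds the newer (k=0, v=2). B filters its version out and stays silent,
+        // so a plain merge would return A's stale row. Step 1 flags k=0 as potentially outdated, step 2 fetches
+        // B's unfiltered version of k=0, and step 3 reconciles the two (v=2 wins) and filters the row out.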
+
+        // We need separate contexts, as each context has its own counter
+        ResolveContext firstPhaseContext = new ResolveContext(replicas);
+        ResolveContext secondPhaseContext = new ResolveContext(replicas);
+        ReplicaFilteringProtection<E> rfp = new ReplicaFilteringProtection<>(replicaPlan().keyspace(),
+                                                                             command,
+                                                                             replicaPlan().consistencyLevel(),
+                                                                             queryStartNanoTime,
+                                                                             firstPhaseContext.replicas);
+        PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
+                                                                 rfp.mergeController(),
+                                                                 i -> shortReadProtectedResponse(i, firstPhaseContext),
+                                                                 UnaryOperator.identity());
+
+        // Consume the fist phase partitions to populate the replica filtering protection with both those materialized

Review comment:
       fist -> first
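
       For anyone reading this thread without the patch open: the failure mode the
       three-step comment above describes is easy to reproduce in miniature. The
       sketch below is not Cassandra code; it models two replicas as plain maps
       reconciled by write timestamp, and contrasts a naive "filter on each replica,
       then merge" read with the patch's "collect candidates, re-fetch unfiltered,
       merge, then filter" order. All names in it (Cell, naiveFilteredRead,
       protectedFilteredRead) are invented for illustration.

           import java.util.HashMap;
           import java.util.HashSet;
           import java.util.Map;
           import java.util.Set;

           public class ReplicaFilteringDemo
           {
               // A cell is a value plus its write timestamp; the newest timestamp wins on reconciliation.
               record Cell(int value, long timestamp) {}

               // Naive read: each replica applies the filter locally, and only the filtered rows are merged.
               @SafeVarargs
               static Map<Integer, Cell> naiveFilteredRead(int filterValue, Map<Integer, Cell>... replicas)
               {
                   Map<Integer, Cell> merged = new HashMap<>();
                   for (Map<Integer, Cell> replica : replicas)
                       replica.forEach((pk, cell) -> {
                           if (cell.value() == filterValue)
                               merged.merge(pk, cell, (a, b) -> a.timestamp() >= b.timestamp() ? a : b);
                       });
                   return merged;
               }

               // Protected read, mirroring the patch's three steps: collect candidate keys from the
               // filtered responses, re-fetch those keys unfiltered from every replica, reconcile by
               // timestamp, and only then apply the filter to the merged result.
               @SafeVarargs
               static Map<Integer, Cell> protectedFilteredRead(int filterValue, Map<Integer, Cell>... replicas)
               {
                   Set<Integer> candidates = new HashSet<>();
                   for (Map<Integer, Cell> replica : replicas)
                       replica.forEach((pk, cell) -> { if (cell.value() == filterValue) candidates.add(pk); });

                   Map<Integer, Cell> merged = new HashMap<>();
                   for (Map<Integer, Cell> replica : replicas)
                       for (Integer pk : candidates)
                           if (replica.containsKey(pk))
                               merged.merge(pk, replica.get(pk), (a, b) -> a.timestamp() >= b.timestamp() ? a : b);

                   merged.values().removeIf(cell -> cell.value() != filterValue);
                   return merged;
               }

               public static void main(String[] args)
               {
                   // Replica A missed an update and still holds the stale row (v=1); replica B holds the
                   // newer version (v=2), which no longer matches the filter and so is never returned by B.
                   Map<Integer, Cell> replicaA = Map.of(0, new Cell(1, 1L));
                   Map<Integer, Cell> replicaB = Map.of(0, new Cell(2, 2L));

                   System.out.println(naiveFilteredRead(1, replicaA, replicaB));     // {0=Cell[value=1, timestamp=1]} - stale row leaks
                   System.out.println(protectedFilteredRead(1, replicaA, replicaB)); // {} - reconciled first, then filtered
               }
           }

       How the re-fetch of step 2 is issued and how the cached first-phase responses
       are replayed is what ReplicaFilteringProtection in the patch encapsulates; the
       toy only shows why the ordering (reconcile before filtering) matters.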




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org