You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/06/05 12:16:45 UTC

[cassandra] branch cassandra-3.0 updated: Fix replica-side filtering returning stale data with CL > 1

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new dd255ff  Fix replica-side filtering returning stale data with CL > 1
dd255ff is described below

commit dd255ffa07d0263521a1ca863fc2192db19bc04c
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Wed May 27 11:01:42 2020 +0100

    Fix replica-side filtering returning stale data with CL > 1
    
    patch by Andres de la Peña; reviewed by Benjamin Lerer, Caleb Rackliffe and ZhaoYang for CASSANDRA-8272
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/DataRange.java    |  10 +
 .../cassandra/db/PartitionRangeReadCommand.java    |   5 +
 src/java/org/apache/cassandra/db/ReadCommand.java  |   7 +
 .../cassandra/db/SinglePartitionReadCommand.java   |   5 +
 .../db/compaction/CompactionIterator.java          |   3 +-
 .../org/apache/cassandra/db/filter/RowFilter.java  |  87 ++--
 .../partitions/UnfilteredPartitionIterators.java   |  30 +-
 .../cassandra/db/rows/UnfilteredRowIterators.java  |  32 +-
 .../org/apache/cassandra/metrics/TableMetrics.java |  18 +-
 .../org/apache/cassandra/service/DataResolver.java | 162 +++++--
 .../service/ReplicaFilteringProtection.java        | 465 +++++++++++++++++++++
 .../cassandra/utils/concurrent/Accumulator.java    |  13 +
 .../utils/concurrent/AccumulatorTest.java          |  54 ++-
 14 files changed, 800 insertions(+), 92 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 46b3f56..ff00579 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
  * Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
  * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
  * EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index d2f9c76..f6776c4 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -196,6 +196,16 @@ public class DataRange
     }
 
     /**
+     * Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+     *
+     * @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+     */
+    public boolean isReversed()
+    {
+        return clusteringIndexFilter.isReversed();
+    }
+
+    /**
      * The clustering index filter to use for the provided key.
      * <p>
      * This may or may not be the same filter for all keys (that is, paging range
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 4f936cc..1da66c1 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -206,6 +206,11 @@ public class PartitionRangeReadCommand extends ReadCommand
         return DatabaseDescriptor.getRangeRpcTimeout();
     }
 
+    public boolean isReversed()
+    {
+        return dataRange.isReversed();
+    }
+
     public boolean selectsKey(DecoratedKey key)
     {
         if (!dataRange().contains(key))
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index b499daf..39a5402 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -330,6 +330,13 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract int oldestUnrepairedTombstone();
 
+    /**
+     * Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+     *
+     * @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+     */
+    public abstract boolean isReversed();
+
     public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
     {
         // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 2e014ba..841c3b9 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -415,6 +415,11 @@ public class SinglePartitionReadCommand extends ReadCommand
         return DatabaseDescriptor.getReadRpcTimeout();
     }
 
+    public boolean isReversed()
+    {
+        return clusteringIndexFilter.isReversed();
+    }
+
     public boolean selectsKey(DecoratedKey key)
     {
         if (!this.partitionKey().equals(key))
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index b132d90..8c4732b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -199,11 +199,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                     {
                     }
 
-                    public void onMergedRows(Row merged, Row[] versions)
+                    public Row onMergedRows(Row merged, Row[] versions)
                     {
                         indexTransaction.start();
                         indexTransaction.onRowMerge(merged, versions);
                         indexTransaction.commit();
+                        return merged;
                     }
 
                     public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 4300651..774e4d3 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -126,6 +126,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
         return false;
     }
 
+    protected abstract Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec);
+
     /**
      * Filters the provided iterator so that only the row satisfying the expression of this filter
      * are included in the resulting iterator.
@@ -134,7 +136,23 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
      * @param nowInSec the time of query in seconds.
      * @return the filtered iterator.
      */
-    public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec);
+    public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
+    {
+        return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(iter.metadata(), nowInSec));
+    }
+
+    /**
+     * Filters the provided iterator so that only the row satisfying the expression of this filter
+     * are included in the resulting iterator.
+     *
+     * @param iter the iterator to filter
+     * @param nowInSec the time of query in seconds.
+     * @return the filtered iterator.
+     */
+    public PartitionIterator filter(PartitionIterator iter, CFMetaData metadata, int nowInSec)
+    {
+        return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(metadata, nowInSec));
+    }
 
     /**
      * Whether the provided row in the provided partition satisfies this filter.
@@ -263,20 +281,16 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
             super(expressions);
         }
 
-        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
+        protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec)
         {
-            if (expressions.isEmpty())
-                return iter;
-
-            final CFMetaData metadata = iter.metadata();
             long numberOfStaticColumnExpressions = expressions.stream().filter(e -> e.column.isStatic()).count();
             final boolean filterStaticColumns = numberOfStaticColumnExpressions != 0;
             final boolean filterNonStaticColumns = (expressions.size() - numberOfStaticColumnExpressions) > 0;
 
-            class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator>
+            return new Transformation<BaseRowIterator<?>>()
             {
                 DecoratedKey pk;
-                public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+                protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
                 {
                     // The filter might be on static columns, so need to check static row first.
                     if (filterStaticColumns && applyToRow(partition.staticRow()) == null)
@@ -286,7 +300,9 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                     }
 
                     pk = partition.partitionKey();
-                    UnfilteredRowIterator iterator = Transformation.apply(partition, this);
+                    BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator
+                                                  ? Transformation.apply((UnfilteredRowIterator) partition, this)
+                                                  : Transformation.apply((RowIterator) partition, this);
 
                     if (filterNonStaticColumns && !iterator.hasNext())
                     {
@@ -308,9 +324,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                             return null;
                     return row;
                 }
-            }
-
-            return Transformation.apply(iter, new IsSatisfiedFilter());
+            };
         }
 
         protected RowFilter withNewExpressions(List<Expression> expressions)
@@ -326,36 +340,47 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
             super(expressions);
         }
 
-        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
+        protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec)
         {
-            if (expressions.isEmpty())
-                return iter;
-
-            class IsSatisfiedThriftFilter extends Transformation<UnfilteredRowIterator>
+            // Thrift does not filter rows, it filters entire partition if any of the expression is not
+            // satisfied, which forces us to materialize the result (in theory we could materialize only
+            // what we need which might or might not be everything, but we keep it simple since in practice
+            // it's not worth that it has ever been).
+            return new Transformation<BaseRowIterator<?>>()
             {
-                @Override
-                public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
+                protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
+                {
+                    return partition instanceof UnfilteredRowIterator ? applyTo((UnfilteredRowIterator) partition)
+                                                                      : applyTo((RowIterator) partition);
+                }
+
+                private UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
+                {
+                    ImmutableBTreePartition result = ImmutableBTreePartition.create(partition);
+                    partition.close();
+                    return accepts(result) ? result.unfilteredIterator() : null;
+                }
+
+                private RowIterator applyTo(RowIterator partition)
                 {
-                    // Thrift does not filter rows, it filters entire partition if any of the expression is not
-                    // satisfied, which forces us to materialize the result (in theory we could materialize only
-                    // what we need which might or might not be everything, but we keep it simple since in practice
-                    // it's not worth that it has ever been).
-                    ImmutableBTreePartition result = ImmutableBTreePartition.create(iter);
-                    iter.close();
+                    FilteredPartition result = FilteredPartition.create(partition);
+                    return accepts(result) ? result.rowIterator() : null;
+                }
 
+                private boolean accepts(ImmutableBTreePartition result)
+                {
                     // The partition needs to have a row for every expression, and the expression needs to be valid.
                     for (Expression expr : expressions)
                     {
                         assert expr instanceof ThriftExpression;
-                        Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes));
-                        if (row == null || !expr.isSatisfiedBy(iter.metadata(), iter.partitionKey(), row))
-                            return null;
+                        Row row = result.getRow(makeCompactClustering(metadata, expr.column().name.bytes));
+                        if (row == null || !expr.isSatisfiedBy(metadata, result.partitionKey(), row))
+                            return false;
                     }
                     // If we get there, it means all expressions where satisfied, so return the original result
-                    return result.unfilteredIterator();
+                    return true;
                 }
-            }
-            return Transformation.apply(iter, new IsSatisfiedThriftFilter());
+            };
         }
 
         protected RowFilter withNewExpressions(List<Expression> expressions)
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 3f7d072..bff910e 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.db.transform.MorePartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -47,7 +48,7 @@ public abstract class UnfilteredPartitionIterators
     public interface MergeListener
     {
         public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions);
-        public void close();
+        public default void close() {}
     }
 
     @SuppressWarnings("resource") // The created resources are returned right away
@@ -77,6 +78,24 @@ public abstract class UnfilteredPartitionIterators
         return Transformation.apply(toReturn, new Close());
     }
 
+    public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators)
+    {
+        if (iterators.size() == 1)
+            return iterators.get(0);
+
+        class Extend implements MorePartitions<UnfilteredPartitionIterator>
+        {
+            int i = 1;
+            public UnfilteredPartitionIterator moreContents()
+            {
+                if (i >= iterators.size())
+                    return null;
+                return iterators.get(i++);
+            }
+        }
+        return MorePartitions.extend(iterators.get(0), new Extend());
+    }
+
     public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
     {
         return FilteredPartitions.filter(iterator, nowInSec);
@@ -84,7 +103,6 @@ public abstract class UnfilteredPartitionIterators
 
     public static UnfilteredPartitionIterator merge(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener listener)
     {
-        assert listener != null;
         assert !iterators.isEmpty();
 
         final boolean isForThrift = iterators.get(0).isForThrift();
@@ -109,7 +127,9 @@ public abstract class UnfilteredPartitionIterators
 
             protected UnfilteredRowIterator getReduced()
             {
-                UnfilteredRowIterators.MergeListener rowListener = listener.getRowMergeListener(partitionKey, toMerge);
+                UnfilteredRowIterators.MergeListener rowListener = listener == null
+                                                                 ? null
+                                                                 : listener.getRowMergeListener(partitionKey, toMerge);
 
                 // Replace nulls by empty iterators
                 for (int i = 0; i < toMerge.size(); i++)
@@ -153,7 +173,9 @@ public abstract class UnfilteredPartitionIterators
             public void close()
             {
                 merged.close();
-                listener.close();
+
+                if (listener != null)
+                    listener.close();
             }
         };
     }
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index f42f675..b6dbf82 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -72,12 +72,17 @@ public abstract class UnfilteredRowIterators
          * particular, this may be called in cases where there is no row in the merged output (if a source has a row
          * that is shadowed by another source range tombstone or partition level deletion).
          *
-         * @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a
-         * placeholder for when at least one source has a row, but that row is shadowed in the merged output.
+         * @param merged the result of the merge. This cannot be {@code null} (so that listener can always access the
+         * clustering from this safely)but can be empty, in which case this is a placeholder for when at least one
+         * source has a row, but that row is shadowed in the merged output.
          * @param versions for each source, the row in that source corresponding to {@code merged}. This can be
          * {@code null} for some sources if the source has not such row.
+         * @return the row to use as result of the merge (can be {@code null}). Most implementations should simply
+         * return {@code merged}, but this allows some implementations to impact the merge result if necessary. If this
+         * returns either {@code null} or an empty row, then the row is skipped from the merge result. If this returns a
+         * non {@code null} result, then the returned row <b>must</b> have the same clustering than {@code merged}.
          */
-        public void onMergedRows(Row merged, Row[] versions);
+        public Row onMergedRows(Row merged, Row[] versions);
 
         /**
          * Called once for every range tombstone marker participating in the merge.
@@ -500,9 +505,12 @@ public abstract class UnfilteredRowIterators
             Row merged = merger.merge(partitionDeletion);
             if (merged == null)
                 merged = Rows.EMPTY_STATIC_ROW;
-            if (listener != null)
-                listener.onMergedRows(merged, merger.mergedRows());
-            return merged;
+            if (listener == null)
+                return merged;
+
+            merged = listener.onMergedRows(merged, merger.mergedRows());
+            // Note that onMergedRows can have returned null even though his input wasn't null
+            return merged == null ? Rows.EMPTY_STATIC_ROW : merged;
         }
 
         private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators)
@@ -586,9 +594,15 @@ public abstract class UnfilteredRowIterators
                 if (nextKind == Unfiltered.Kind.ROW)
                 {
                     Row merged = rowMerger.merge(markerMerger.activeDeletion());
-                    if (listener != null)
-                        listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, rowMerger.mergedRows());
-                    return merged;
+                    if (listener == null)
+                        return merged;
+
+                    merged = listener.onMergedRows(merged == null
+                                                   ? BTreeRow.emptyRow(rowMerger.mergedClustering())
+                                                   : merged,
+                                                   rowMerger.mergedRows());
+
+                    return merged == null || merged.isEmpty() ? null : merged;
                 }
                 else
                 {
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index eb56ed9..1f4803e 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -153,6 +153,7 @@ public class TableMetrics
 
     public final Meter readRepairRequests;
     public final Meter shortReadProtectionRequests;
+    public final Meter replicaSideFilteringProtectionRequests;
 
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
     /**
@@ -649,8 +650,9 @@ public class TableMetrics
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
 
-        readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
-        shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
+        readRepairRequests = createTableMeter("ReadRepairRequests");
+        shortReadProtectionRequests = createTableMeter("ShortReadProtectionRequests");
+        replicaSideFilteringProtectionRequests = createTableMeter("ReplicaSideFilteringProtectionRequests");
     }
 
     public void updateSSTableIterated(int count)
@@ -758,6 +760,18 @@ public class TableMetrics
         return cfCounter;
     }
 
+    private Meter createTableMeter(final String name)
+    {
+        return createTableMeter(name, name);
+    }
+
+    private Meter createTableMeter(final String name, final String alias)
+    {
+        Meter tableMeter = Metrics.meter(factory.createMetricName(name), aliasFactory.createMetricName(alias));
+        register(name, alias, tableMeter);
+        return tableMeter;
+    }
+
     /**
      * Create a histogram-like interface that will register both a CF, keyspace and global level
      * histogram and forward any updates to both
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 522c57b..02d355e 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
+import java.util.function.UnaryOperator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -80,17 +81,126 @@ public class DataResolver extends ResponseResolver
 
     public PartitionIterator resolve()
     {
+        if (!needsReplicaFilteringProtection())
+        {
+            ResolveContext context = new ResolveContext(responses.size());
+            return resolveWithReadRepair(context,
+                                         i -> shortReadProtectedResponse(i, context),
+                                         UnaryOperator.identity());
+        }
+
+        return resolveWithReplicaFilteringProtection();
+    }
+
+    private boolean needsReplicaFilteringProtection()
+    {
+        return !command.rowFilter().isEmpty();
+    }
+
+    private class ResolveContext
+    {
+        private final InetAddress[] sources;
+        private final DataLimits.Counter mergedResultCounter;
+
+        private ResolveContext(int count)
+        {
+            assert count <= responses.size();
+            this.sources = new InetAddress[count];
+            for (int i = 0; i < count; i++)
+                sources[i] = responses.get(i).from;
+            this.mergedResultCounter = command.limits().newCounter(command.nowInSec(),
+                                                                   true,
+                                                                   command.selectsFullPartition(),
+                                                                   enforceStrictLiveness);
+        }
+
+        private boolean needShortReadProtection()
+        {
+            // If we have only one result, there is no read repair to do and we can't get short reads
+            // Also, so-called "short reads" stems from nodes returning only a subset of the results they have for a
+            // partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have limit,
+            // don't bother protecting against short reads.
+            return sources.length > 1 && !command.limits().isUnlimited();
+        }
+    }
+
+    @FunctionalInterface
+    private interface ResponseProvider
+    {
+        UnfilteredPartitionIterator getResponse(int i);
+    }
+
+    private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context)
+    {
+        UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command);
+
+        return context.needShortReadProtection()
+               ? extendWithShortReadProtection(originalResponse, context.sources[i], context.mergedResultCounter)
+               : originalResponse;
+    }
+
+    private PartitionIterator resolveWithReadRepair(ResolveContext context,
+                                                    ResponseProvider responseProvider,
+                                                    UnaryOperator<PartitionIterator> preCountFilter)
+    {
+        return resolveInternal(context, new RepairMergeListener(context.sources), responseProvider, preCountFilter);
+    }
+
+    private PartitionIterator resolveWithReplicaFilteringProtection()
+    {
+        // Protecting against inconsistent replica filtering (some replica returning a row that is outdated but that
+        // wouldn't be removed by normal reconciliation because up-to-date replica have filtered the up-to-date version
+        // of that row) works in 3 steps:
+        //   1) we read the full response just to collect rows that may be outdated (the ones we got from some
+        //      replica but didn't got any response for other; it could be those other replica have filtered a more
+        //      up-to-date result). In doing so, we do not count any of such "potentially outdated" row towards the
+        //      query limit. This simulate the worst case scenario where all those "potentially outdated" rows are
+        //      indeed outdated, and thus make sure we are guaranteed to read enough results (thanks to short read
+        //      protection).
+        //   2) we query all the replica/rows we need to rule out whether those "potentially outdated" rows are outdated
+        //      or not.
+        //   3) we re-read cached copies of each replica response using the "normal" read path merge with read-repair,
+        //      but where for each replica we use their original response _plus_ the additional rows queried in the
+        //      previous step (and apply the command#rowFilter() on the full result). Since the first phase has
+        //      pessimistically collected enough results for the case where all potentially outdated results are indeed
+        //      outdated, we shouldn't need further short-read protection requests during this phase.
+
         // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
         // at the beginning of this method), so grab the response count once and use that through the method.
         int count = responses.size();
-        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
-        InetAddress[] sources = new InetAddress[count];
+        // We need separate contexts, as each context has his own counter
+        ResolveContext firstPhaseContext = new ResolveContext(count);
+        ResolveContext secondPhaseContext = new ResolveContext(count);
+        ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, firstPhaseContext.sources);
+        PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
+                                                                 rfp.mergeController(),
+                                                                 i -> shortReadProtectedResponse(i, firstPhaseContext),
+                                                                 UnaryOperator.identity());
+
+        // Consume the first phase partitions to populate the replica filtering protection with both those materialized
+        // partitions and the primary keys to be fetched.
+        PartitionIterators.consume(firstPhasePartitions);
+        firstPhasePartitions.close();
+
+        // After reading the entire query results the protection helper should have cached all the partitions so we can
+        // clear the responses accumulator for the sake of memory usage, given that the second phase might take long if
+        // it needs to query replicas.
+        responses.clearUnsafe();
+
+        return resolveWithReadRepair(secondPhaseContext,
+                                     rfp::queryProtectedPartitions,
+                                     results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec()));
+    }
+
+    private PartitionIterator resolveInternal(ResolveContext context,
+                                              UnfilteredPartitionIterators.MergeListener mergeListener,
+                                              ResponseProvider responseProvider,
+                                              UnaryOperator<PartitionIterator> preCountFilter)
+    {
+        int count = context.sources.length;
+        List<UnfilteredPartitionIterator> results = new ArrayList<>(count);
         for (int i = 0; i < count; i++)
-        {
-            MessageIn<ReadResponse> msg = responses.get(i);
-            iters.add(msg.payload.makeIterator(command));
-            sources[i] = msg.from;
-        }
+            results.add(responseProvider.getResponse(i));
 
         /*
          * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
@@ -106,36 +216,14 @@ public class DataResolver extends ResponseResolver
          * See CASSANDRA-13747 for more details.
          */
 
-        DataLimits.Counter mergedResultCounter =
-            command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
-
-        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+        UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, command.nowInSec(), mergeListener);
         FilteredPartitions filtered =
-            FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
-        PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
+        FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
+        PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter);
 
         return command.isForThrift()
-             ? counted
-             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
-    }
-
-    private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
-                                                                     InetAddress[] sources,
-                                                                     DataLimits.Counter mergedResultCounter)
-    {
-        // If we have only one results, there is no read repair to do and we can't get short reads
-        if (results.size() == 1)
-            return results.get(0);
-
-        /*
-         * So-called short reads stems from nodes returning only a subset of the results they have due to the limit,
-         * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
-         */
-        if (!command.limits().isUnlimited())
-            for (int i = 0; i < results.size(); i++)
-                results.set(i, extendWithShortReadProtection(results.get(i), sources[i], mergedResultCounter));
-
-        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
+               ? counted
+               : Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
     private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -294,13 +382,13 @@ public class DataResolver extends ResponseResolver
                 }
             }
 
-            public void onMergedRows(Row merged, Row[] versions)
+            public Row onMergedRows(Row merged, Row[] versions)
             {
                 // If a row was shadowed post merged, it must be by a partition level or range tombstone, and we handle
                 // those case directly in their respective methods (in other words, it would be inefficient to send a row
                 // deletion as repair when we know we've already send a partition level or range tombstone that covers it).
                 if (merged.isEmpty())
-                    return;
+                    return merged;
 
                 Rows.diff(diffListener, merged, versions);
                 for (int i = 0; i < currentRows.length; i++)
@@ -309,6 +397,8 @@ public class DataResolver extends ResponseResolver
                         update(i).add(currentRows[i].build());
                 }
                 Arrays.fill(currentRows, null);
+
+                return merged;
             }
 
             private DeletionTime currentDeletion()
diff --git a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
new file mode 100644
index 0000000..36d51cc
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+/**
+ * Helper in charge of collecting additional queries to be done on the coordinator to protect against invalid results
+ * being included due to replica-side filtering (secondary indexes or {@code ALLOW * FILTERING}).
+ * <p>
+ * When using replica-side filtering with CL>ONE, a replica can send a stale result satisfying the filter, while updated
+ * replicas won't send a corresponding tombstone to discard that result during reconciliation. This helper identifies
+ * the rows in a replica response that don't have a corresponding row in other replica responses, and requests them by
+ * primary key to the "silent" replicas in a second fetch round.
+ * <p>
+ * See CASSANDRA-8272 and CASSANDRA-8273 for further details.
+ */
+class ReplicaFilteringProtection
+{
+    private static final Logger logger = LoggerFactory.getLogger(ReplicaFilteringProtection.class);
+
+    private final Keyspace keyspace;
+    private final ReadCommand command;
+    private final ConsistencyLevel consistency;
+    private final InetAddress[] sources;
+    private final TableMetrics tableMetrics;
+
+    /**
+     * Per-source primary keys of the rows that might be outdated so they need to be fetched.
+     * For outdated static rows we use an empty builder to signal it has to be queried.
+     */
+    private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch;
+
+    /**
+     * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows.
+     */
+    private final List<List<PartitionBuilder>> originalPartitions;
+
+    ReplicaFilteringProtection(Keyspace keyspace,
+                               ReadCommand command,
+                               ConsistencyLevel consistency,
+                               InetAddress[] sources)
+    {
+        this.keyspace = keyspace;
+        this.command = command;
+        this.consistency = consistency;
+        this.sources = sources;
+        this.rowsToFetch = new ArrayList<>(sources.length);
+        this.originalPartitions = new ArrayList<>(sources.length);
+
+        for (InetAddress ignored : sources)
+        {
+            rowsToFetch.add(new TreeMap<>());
+            originalPartitions.add(new ArrayList<>());
+        }
+
+        tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId);
+    }
+
+    private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source, DecoratedKey partitionKey)
+    {
+        return rowsToFetch.get(source).computeIfAbsent(partitionKey, k -> BTreeSet.builder(command.metadata().comparator));
+    }
+
+    /**
+     * Returns the protected results for the specified replica. These are generated fetching the extra rows and merging
+     * them with the cached original filtered results for that replica.
+     *
+     * @param source the source
+     * @return the protected results for the specified replica
+     */
+    UnfilteredPartitionIterator queryProtectedPartitions(int source)
+    {
+        UnfilteredPartitionIterator original = makeIterator(originalPartitions.get(source));
+        SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = rowsToFetch.get(source);
+
+        if (toFetch.isEmpty())
+            return original;
+
+        // TODO: this would be more efficient if we had multi-key queries internally
+        List<UnfilteredPartitionIterator> fetched = toFetch.keySet()
+                                                           .stream()
+                                                           .map(k -> querySourceOnKey(source, k))
+                                                           .collect(Collectors.toList());
+
+        return UnfilteredPartitionIterators.merge(Arrays.asList(original, UnfilteredPartitionIterators.concat(fetched)),
+                                                  command.nowInSec(), null);
+    }
+
+    private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey key)
+    {
+        BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key);
+        assert builder != null; // We're calling this on the result of rowsToFetch.get(i).keySet()
+
+        InetAddress source = sources[i];
+        NavigableSet<Clustering> clusterings = builder.build();
+        tableMetrics.replicaSideFilteringProtectionRequests.mark();
+        if (logger.isTraceEnabled())
+            logger.trace("Requesting rows {} in partition {} from {} for replica-side filtering protection",
+                         clusterings, key, source);
+        Tracing.trace("Requesting {} rows in partition {} from {} for replica-side filtering protection",
+                      clusterings.size(), key, source);
+
+        // build the read command taking into account that we could be requesting only in the static row
+        DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : DataLimits.NONE;
+        ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed());
+        SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
+                                                                           command.nowInSec(),
+                                                                           command.columnFilter(),
+                                                                           RowFilter.NONE,
+                                                                           limits,
+                                                                           key,
+                                                                           filter);
+        try
+        {
+            return executeReadCommand(cmd, source);
+        }
+        catch (ReadTimeoutException e)
+        {
+            int blockFor = consistency.blockFor(keyspace);
+            throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+        }
+        catch (UnavailableException e)
+        {
+            int blockFor = consistency.blockFor(keyspace);
+            throw new UnavailableException(consistency, blockFor, blockFor - 1);
+        }
+    }
+
+    private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, InetAddress source)
+    {
+        DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
+        ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
+
+        if (StorageProxy.canDoLocalRequest(source))
+            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
+        else
+            MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
+
+        // We don't call handler.get() because we want to preserve tombstones
+        handler.awaitResults();
+        assert resolver.responses.size() == 1;
+        return resolver.responses.get(0).payload.makeIterator(command);
+    }
+
+    /**
+     * Returns a merge listener that skips the merged rows for which any of the replicas doesn't have a version,
+     * pessimistically assuming that they are outdated. It is intended to be used during a first merge of per-replica
+     * query results to ensure we fetch enough results from the replicas to ensure we don't miss any potentially
+     * outdated result.
+     * <p>
+     * The listener will track both the accepted data and the primary keys of the rows that are considered as outdated.
+     * That way, once the query results would have been merged using this listener, further calls to
+     * {@link #queryProtectedPartitions(int)} will use the collected data to return a copy of the
+     * data originally collected from the specified replica, completed with the potentially outdated rows.
+     */
+    UnfilteredPartitionIterators.MergeListener mergeController()
+    {
+        return (partitionKey, versions) -> {
+
+            PartitionBuilder[] builders = new PartitionBuilder[sources.length];
+
+            for (int i = 0; i < sources.length; i++)
+                builders[i] = new PartitionBuilder(partitionKey, columns(versions), stats(versions));
+
+            return new UnfilteredRowIterators.MergeListener()
+            {
+                @Override
+                public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+                {
+                    // cache the deletion time versions to be able to regenerate the original row iterator
+                    for (int i = 0; i < versions.length; i++)
+                        builders[i].setDeletionTime(versions[i]);
+                }
+
+                @Override
+                public Row onMergedRows(Row merged, Row[] versions)
+                {
+                    // cache the row versions to be able to regenerate the original row iterator
+                    for (int i = 0; i < versions.length; i++)
+                        builders[i].addRow(versions[i]);
+
+                    if (merged.isEmpty())
+                        return merged;
+
+                    boolean isPotentiallyOutdated = false;
+                    boolean isStatic = merged.isStatic();
+                    for (int i = 0; i < versions.length; i++)
+                    {
+                        Row version = versions[i];
+                        if (version == null || (isStatic && version.isEmpty()))
+                        {
+                            isPotentiallyOutdated = true;
+                            BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey);
+                            // Note that for static, we shouldn't add the clustering to the clustering set (the
+                            // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact
+                            // we created a builder in the first place will act as a marker that the static row must be
+                            // fetched, even if no other rows are added for this partition.
+                            if (!isStatic)
+                                toFetch.add(merged.clustering());
+                        }
+                    }
+
+                    // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be
+                    // an outdated result that is only present because other replica have filtered the up-to-date result
+                    // out), then we skip the row. In other words, the results of the initial merging of results by this
+                    // protection assume the worst case scenario where every row that might be outdated actually is.
+                    // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed
+                    // to look at enough data to ultimately fulfill the query limit.
+                    return isPotentiallyOutdated ? null : merged;
+                }
+
+                @Override
+                public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+                {
+                    // cache the marker versions to be able to regenerate the original row iterator
+                    for (int i = 0; i < versions.length; i++)
+                        builders[i].addRangeTombstoneMarker(versions[i]);
+                }
+
+                @Override
+                public void close()
+                {
+                    for (int i = 0; i < sources.length; i++)
+                        originalPartitions.get(i).add(builders[i]);
+                }
+            };
+        };
+    }
+
+    private static PartitionColumns columns(List<UnfilteredRowIterator> versions)
+    {
+        Columns statics = Columns.NONE;
+        Columns regulars = Columns.NONE;
+        for (UnfilteredRowIterator iter : versions)
+        {
+            if (iter == null)
+                continue;
+
+            PartitionColumns cols = iter.columns();
+            statics = statics.mergeTo(cols.statics);
+            regulars = regulars.mergeTo(cols.regulars);
+        }
+        return new PartitionColumns(statics, regulars);
+    }
+
+    private static EncodingStats stats(List<UnfilteredRowIterator> iterators)
+    {
+        EncodingStats stats = EncodingStats.NO_STATS;
+        for (UnfilteredRowIterator iter : iterators)
+        {
+            if (iter == null)
+                continue;
+
+            stats = stats.mergeWith(iter.stats());
+        }
+        return stats;
+    }
+
+    private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder> builders)
+    {
+        return new UnfilteredPartitionIterator()
+        {
+            final Iterator<PartitionBuilder> iterator = builders.iterator();
+
+            @Override
+            public boolean isForThrift()
+            {
+                return command.isForThrift();
+            }
+
+            @Override
+            public CFMetaData metadata()
+            {
+                return command.metadata();
+            }
+
+            @Override
+            public void close()
+            {
+                // nothing to do here
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public UnfilteredRowIterator next()
+            {
+                return iterator.next().build();
+            }
+        };
+    }
+
+    private class PartitionBuilder
+    {
+        private final DecoratedKey partitionKey;
+        private final PartitionColumns columns;
+        private final EncodingStats stats;
+        private DeletionTime deletionTime;
+        private Row staticRow = Rows.EMPTY_STATIC_ROW;
+        private final List<Unfiltered> contents = new ArrayList<>();
+
+        private PartitionBuilder(DecoratedKey partitionKey, PartitionColumns columns, EncodingStats stats)
+        {
+            this.partitionKey = partitionKey;
+            this.columns = columns;
+            this.stats = stats;
+        }
+
+        private void setDeletionTime(DeletionTime deletionTime)
+        {
+            this.deletionTime = deletionTime;
+        }
+
+        private void addRow(Row row)
+        {
+            if (row == null)
+                return;
+
+            if (row.isStatic())
+                staticRow = row;
+            else
+                contents.add(row);
+        }
+
+        private void addRangeTombstoneMarker(RangeTombstoneMarker marker)
+        {
+            if (marker != null)
+                contents.add(marker);
+        }
+
+        private UnfilteredRowIterator build()
+        {
+            return new UnfilteredRowIterator()
+            {
+                final Iterator<Unfiltered> iterator = contents.iterator();
+
+                @Override
+                public DeletionTime partitionLevelDeletion()
+                {
+                    return deletionTime;
+                }
+
+                @Override
+                public EncodingStats stats()
+                {
+                    return stats;
+                }
+
+                @Override
+                public CFMetaData metadata()
+                {
+                    return command.metadata();
+                }
+
+                @Override
+                public boolean isReverseOrder()
+                {
+                    return command.isReversed();
+                }
+
+                @Override
+                public PartitionColumns columns()
+                {
+                    return columns;
+                }
+
+                @Override
+                public DecoratedKey partitionKey()
+                {
+                    return partitionKey;
+                }
+
+                @Override
+                public Row staticRow()
+                {
+                    return staticRow;
+                }
+
+                @Override
+                public void close()
+                {
+                    // nothing to do here
+                }
+
+                @Override
+                public boolean hasNext()
+                {
+                    return iterator.hasNext();
+                }
+
+                @Override
+                public Unfiltered next()
+                {
+                    return iterator.next();
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
index e80faca..ca9bb09 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@ -18,6 +18,7 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -135,4 +136,16 @@ public class Accumulator<E> implements Iterable<E>
             throw new IndexOutOfBoundsException();
         return (E) values[i];
     }
+
+    /**
+     * Removes all of the elements from this accumulator.
+     *
+     * This method is not thread-safe when used concurrently with {@link #add(Object)}.
+     */
+    public void clearUnsafe()
+    {
+        nextIndexUpdater.set(this, 0);
+        presentCountUpdater.set(this, 0);
+        Arrays.fill(values, null);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
index 2842374..33daca7 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
@@ -81,15 +81,7 @@ public class AccumulatorTest
         assertEquals("2", accu.get(1));
         assertEquals("4", accu.get(2));
 
-        try
-        {
-            assertEquals(null, accu.get(3));
-            fail();
-        }
-        catch (IndexOutOfBoundsException e)
-        {
-            // Expected
-        }
+        assertOutOfBonds(accu, 3);
 
         accu.add("0");
 
@@ -103,4 +95,48 @@ public class AccumulatorTest
         assertEquals("0", iter.next());
         assertFalse(iter.hasNext());
     }
+
+    @Test
+    public void testClearUnsafe()
+    {
+        Accumulator<String> accu = new Accumulator<>(3);
+
+        accu.add("1");
+        accu.add("2");
+        accu.add("3");
+
+        accu.clearUnsafe();
+
+        assertEquals(0, accu.size());
+        assertFalse(accu.iterator().hasNext());
+        assertOutOfBonds(accu, 0);
+
+        accu.add("4");
+        accu.add("5");
+
+        assertEquals(2, accu.size());
+
+        assertEquals("4", accu.get(0));
+        assertEquals("5", accu.get(1));
+        assertOutOfBonds(accu, 2);
+
+        Iterator<String> iter = accu.iterator();
+        assertTrue(iter.hasNext());
+        assertEquals("4", iter.next());
+        assertEquals("5", iter.next());
+        assertFalse(iter.hasNext());
+    }
+
+    private static void assertOutOfBonds(Accumulator<String> accumulator, int index)
+    {
+        try
+        {
+            assertNull(accumulator.get(index));
+            fail();
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            // Expected
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org