You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/06/05 12:16:45 UTC
[cassandra] branch cassandra-3.0 updated: Fix replica-side
filtering returning stale data with CL > 1
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new dd255ff Fix replica-side filtering returning stale data with CL > 1
dd255ff is described below
commit dd255ffa07d0263521a1ca863fc2192db19bc04c
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Wed May 27 11:01:42 2020 +0100
Fix replica-side filtering returning stale data with CL > 1
patch by Andres de la Peña; reviewed by Benjamin Lerer, Caleb Rackliffe and ZhaoYang for CASSANDRA-8272
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/DataRange.java | 10 +
.../cassandra/db/PartitionRangeReadCommand.java | 5 +
src/java/org/apache/cassandra/db/ReadCommand.java | 7 +
.../cassandra/db/SinglePartitionReadCommand.java | 5 +
.../db/compaction/CompactionIterator.java | 3 +-
.../org/apache/cassandra/db/filter/RowFilter.java | 87 ++--
.../partitions/UnfilteredPartitionIterators.java | 30 +-
.../cassandra/db/rows/UnfilteredRowIterators.java | 32 +-
.../org/apache/cassandra/metrics/TableMetrics.java | 18 +-
.../org/apache/cassandra/service/DataResolver.java | 162 +++++--
.../service/ReplicaFilteringProtection.java | 465 +++++++++++++++++++++
.../cassandra/utils/concurrent/Accumulator.java | 13 +
.../utils/concurrent/AccumulatorTest.java | 54 ++-
14 files changed, 800 insertions(+), 92 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 46b3f56..ff00579 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.21
+ * Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
* Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
* Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
* EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index d2f9c76..f6776c4 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -196,6 +196,16 @@ public class DataRange
}
/**
+ * Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+ *
+ * @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+ */
+ public boolean isReversed()
+ {
+ return clusteringIndexFilter.isReversed();
+ }
+
+ /**
* The clustering index filter to use for the provided key.
* <p>
* This may or may not be the same filter for all keys (that is, paging range
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 4f936cc..1da66c1 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -206,6 +206,11 @@ public class PartitionRangeReadCommand extends ReadCommand
return DatabaseDescriptor.getRangeRpcTimeout();
}
+ public boolean isReversed()
+ {
+ return dataRange.isReversed();
+ }
+
public boolean selectsKey(DecoratedKey key)
{
if (!dataRange().contains(key))
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index b499daf..39a5402 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -330,6 +330,13 @@ public abstract class ReadCommand implements ReadQuery
protected abstract int oldestUnrepairedTombstone();
+ /**
+ * Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+ *
+ * @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+ */
+ public abstract boolean isReversed();
+
public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
{
// validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 2e014ba..841c3b9 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -415,6 +415,11 @@ public class SinglePartitionReadCommand extends ReadCommand
return DatabaseDescriptor.getReadRpcTimeout();
}
+ public boolean isReversed()
+ {
+ return clusteringIndexFilter.isReversed();
+ }
+
public boolean selectsKey(DecoratedKey key)
{
if (!this.partitionKey().equals(key))
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index b132d90..8c4732b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -199,11 +199,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
{
}
- public void onMergedRows(Row merged, Row[] versions)
+ public Row onMergedRows(Row merged, Row[] versions)
{
indexTransaction.start();
indexTransaction.onRowMerge(merged, versions);
indexTransaction.commit();
+ return merged;
}
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 4300651..774e4d3 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -126,6 +126,8 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return false;
}
+ protected abstract Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec);
+
/**
* Filters the provided iterator so that only the row satisfying the expression of this filter
* are included in the resulting iterator.
@@ -134,7 +136,23 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
* @param nowInSec the time of query in seconds.
* @return the filtered iterator.
*/
- public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec);
+ public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
+ {
+ return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(iter.metadata(), nowInSec));
+ }
+
+ /**
+ * Filters the provided iterator so that only the row satisfying the expression of this filter
+ * are included in the resulting iterator.
+ *
+ * @param iter the iterator to filter
+ * @param nowInSec the time of query in seconds.
+ * @return the filtered iterator.
+ */
+ public PartitionIterator filter(PartitionIterator iter, CFMetaData metadata, int nowInSec)
+ {
+ return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(metadata, nowInSec));
+ }
/**
* Whether the provided row in the provided partition satisfies this filter.
@@ -263,20 +281,16 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
super(expressions);
}
- public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
+ protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec)
{
- if (expressions.isEmpty())
- return iter;
-
- final CFMetaData metadata = iter.metadata();
long numberOfStaticColumnExpressions = expressions.stream().filter(e -> e.column.isStatic()).count();
final boolean filterStaticColumns = numberOfStaticColumnExpressions != 0;
final boolean filterNonStaticColumns = (expressions.size() - numberOfStaticColumnExpressions) > 0;
- class IsSatisfiedFilter extends Transformation<UnfilteredRowIterator>
+ return new Transformation<BaseRowIterator<?>>()
{
DecoratedKey pk;
- public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
{
// The filter might be on static columns, so need to check static row first.
if (filterStaticColumns && applyToRow(partition.staticRow()) == null)
@@ -286,7 +300,9 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
}
pk = partition.partitionKey();
- UnfilteredRowIterator iterator = Transformation.apply(partition, this);
+ BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator
+ ? Transformation.apply((UnfilteredRowIterator) partition, this)
+ : Transformation.apply((RowIterator) partition, this);
if (filterNonStaticColumns && !iterator.hasNext())
{
@@ -308,9 +324,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
return null;
return row;
}
- }
-
- return Transformation.apply(iter, new IsSatisfiedFilter());
+ };
}
protected RowFilter withNewExpressions(List<Expression> expressions)
@@ -326,36 +340,47 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
super(expressions);
}
- public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
+ protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec)
{
- if (expressions.isEmpty())
- return iter;
-
- class IsSatisfiedThriftFilter extends Transformation<UnfilteredRowIterator>
+ // Thrift does not filter rows; it filters out the entire partition if any of the expressions is not
+ // satisfied, which forces us to materialize the result (in theory we could materialize only
+ // what we need, which might or might not be everything, but we keep it simple since in practice
+ // it has never been worth it).
+ return new Transformation<BaseRowIterator<?>>()
{
- @Override
- public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
+ protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
+ {
+ return partition instanceof UnfilteredRowIterator ? applyTo((UnfilteredRowIterator) partition)
+ : applyTo((RowIterator) partition);
+ }
+
+ private UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
+ {
+ ImmutableBTreePartition result = ImmutableBTreePartition.create(partition);
+ partition.close();
+ return accepts(result) ? result.unfilteredIterator() : null;
+ }
+
+ private RowIterator applyTo(RowIterator partition)
{
- // Thrift does not filter rows, it filters entire partition if any of the expression is not
- // satisfied, which forces us to materialize the result (in theory we could materialize only
- // what we need which might or might not be everything, but we keep it simple since in practice
- // it's not worth that it has ever been).
- ImmutableBTreePartition result = ImmutableBTreePartition.create(iter);
- iter.close();
+ FilteredPartition result = FilteredPartition.create(partition);
+ return accepts(result) ? result.rowIterator() : null;
+ }
+ private boolean accepts(ImmutableBTreePartition result)
+ {
// The partition needs to have a row for every expression, and the expression needs to be valid.
for (Expression expr : expressions)
{
assert expr instanceof ThriftExpression;
- Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes));
- if (row == null || !expr.isSatisfiedBy(iter.metadata(), iter.partitionKey(), row))
- return null;
+ Row row = result.getRow(makeCompactClustering(metadata, expr.column().name.bytes));
+ if (row == null || !expr.isSatisfiedBy(metadata, result.partitionKey(), row))
+ return false;
}
// If we get there, it means all expressions where satisfied, so return the original result
- return result.unfilteredIterator();
+ return true;
}
- }
- return Transformation.apply(iter, new IsSatisfiedThriftFilter());
+ };
}
protected RowFilter withNewExpressions(List<Expression> expressions)
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 3f7d072..bff910e 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.db.transform.MorePartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -47,7 +48,7 @@ public abstract class UnfilteredPartitionIterators
public interface MergeListener
{
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions);
- public void close();
+ public default void close() {}
}
@SuppressWarnings("resource") // The created resources are returned right away
@@ -77,6 +78,24 @@ public abstract class UnfilteredPartitionIterators
return Transformation.apply(toReturn, new Close());
}
+ public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators)
+ {
+ if (iterators.size() == 1)
+ return iterators.get(0);
+
+ class Extend implements MorePartitions<UnfilteredPartitionIterator>
+ {
+ int i = 1;
+ public UnfilteredPartitionIterator moreContents()
+ {
+ if (i >= iterators.size())
+ return null;
+ return iterators.get(i++);
+ }
+ }
+ return MorePartitions.extend(iterators.get(0), new Extend());
+ }
+
public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
{
return FilteredPartitions.filter(iterator, nowInSec);
@@ -84,7 +103,6 @@ public abstract class UnfilteredPartitionIterators
public static UnfilteredPartitionIterator merge(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener listener)
{
- assert listener != null;
assert !iterators.isEmpty();
final boolean isForThrift = iterators.get(0).isForThrift();
@@ -109,7 +127,9 @@ public abstract class UnfilteredPartitionIterators
protected UnfilteredRowIterator getReduced()
{
- UnfilteredRowIterators.MergeListener rowListener = listener.getRowMergeListener(partitionKey, toMerge);
+ UnfilteredRowIterators.MergeListener rowListener = listener == null
+ ? null
+ : listener.getRowMergeListener(partitionKey, toMerge);
// Replace nulls by empty iterators
for (int i = 0; i < toMerge.size(); i++)
@@ -153,7 +173,9 @@ public abstract class UnfilteredPartitionIterators
public void close()
{
merged.close();
- listener.close();
+
+ if (listener != null)
+ listener.close();
}
};
}
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index f42f675..b6dbf82 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -72,12 +72,17 @@ public abstract class UnfilteredRowIterators
* particular, this may be called in cases where there is no row in the merged output (if a source has a row
* that is shadowed by another source range tombstone or partition level deletion).
*
- * @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a
- * placeholder for when at least one source has a row, but that row is shadowed in the merged output.
+ * @param merged the result of the merge. This cannot be {@code null} (so that listener can always access the
+ * clustering from this safely) but can be empty, in which case this is a placeholder for when at least one
+ * source has a row, but that row is shadowed in the merged output.
* @param versions for each source, the row in that source corresponding to {@code merged}. This can be
* {@code null} for some sources if the source has not such row.
+ * @return the row to use as result of the merge (can be {@code null}). Most implementations should simply
+ * return {@code merged}, but this allows some implementations to impact the merge result if necessary. If this
+ * returns either {@code null} or an empty row, then the row is skipped from the merge result. If this returns a
+ * non {@code null} result, then the returned row <b>must</b> have the same clustering as {@code merged}.
*/
- public void onMergedRows(Row merged, Row[] versions);
+ public Row onMergedRows(Row merged, Row[] versions);
/**
* Called once for every range tombstone marker participating in the merge.
@@ -500,9 +505,12 @@ public abstract class UnfilteredRowIterators
Row merged = merger.merge(partitionDeletion);
if (merged == null)
merged = Rows.EMPTY_STATIC_ROW;
- if (listener != null)
- listener.onMergedRows(merged, merger.mergedRows());
- return merged;
+ if (listener == null)
+ return merged;
+
+ merged = listener.onMergedRows(merged, merger.mergedRows());
+ // Note that onMergedRows may have returned null even though its input wasn't null
+ return merged == null ? Rows.EMPTY_STATIC_ROW : merged;
}
private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators)
@@ -586,9 +594,15 @@ public abstract class UnfilteredRowIterators
if (nextKind == Unfiltered.Kind.ROW)
{
Row merged = rowMerger.merge(markerMerger.activeDeletion());
- if (listener != null)
- listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, rowMerger.mergedRows());
- return merged;
+ if (listener == null)
+ return merged;
+
+ merged = listener.onMergedRows(merged == null
+ ? BTreeRow.emptyRow(rowMerger.mergedClustering())
+ : merged,
+ rowMerger.mergedRows());
+
+ return merged == null || merged.isEmpty() ? null : merged;
}
else
{
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index eb56ed9..1f4803e 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -153,6 +153,7 @@ public class TableMetrics
public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
+ public final Meter replicaSideFilteringProtectionRequests;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
@@ -649,8 +650,9 @@ public class TableMetrics
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
- readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
- shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
+ readRepairRequests = createTableMeter("ReadRepairRequests");
+ shortReadProtectionRequests = createTableMeter("ShortReadProtectionRequests");
+ replicaSideFilteringProtectionRequests = createTableMeter("ReplicaSideFilteringProtectionRequests");
}
public void updateSSTableIterated(int count)
@@ -758,6 +760,18 @@ public class TableMetrics
return cfCounter;
}
+ private Meter createTableMeter(final String name)
+ {
+ return createTableMeter(name, name);
+ }
+
+ private Meter createTableMeter(final String name, final String alias)
+ {
+ Meter tableMeter = Metrics.meter(factory.createMetricName(name), aliasFactory.createMetricName(alias));
+ register(name, alias, tableMeter);
+ return tableMeter;
+ }
+
/**
* Create a histogram-like interface that will register both a CF, keyspace and global level
* histogram and forward any updates to both
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 522c57b..02d355e 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.TimeoutException;
+import java.util.function.UnaryOperator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -80,17 +81,126 @@ public class DataResolver extends ResponseResolver
public PartitionIterator resolve()
{
+ if (!needsReplicaFilteringProtection())
+ {
+ ResolveContext context = new ResolveContext(responses.size());
+ return resolveWithReadRepair(context,
+ i -> shortReadProtectedResponse(i, context),
+ UnaryOperator.identity());
+ }
+
+ return resolveWithReplicaFilteringProtection();
+ }
+
+ private boolean needsReplicaFilteringProtection()
+ {
+ return !command.rowFilter().isEmpty();
+ }
+
+ private class ResolveContext
+ {
+ private final InetAddress[] sources;
+ private final DataLimits.Counter mergedResultCounter;
+
+ private ResolveContext(int count)
+ {
+ assert count <= responses.size();
+ this.sources = new InetAddress[count];
+ for (int i = 0; i < count; i++)
+ sources[i] = responses.get(i).from;
+ this.mergedResultCounter = command.limits().newCounter(command.nowInSec(),
+ true,
+ command.selectsFullPartition(),
+ enforceStrictLiveness);
+ }
+
+ private boolean needShortReadProtection()
+ {
+ // If we have only one result, there is no read repair to do and we can't get short reads
+ // Also, so-called "short reads" stem from nodes returning only a subset of the results they have for a
+ // partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have limit,
+ // don't bother protecting against short reads.
+ return sources.length > 1 && !command.limits().isUnlimited();
+ }
+ }
+
+ @FunctionalInterface
+ private interface ResponseProvider
+ {
+ UnfilteredPartitionIterator getResponse(int i);
+ }
+
+ private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context)
+ {
+ UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command);
+
+ return context.needShortReadProtection()
+ ? extendWithShortReadProtection(originalResponse, context.sources[i], context.mergedResultCounter)
+ : originalResponse;
+ }
+
+ private PartitionIterator resolveWithReadRepair(ResolveContext context,
+ ResponseProvider responseProvider,
+ UnaryOperator<PartitionIterator> preCountFilter)
+ {
+ return resolveInternal(context, new RepairMergeListener(context.sources), responseProvider, preCountFilter);
+ }
+
+ private PartitionIterator resolveWithReplicaFilteringProtection()
+ {
+ // Protecting against inconsistent replica filtering (some replica returning a row that is outdated but that
+ // wouldn't be removed by normal reconciliation because up-to-date replica have filtered the up-to-date version
+ // of that row) works in 3 steps:
+ // 1) we read the full response just to collect rows that may be outdated (the ones we got from some
+ // replicas but didn't get from others; it could be that those other replicas have filtered a more
+ // up-to-date result). In doing so, we do not count any of those "potentially outdated" rows towards the
+ // query limit. This simulates the worst case scenario where all those "potentially outdated" rows are
+ // indeed outdated, and thus makes sure we are guaranteed to read enough results (thanks to short read
+ // protection).
+ // 2) we query all the replicas/rows we need to determine whether those "potentially outdated" rows are outdated
+ // or not.
+ // 3) we re-read cached copies of each replica response using the "normal" read path merge with read-repair,
+ // but where for each replica we use their original response _plus_ the additional rows queried in the
+ // previous step (and apply the command#rowFilter() on the full result). Since the first phase has
+ // pessimistically collected enough results for the case where all potentially outdated results are indeed
+ // outdated, we shouldn't need further short-read protection requests during this phase.
+
// We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
// at the beginning of this method), so grab the response count once and use that through the method.
int count = responses.size();
- List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
- InetAddress[] sources = new InetAddress[count];
+ // We need separate contexts, as each context has its own counter
+ ResolveContext firstPhaseContext = new ResolveContext(count);
+ ResolveContext secondPhaseContext = new ResolveContext(count);
+ ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, firstPhaseContext.sources);
+ PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
+ rfp.mergeController(),
+ i -> shortReadProtectedResponse(i, firstPhaseContext),
+ UnaryOperator.identity());
+
+ // Consume the first phase partitions to populate the replica filtering protection with both those materialized
+ // partitions and the primary keys to be fetched.
+ PartitionIterators.consume(firstPhasePartitions);
+ firstPhasePartitions.close();
+
+ // After reading the entire query results the protection helper should have cached all the partitions so we can
+ // clear the responses accumulator for the sake of memory usage, given that the second phase might take long if
+ // it needs to query replicas.
+ responses.clearUnsafe();
+
+ return resolveWithReadRepair(secondPhaseContext,
+ rfp::queryProtectedPartitions,
+ results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec()));
+ }
+
+ private PartitionIterator resolveInternal(ResolveContext context,
+ UnfilteredPartitionIterators.MergeListener mergeListener,
+ ResponseProvider responseProvider,
+ UnaryOperator<PartitionIterator> preCountFilter)
+ {
+ int count = context.sources.length;
+ List<UnfilteredPartitionIterator> results = new ArrayList<>(count);
for (int i = 0; i < count; i++)
- {
- MessageIn<ReadResponse> msg = responses.get(i);
- iters.add(msg.payload.makeIterator(command));
- sources[i] = msg.from;
- }
+ results.add(responseProvider.getResponse(i));
/*
* Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
@@ -106,36 +216,14 @@ public class DataResolver extends ResponseResolver
* See CASSANDRA-13747 for more details.
*/
- DataLimits.Counter mergedResultCounter =
- command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
-
- UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+ UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, command.nowInSec(), mergeListener);
FilteredPartitions filtered =
- FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
- PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
+ FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
+ PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter);
return command.isForThrift()
- ? counted
- : Transformation.apply(counted, new EmptyPartitionsDiscarder());
- }
-
- private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
- InetAddress[] sources,
- DataLimits.Counter mergedResultCounter)
- {
- // If we have only one results, there is no read repair to do and we can't get short reads
- if (results.size() == 1)
- return results.get(0);
-
- /*
- * So-called short reads stems from nodes returning only a subset of the results they have due to the limit,
- * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
- */
- if (!command.limits().isUnlimited())
- for (int i = 0; i < results.size(); i++)
- results.set(i, extendWithShortReadProtection(results.get(i), sources[i], mergedResultCounter));
-
- return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
+ ? counted
+ : Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -294,13 +382,13 @@ public class DataResolver extends ResponseResolver
}
}
- public void onMergedRows(Row merged, Row[] versions)
+ public Row onMergedRows(Row merged, Row[] versions)
{
// If a row was shadowed post merged, it must be by a partition level or range tombstone, and we handle
// those case directly in their respective methods (in other words, it would be inefficient to send a row
// deletion as repair when we know we've already send a partition level or range tombstone that covers it).
if (merged.isEmpty())
- return;
+ return merged;
Rows.diff(diffListener, merged, versions);
for (int i = 0; i < currentRows.length; i++)
@@ -309,6 +397,8 @@ public class DataResolver extends ResponseResolver
update(i).add(currentRows[i].build());
}
Arrays.fill(currentRows, null);
+
+ return merged;
}
private DeletionTime currentDeletion()
diff --git a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
new file mode 100644
index 0000000..36d51cc
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+/**
+ * Helper in charge of collecting additional queries to be done on the coordinator to protect against invalid results
+ * being included due to replica-side filtering (secondary indexes or {@code ALLOW FILTERING}).
+ * <p>
+ * When using replica-side filtering with CL>ONE, a replica can send a stale result satisfying the filter, while updated
+ * replicas won't send a corresponding tombstone to discard that result during reconciliation. This helper identifies
+ * the rows in a replica response that don't have a corresponding row in other replica responses, and requests them by
+ * primary key to the "silent" replicas in a second fetch round.
+ * <p>
+ * See CASSANDRA-8272 and CASSANDRA-8273 for further details.
+ */
+class ReplicaFilteringProtection
+{
+ private static final Logger logger = LoggerFactory.getLogger(ReplicaFilteringProtection.class);
+
+ private final Keyspace keyspace;
+ private final ReadCommand command;
+ private final ConsistencyLevel consistency;
+ private final InetAddress[] sources;
+ private final TableMetrics tableMetrics;
+
+ /**
+ * Per-source primary keys of the rows that might be outdated so they need to be fetched.
+ * For outdated static rows we use an empty builder to signal it has to be queried.
+ */
+ private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch;
+
+ /**
+ * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows.
+ */
+ private final List<List<PartitionBuilder>> originalPartitions;
+
+ ReplicaFilteringProtection(Keyspace keyspace,
+ ReadCommand command,
+ ConsistencyLevel consistency,
+ InetAddress[] sources)
+ {
+ this.keyspace = keyspace;
+ this.command = command;
+ this.consistency = consistency;
+ this.sources = sources;
+ this.rowsToFetch = new ArrayList<>(sources.length);
+ this.originalPartitions = new ArrayList<>(sources.length);
+
+ for (InetAddress ignored : sources)
+ {
+ rowsToFetch.add(new TreeMap<>());
+ originalPartitions.add(new ArrayList<>());
+ }
+
+ tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId);
+ }
+
+ private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source, DecoratedKey partitionKey)
+ {
+ return rowsToFetch.get(source).computeIfAbsent(partitionKey, k -> BTreeSet.builder(command.metadata().comparator));
+ }
+
+ /**
+ * Returns the protected results for the specified replica. These are generated by fetching the extra rows and
+ * merging them with the cached original filtered results for that replica.
+ *
+ * @param source the source
+ * @return the protected results for the specified replica
+ */
+ UnfilteredPartitionIterator queryProtectedPartitions(int source)
+ {
+ UnfilteredPartitionIterator original = makeIterator(originalPartitions.get(source));
+ SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = rowsToFetch.get(source);
+
+ if (toFetch.isEmpty())
+ return original;
+
+ // TODO: this would be more efficient if we had multi-key queries internally
+ List<UnfilteredPartitionIterator> fetched = toFetch.keySet()
+ .stream()
+ .map(k -> querySourceOnKey(source, k))
+ .collect(Collectors.toList());
+
+ return UnfilteredPartitionIterators.merge(Arrays.asList(original, UnfilteredPartitionIterators.concat(fetched)),
+ command.nowInSec(), null);
+ }
+
+ private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey key)
+ {
+ BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key);
+ assert builder != null; // We're calling this on the result of rowsToFetch.get(i).keySet()
+
+ InetAddress source = sources[i];
+ NavigableSet<Clustering> clusterings = builder.build();
+ tableMetrics.replicaSideFilteringProtectionRequests.mark();
+ if (logger.isTraceEnabled())
+ logger.trace("Requesting rows {} in partition {} from {} for replica-side filtering protection",
+ clusterings, key, source);
+ Tracing.trace("Requesting {} rows in partition {} from {} for replica-side filtering protection",
+ clusterings.size(), key, source);
+
+ // build the read command taking into account that we could be requesting only the static row
+ DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : DataLimits.NONE;
+ ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed());
+ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
+ command.nowInSec(),
+ command.columnFilter(),
+ RowFilter.NONE,
+ limits,
+ key,
+ filter);
+ try
+ {
+ return executeReadCommand(cmd, source);
+ }
+ catch (ReadTimeoutException e)
+ {
+ int blockFor = consistency.blockFor(keyspace);
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+ }
+ catch (UnavailableException e)
+ {
+ int blockFor = consistency.blockFor(keyspace);
+ throw new UnavailableException(consistency, blockFor, blockFor - 1);
+ }
+ }
+
+ private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, InetAddress source)
+ {
+ DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
+ ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
+
+ if (StorageProxy.canDoLocalRequest(source))
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
+ else
+ MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
+
+ // We don't call handler.get() because we want to preserve tombstones
+ handler.awaitResults();
+ assert resolver.responses.size() == 1;
+ return resolver.responses.get(0).payload.makeIterator(command);
+ }
+
+ /**
+ * Returns a merge listener that skips the merged rows for which any of the replicas doesn't have a version,
+ * pessimistically assuming that they are outdated. It is intended to be used during a first merge of per-replica
+ * query results, so that we fetch enough results from the replicas and don't miss any potentially
+ * outdated result.
+ * <p>
+ * The listener will track both the accepted data and the primary keys of the rows that are considered as outdated.
+ * That way, once the query results have been merged using this listener, subsequent calls to
+ * {@link #queryProtectedPartitions(int)} will use the collected data to return a copy of the
+ * data originally collected from the specified replica, completed with the potentially outdated rows.
+ */
+ UnfilteredPartitionIterators.MergeListener mergeController()
+ {
+ return (partitionKey, versions) -> {
+
+ PartitionBuilder[] builders = new PartitionBuilder[sources.length];
+
+ for (int i = 0; i < sources.length; i++)
+ builders[i] = new PartitionBuilder(partitionKey, columns(versions), stats(versions));
+
+ return new UnfilteredRowIterators.MergeListener()
+ {
+ @Override
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+ {
+ // cache the deletion time versions to be able to regenerate the original row iterator
+ for (int i = 0; i < versions.length; i++)
+ builders[i].setDeletionTime(versions[i]);
+ }
+
+ @Override
+ public Row onMergedRows(Row merged, Row[] versions)
+ {
+ // cache the row versions to be able to regenerate the original row iterator
+ for (int i = 0; i < versions.length; i++)
+ builders[i].addRow(versions[i]);
+
+ if (merged.isEmpty())
+ return merged;
+
+ boolean isPotentiallyOutdated = false;
+ boolean isStatic = merged.isStatic();
+ for (int i = 0; i < versions.length; i++)
+ {
+ Row version = versions[i];
+ if (version == null || (isStatic && version.isEmpty()))
+ {
+ isPotentiallyOutdated = true;
+ BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey);
+ // Note that for static, we shouldn't add the clustering to the clustering set (the
+ // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact
+ // we created a builder in the first place will act as a marker that the static row must be
+ // fetched, even if no other rows are added for this partition.
+ if (!isStatic)
+ toFetch.add(merged.clustering());
+ }
+ }
+
+ // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be
+ // an outdated result that is only present because other replicas have filtered the up-to-date result
+ // out), then we skip the row. In other words, the results of the initial merging of results by this
+ // protection assume the worst case scenario where every row that might be outdated actually is.
+ // This ensures that during this first phase (collecting additional rows to fetch) we are guaranteed
+ // to look at enough data to ultimately fulfill the query limit.
+ return isPotentiallyOutdated ? null : merged;
+ }
+
+ @Override
+ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+ {
+ // cache the marker versions to be able to regenerate the original row iterator
+ for (int i = 0; i < versions.length; i++)
+ builders[i].addRangeTombstoneMarker(versions[i]);
+ }
+
+ @Override
+ public void close()
+ {
+ for (int i = 0; i < sources.length; i++)
+ originalPartitions.get(i).add(builders[i]);
+ }
+ };
+ };
+ }
+
+ private static PartitionColumns columns(List<UnfilteredRowIterator> versions)
+ {
+ Columns statics = Columns.NONE;
+ Columns regulars = Columns.NONE;
+ for (UnfilteredRowIterator iter : versions)
+ {
+ if (iter == null)
+ continue;
+
+ PartitionColumns cols = iter.columns();
+ statics = statics.mergeTo(cols.statics);
+ regulars = regulars.mergeTo(cols.regulars);
+ }
+ return new PartitionColumns(statics, regulars);
+ }
+
+ private static EncodingStats stats(List<UnfilteredRowIterator> iterators)
+ {
+ EncodingStats stats = EncodingStats.NO_STATS;
+ for (UnfilteredRowIterator iter : iterators)
+ {
+ if (iter == null)
+ continue;
+
+ stats = stats.mergeWith(iter.stats());
+ }
+ return stats;
+ }
+
+ private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder> builders)
+ {
+ return new UnfilteredPartitionIterator()
+ {
+ final Iterator<PartitionBuilder> iterator = builders.iterator();
+
+ @Override
+ public boolean isForThrift()
+ {
+ return command.isForThrift();
+ }
+
+ @Override
+ public CFMetaData metadata()
+ {
+ return command.metadata();
+ }
+
+ @Override
+ public void close()
+ {
+ // nothing to do here
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public UnfilteredRowIterator next()
+ {
+ return iterator.next().build();
+ }
+ };
+ }
+
+ private class PartitionBuilder
+ {
+ private final DecoratedKey partitionKey;
+ private final PartitionColumns columns;
+ private final EncodingStats stats;
+ private DeletionTime deletionTime;
+ private Row staticRow = Rows.EMPTY_STATIC_ROW;
+ private final List<Unfiltered> contents = new ArrayList<>();
+
+ private PartitionBuilder(DecoratedKey partitionKey, PartitionColumns columns, EncodingStats stats)
+ {
+ this.partitionKey = partitionKey;
+ this.columns = columns;
+ this.stats = stats;
+ }
+
+ private void setDeletionTime(DeletionTime deletionTime)
+ {
+ this.deletionTime = deletionTime;
+ }
+
+ private void addRow(Row row)
+ {
+ if (row == null)
+ return;
+
+ if (row.isStatic())
+ staticRow = row;
+ else
+ contents.add(row);
+ }
+
+ private void addRangeTombstoneMarker(RangeTombstoneMarker marker)
+ {
+ if (marker != null)
+ contents.add(marker);
+ }
+
+ private UnfilteredRowIterator build()
+ {
+ return new UnfilteredRowIterator()
+ {
+ final Iterator<Unfiltered> iterator = contents.iterator();
+
+ @Override
+ public DeletionTime partitionLevelDeletion()
+ {
+ return deletionTime;
+ }
+
+ @Override
+ public EncodingStats stats()
+ {
+ return stats;
+ }
+
+ @Override
+ public CFMetaData metadata()
+ {
+ return command.metadata();
+ }
+
+ @Override
+ public boolean isReverseOrder()
+ {
+ return command.isReversed();
+ }
+
+ @Override
+ public PartitionColumns columns()
+ {
+ return columns;
+ }
+
+ @Override
+ public DecoratedKey partitionKey()
+ {
+ return partitionKey;
+ }
+
+ @Override
+ public Row staticRow()
+ {
+ return staticRow;
+ }
+
+ @Override
+ public void close()
+ {
+ // nothing to do here
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Unfiltered next()
+ {
+ return iterator.next();
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
index e80faca..ca9bb09 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@ -18,6 +18,7 @@
*/
package org.apache.cassandra.utils.concurrent;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -135,4 +136,16 @@ public class Accumulator<E> implements Iterable<E>
throw new IndexOutOfBoundsException();
return (E) values[i];
}
+
+ /**
+ * Removes all of the elements from this accumulator.
+ *
+ * This method is not thread-safe when used concurrently with {@link #add(Object)}.
+ */
+ public void clearUnsafe()
+ {
+ nextIndexUpdater.set(this, 0);
+ presentCountUpdater.set(this, 0);
+ Arrays.fill(values, null);
+ }
}
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
index 2842374..33daca7 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
@@ -81,15 +81,7 @@ public class AccumulatorTest
assertEquals("2", accu.get(1));
assertEquals("4", accu.get(2));
- try
- {
- assertEquals(null, accu.get(3));
- fail();
- }
- catch (IndexOutOfBoundsException e)
- {
- // Expected
- }
+ assertOutOfBonds(accu, 3);
accu.add("0");
@@ -103,4 +95,48 @@ public class AccumulatorTest
assertEquals("0", iter.next());
assertFalse(iter.hasNext());
}
+
+ @Test
+ public void testClearUnsafe()
+ {
+ Accumulator<String> accu = new Accumulator<>(3);
+
+ accu.add("1");
+ accu.add("2");
+ accu.add("3");
+
+ accu.clearUnsafe();
+
+ assertEquals(0, accu.size());
+ assertFalse(accu.iterator().hasNext());
+ assertOutOfBonds(accu, 0);
+
+ accu.add("4");
+ accu.add("5");
+
+ assertEquals(2, accu.size());
+
+ assertEquals("4", accu.get(0));
+ assertEquals("5", accu.get(1));
+ assertOutOfBonds(accu, 2);
+
+ Iterator<String> iter = accu.iterator();
+ assertTrue(iter.hasNext());
+ assertEquals("4", iter.next());
+ assertEquals("5", iter.next());
+ assertFalse(iter.hasNext());
+ }
+
+ private static void assertOutOfBonds(Accumulator<String> accumulator, int index)
+ {
+ try
+ {
+ assertNull(accumulator.get(index));
+ fail();
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ // Expected
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org