You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2018/06/20 14:51:05 UTC
cassandra git commit: Always close RT markers returned by
ReadCommand#executeLocally()
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 eb91942f6 -> 4e23c9e4d
Always close RT markers returned by ReadCommand#executeLocally()
patch by Aleksey Yeschenko; reviewed by Blake Eggleston and
Sam Tunnicliffe for CASSANDRA-14515
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e23c9e4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e23c9e4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e23c9e4
Branch: refs/heads/cassandra-3.0
Commit: 4e23c9e4dba6ee772531d82980f73234bd41869a
Parents: eb91942
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Wed Jun 20 00:01:10 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Wed Jun 20 15:45:54 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ReadCommand.java | 42 +-
.../db/rows/UnfilteredRowIterators.java | 26 +
.../cassandra/db/transform/RTBoundCloser.java | 110 +++++
.../db/transform/RTBoundValidator.java | 106 ++++
.../apache/cassandra/service/DataResolver.java | 21 +-
.../db/transform/RTTransformationsTest.java | 482 +++++++++++++++++++
7 files changed, 758 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ebf8764..aeeb0ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.17
+ * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515)
* Reverse order queries with range tombstones can cause data loss (CASSANDRA-14513)
* Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
* Add Missing dependencies in pom-all (CASSANDRA-14422)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index c93692a..f8a0795 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.RTBoundCloser;
+import org.apache.cassandra.db.transform.RTBoundValidator;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.index.Index;
@@ -329,6 +331,10 @@ public abstract class ReadCommand implements ReadQuery
public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
{
+ // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
+ // ends equal, and there are no dangling RT bound in any partition.
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+
return isDigestQuery()
? ReadResponse.createDigestResponse(iterator, this)
: ReadResponse.createDataResponse(iterator, this);
@@ -401,29 +407,37 @@ public abstract class ReadCommand implements ReadQuery
Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
}
- UnfilteredPartitionIterator resultIterator = searcher == null
- ? queryStorage(cfs, orderGroup)
- : searcher.search(orderGroup);
+ UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, orderGroup) : searcher.search(orderGroup);
try
{
- resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
+ iterator = withoutPurgeableTombstones(iterator, cfs);
+ iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
// If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
// no point in checking it again.
- RowFilter updatedFilter = searcher == null
- ? rowFilter()
- : index.getPostIndexQueryFilter(rowFilter());
-
- // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
- // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
- // would be more efficient (the sooner we discard stuff we know we don't care, the less useless
- // processing we do on it).
- return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
+ RowFilter filter = (null == searcher) ? rowFilter() : index.getPostIndexQueryFilter(rowFilter());
+
+ /*
+ * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
+ * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
+ * would be more efficient (the sooner we discard stuff we know we don't care, the less useless
+ * processing we do on it).
+ */
+ iterator = filter.filter(iterator, nowInSec());
+
+ // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
+ // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
+ iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+
+ // because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter.
+ iterator = Transformation.apply(iterator, new RTBoundCloser());
+
+ return iterator;
}
catch (RuntimeException | Error e)
{
- resultIterator.close();
+ iterator.close();
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 5c27363..f42f675 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -140,6 +140,32 @@ public abstract class UnfilteredRowIterators
return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, staticRow, partitionDeletion);
}
+ public static UnfilteredRowIterator singleton(Unfiltered unfiltered,
+ CFMetaData metadata,
+ DecoratedKey partitionKey,
+ DeletionTime partitionLevelDeletion,
+ PartitionColumns columns,
+ Row staticRow,
+ boolean isReverseOrder,
+ EncodingStats encodingStats)
+ {
+ return new AbstractUnfilteredRowIterator(metadata, partitionKey, partitionLevelDeletion, columns, staticRow, isReverseOrder, encodingStats)
+ {
+ boolean isDone = false;
+
+ protected Unfiltered computeNext()
+ {
+ if (!isDone)
+ {
+ isDone = true;
+ return unfiltered;
+ }
+
+ return endOfData();
+ }
+ };
+ }
+
/**
* Digests the partition represented by the provided iterator.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
new file mode 100644
index 0000000..11f0344
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * A transformation that appends an RT bound marker to row iterators in case they don't have one.
+ *
+ * This used to happen, for example, in {@link org.apache.cassandra.db.ReadCommand#executeLocally(ReadOrderGroup)},
+ * if {@link org.apache.cassandra.db.filter.DataLimits} stopped the iterator on a live row that was enclosed in an
+ * older RT.
+ *
+ * If we don't do this, and send a response without the closing bound, we can break read/short read protection read
+ * isolation, and potentially cause data loss.
+ *
+ * See CASSANDRA-14515 for context.
+ */
+public final class RTBoundCloser extends Transformation<UnfilteredRowIterator>
+{
+ @Override
+ public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ RowsTransformation transformation = new RowsTransformation(partition);
+ return Transformation.apply(MoreRows.extend(partition, transformation, partition.columns()), transformation);
+ }
+
+ private final static class RowsTransformation extends Transformation implements MoreRows<UnfilteredRowIterator>
+ {
+ private final UnfilteredRowIterator partition;
+
+ private Clustering lastRowClustering;
+ private DeletionTime openMarkerDeletionTime;
+
+ private RowsTransformation(UnfilteredRowIterator partition)
+ {
+ this.partition = partition;
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ lastRowClustering = row.clustering();
+ return row;
+ }
+
+ @Override
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ openMarkerDeletionTime =
+ marker.isOpen(partition.isReverseOrder()) ? marker.openDeletionTime(partition.isReverseOrder()) : null;
+ lastRowClustering = null;
+ return marker;
+ }
+
+ @Override
+ public UnfilteredRowIterator moreContents()
+ {
+ // there is no open RT in the stream - nothing for us to do
+ if (null == openMarkerDeletionTime)
+ return null;
+
+ /*
+ * there *is* an open RT in the stream, but there have been no rows after the opening bound - this must
+ * never happen in scenarios where RTBoundCloser is meant to be used; the last encountered clustering
+ * should be either a closing bound marker - if the iterator was exhausted fully - or a live row - if
+ * DataLimits stopped it short in the middle of an RT.
+ */
+ if (null == lastRowClustering)
+ {
+ CFMetaData metadata = partition.metadata();
+ String message =
+ String.format("UnfilteredRowIterator for %s.%s has an open RT bound as its last item", metadata.ksName, metadata.cfName);
+ throw new IllegalStateException(message);
+ }
+
+ // create an artificial inclusive closing RT bound with bound matching last seen row's clustering
+ RangeTombstoneBoundMarker closingBound =
+ RangeTombstoneBoundMarker.inclusiveClose(partition.isReverseOrder(), lastRowClustering.getRawValues(), openMarkerDeletionTime);
+
+ return UnfilteredRowIterators.singleton(closingBound,
+ partition.metadata(),
+ partition.partitionKey(),
+ partition.partitionLevelDeletion(),
+ partition.columns(),
+ partition.staticRow(),
+ partition.isReverseOrder(),
+ partition.stats());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
new file mode 100644
index 0000000..7866b14
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+/**
+ * A validating transformation that sanity-checks the sequence of RT bounds and boundaries in every partition.
+ *
+ * What we validate, specifically:
+ * - that open markers are only followed by close markers
+ * - that open markers and close markers have equal deletion times
+ * - optionally, that the iterator closes its last RT marker
+ */
+public final class RTBoundValidator extends Transformation<UnfilteredRowIterator>
+{
+ private final boolean enforceIsClosed;
+
+ public RTBoundValidator(boolean enforceIsClosed)
+ {
+ this.enforceIsClosed = enforceIsClosed;
+ }
+
+ @Override
+ public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ return Transformation.apply(partition, new RowsTransformation(partition.metadata(), partition.isReverseOrder(), enforceIsClosed));
+ }
+
+ private final static class RowsTransformation extends Transformation
+ {
+ private final CFMetaData metadata;
+ private final boolean isReverseOrder;
+ private final boolean enforceIsClosed;
+
+ private DeletionTime openMarkerDeletionTime;
+
+ private RowsTransformation(CFMetaData metadata, boolean isReverseOrder, boolean enforceIsClosed)
+ {
+ this.metadata = metadata;
+ this.isReverseOrder = isReverseOrder;
+ this.enforceIsClosed = enforceIsClosed;
+ }
+
+ @Override
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ if (null == openMarkerDeletionTime)
+ {
+ // there is no open RT in the stream - we are expecting a *_START_BOUND
+ if (marker.isClose(isReverseOrder))
+ throw ise("unexpected end bound or boundary " + marker.toString(metadata));
+ }
+ else
+ {
+ // there is an open RT in the stream - we are expecting a *_BOUNDARY or an *_END_BOUND
+ if (!marker.isClose(isReverseOrder))
+ throw ise("start bound followed by another start bound " + marker.toString(metadata));
+
+ // deletion times of open/close markers must match
+ DeletionTime deletionTime = marker.closeDeletionTime(isReverseOrder);
+ if (!deletionTime.equals(openMarkerDeletionTime))
+ throw ise("open marker and close marker have different deletion times");
+
+ openMarkerDeletionTime = null;
+ }
+
+ if (marker.isOpen(isReverseOrder))
+ openMarkerDeletionTime = marker.openDeletionTime(isReverseOrder);
+
+ return marker;
+ }
+
+ @Override
+ public void onPartitionClose()
+ {
+ if (enforceIsClosed && null != openMarkerDeletionTime)
+ throw ise("expected all RTs to be closed, but the last one is open");
+ }
+
+ private IllegalStateException ise(String why)
+ {
+ String message = String.format("UnfilteredRowIterator for %s.%s has an illegal RT bounds sequence: %s",
+ metadata.ksName, metadata.cfName, why);
+ throw new IllegalStateException(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 2252913..522c57b 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -398,23 +398,12 @@ public class DataResolver extends ResponseResolver
if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion))
{
/*
- * Since there is an ongoing merged deletion, the only two ways we don't have an open repair for
- * this source are that:
- *
- * 1) it had a range open with the same deletion as current marker, and the marker is coming from
- * a short read protection response - repeating the open RT bound, or
- * 2) it had a range open with the same deletion as current marker, and the marker is closing it.
+ * Since there is an ongoing merged deletion, the only way we don't have an open repair for
+ * this source is that it had a range open with the same deletion as current marker,
+ * and the marker is closing it.
*/
- if (!marker.isBoundary() && marker.isOpen(isReversed)) // (1)
- {
- assert currentDeletion.equals(marker.openDeletionTime(isReversed))
- : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
- }
- else // (2)
- {
- assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
- : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
- }
+ assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
+ : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
// and so unless it's a boundary whose opening deletion time is still equal to the current
// deletion (see comment above for why this can actually happen), we have to repair the source
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
new file mode 100644
index 0000000..832c5a3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ClusteringPrefix.Kind;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public final class RTTransformationsTest
+{
+ private static final String KEYSPACE = "RTBoundCloserTest";
+ private static final String TABLE = "table";
+
+ private final int nowInSec = FBUtilities.nowInSeconds();
+
+ private CFMetaData metadata;
+ private DecoratedKey key;
+
+ @Before
+ public void setUp()
+ {
+ metadata =
+ CFMetaData.Builder
+ .create(KEYSPACE, TABLE)
+ .addPartitionKey("pk", UTF8Type.instance)
+ .addClusteringColumn("ck0", UTF8Type.instance)
+ .addClusteringColumn("ck1", UTF8Type.instance)
+ .addClusteringColumn("ck2", UTF8Type.instance)
+ .build();
+ key = Murmur3Partitioner.instance.decorateKey(bytes("key"));
+ }
+
+ @Test
+ public void testAddsNothingWhenAlreadyClosed()
+ {
+ UnfilteredPartitionIterator original = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ );
+
+ UnfilteredPartitionIterator extended = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ );
+ extended = Transformation.apply(extended, new RTBoundCloser());
+ assertIteratorsEqual(original, extended);
+ }
+
+ @Test
+ public void testAddsNothingWhenAlreadyClosedInReverseOrder()
+ {
+ UnfilteredPartitionIterator original = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ );
+
+ UnfilteredPartitionIterator extended = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ );
+ extended = Transformation.apply(extended, new RTBoundCloser());
+ assertIteratorsEqual(original, extended);
+ }
+
+ @Test
+ public void testClosesUnclosedBound()
+ {
+ UnfilteredPartitionIterator original = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ );
+ UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+
+ UnfilteredPartitionIterator expected = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1", "")
+ );
+ assertIteratorsEqual(expected, extended);
+ }
+
+ @Test
+ public void testClosesUnclosedBoundary()
+ {
+ UnfilteredPartitionIterator original = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a")
+ , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0")
+ , row(2, "a", "1", "")
+ );
+ UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+
+ UnfilteredPartitionIterator expected = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a")
+ , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0")
+ , row(2, "a", "1", "")
+ , bound(Kind.INCL_END_BOUND, 1, "a", "1", "")
+ );
+ assertIteratorsEqual(expected, extended);
+ }
+
+ @Test
+ public void testClosesUnclosedBoundInReverseOrder()
+ {
+ UnfilteredPartitionIterator original = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ );
+ UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+
+ UnfilteredPartitionIterator expected = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1", "")
+ );
+ assertIteratorsEqual(expected, extended);
+ }
+
+ @Test
+ public void testClosesUnclosedBoundaryInReverseOrder()
+ {
+ UnfilteredPartitionIterator original = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a")
+ , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+ , row(2, "a", "0", "")
+ );
+ UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+
+ UnfilteredPartitionIterator expected = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a")
+ , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+ , row(2, "a", "0", "")
+ , bound(Kind.INCL_START_BOUND, 1, "a", "0", "")
+ );
+
+ assertIteratorsEqual(expected, extended);
+ }
+
+ @Test
+ public void testFailsWithoutSeeingRows()
+ {
+ UnfilteredPartitionIterator iterator = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundCloser());
+ assertThrowsISEIterated(iterator);
+ }
+
+ @Test
+ public void testValidatesLegalBounds()
+ {
+ UnfilteredPartitionIterator iterator = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+
+ , bound(Kind.INCL_START_BOUND, 0, "a", "2")
+ , row(1, "a", "2", "")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "2")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ drain(iterator);
+ }
+
+ @Test
+ public void testValidatesLegalBoundsInReverseOrder()
+ {
+ UnfilteredPartitionIterator iterator = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a", "2")
+ , row(1, "a", "2", "")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "2")
+
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ drain(iterator);
+ }
+
+ @Test
+ public void testValidatesLegalBoundaries()
+ {
+ UnfilteredPartitionIterator iterator = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a")
+
+ , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1")
+ , row(2, "a", "1", "")
+ , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+
+ , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2")
+ , row(3, "a", "2", "")
+ , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2")
+
+ , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3")
+ , row(4, "a", "3", "")
+ , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3")
+
+ , bound(Kind.INCL_END_BOUND, 0, "a")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ drain(iterator);
+ }
+
+ @Test
+ public void testValidatesLegalBoundariesInReverseOrder()
+ {
+ UnfilteredPartitionIterator iterator = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a")
+
+ , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3")
+ , row(4, "a", "3", "")
+ , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3")
+
+ , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2")
+ , row(3, "a", "2", "")
+ , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2")
+
+ , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+ , row(2, "a", "1", "")
+ , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1")
+
+ , bound(Kind.INCL_START_BOUND, 0, "a")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ drain(iterator);
+ }
+
+ @Test
+ public void testComplainsAboutMismatchedTimestamps()
+ {
+ UnfilteredPartitionIterator iterator = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_END_BOUND, 1, "a", "1")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+ }
+
+ @Test
+ public void testComplainsAboutMismatchedTimestampsInReverseOrder()
+ {
+ UnfilteredPartitionIterator iterator = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_START_BOUND, 1, "a", "1")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+ }
+
+ @Test
+ public void testComplainsAboutInvalidSequence()
+ {
+ // duplicated start bound
+ UnfilteredPartitionIterator iterator = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+
+ // duplicated end bound
+ iterator = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+
+ // absent open bound
+ iterator = iter(false
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+
+ // absent end bound
+ iterator = iter(false
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+ }
+
+ @Test
+ public void testComplainsAboutInvalidSequenceInReveseOrder()
+ {
+ // duplicated start bound
+ UnfilteredPartitionIterator iterator = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+
+ // duplicated end bound
+ iterator = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+
+ // absent open bound
+ iterator = iter(true
+ , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ , row(1, "a", "1", "")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+
+ // absent end bound
+ iterator = iter(true
+ , row(1, "a", "1", "")
+ , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+ );
+ iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+ assertThrowsISEIterated(iterator);
+ }
+
+ private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind, long timestamp, Object... clusteringValues)
+ {
+ ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+ for (int i = 0; i < clusteringValues.length; i++)
+ clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+
+ return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers), new DeletionTime(timestamp, nowInSec));
+ }
+
+ private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind, long closeTimestamp, long openTimestamp, Object... clusteringValues)
+ {
+ ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+ for (int i = 0; i < clusteringValues.length; i++)
+ clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+
+ return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
+ new DeletionTime(closeTimestamp, nowInSec),
+ new DeletionTime(openTimestamp, nowInSec));
+ }
+
+ private Row row(long timestamp, Object... clusteringValues)
+ {
+ ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+ for (int i = 0; i < clusteringValues.length; i++)
+ clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+
+ return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), LivenessInfo.create(metadata, timestamp, nowInSec));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+ {
+ return ((AbstractType<T>) type).decompose(value);
+ }
+
+ private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+ {
+ Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+
+ UnfilteredRowIterator rowIter =
+ new AbstractUnfilteredRowIterator(metadata,
+ key,
+ DeletionTime.LIVE,
+ metadata.partitionColumns(),
+ Rows.EMPTY_STATIC_ROW,
+ isReversedOrder,
+ EncodingStats.NO_STATS)
+ {
+ protected Unfiltered computeNext()
+ {
+ return iterator.hasNext() ? iterator.next() : endOfData();
+ }
+ };
+
+ return new SingletonUnfilteredPartitionIterator(rowIter, false);
+ }
+
+ private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+ {
+ while (iter1.hasNext())
+ {
+ assertTrue(iter2.hasNext());
+
+ try (UnfilteredRowIterator partition1 = iter1.next())
+ {
+ try (UnfilteredRowIterator partition2 = iter2.next())
+ {
+ assertIteratorsEqual(partition1, partition2);
+ }
+ }
+ }
+ assertFalse(iter2.hasNext());
+ }
+
+ private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+ {
+ while (iter1.hasNext())
+ {
+ assertTrue(iter2.hasNext());
+
+ assertEquals(iter1.next(), iter2.next());
+ }
+ assertFalse(iter2.hasNext());
+ }
+
+ private void assertThrowsISEIterated(UnfilteredPartitionIterator iterator)
+ {
+ Throwable t = null;
+ try
+ {
+ drain(iterator);
+ }
+ catch (Throwable e)
+ {
+ t = e;
+ }
+ assertTrue(t instanceof IllegalStateException);
+ }
+
+ private void drain(UnfilteredPartitionIterator iter)
+ {
+ while (iter.hasNext())
+ {
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ while (partition.hasNext())
+ partition.next();
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org