You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2018/06/20 15:11:44 UTC

[1/3] cassandra git commit: Always close RT markers returned by ReadCommand#executeLocally()

Repository: cassandra
Updated Branches:
  refs/heads/trunk d0c2ab508 -> 97ada5bdf


Always close RT markers returned by ReadCommand#executeLocally()

patch by Aleksey Yeschenko; reviewed by Blake Eggleston and
Sam Tunnicliffe for CASSANDRA-14515


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e23c9e4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e23c9e4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e23c9e4

Branch: refs/heads/trunk
Commit: 4e23c9e4dba6ee772531d82980f73234bd41869a
Parents: eb91942
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Wed Jun 20 00:01:10 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Wed Jun 20 15:45:54 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ReadCommand.java    |  42 +-
 .../db/rows/UnfilteredRowIterators.java         |  26 +
 .../cassandra/db/transform/RTBoundCloser.java   | 110 +++++
 .../db/transform/RTBoundValidator.java          | 106 ++++
 .../apache/cassandra/service/DataResolver.java  |  21 +-
 .../db/transform/RTTransformationsTest.java     | 482 +++++++++++++++++++
 7 files changed, 758 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ebf8764..aeeb0ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515)
  * Reverse order queries with range tombstones can cause data loss (CASSANDRA-14513)
  * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
  * Add Missing dependencies in pom-all (CASSANDRA-14422)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index c93692a..f8a0795 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.RTBoundCloser;
+import org.apache.cassandra.db.transform.RTBoundValidator;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.index.Index;
@@ -329,6 +331,10 @@ public abstract class ReadCommand implements ReadQuery
 
     public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
     {
+        // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
+        // ends are equal, and there are no dangling RT bounds in any partition.
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+
         return isDigestQuery()
              ? ReadResponse.createDigestResponse(iterator, this)
              : ReadResponse.createDataResponse(iterator, this);
@@ -401,29 +407,37 @@ public abstract class ReadCommand implements ReadQuery
             Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
         }
 
-        UnfilteredPartitionIterator resultIterator = searcher == null
-                                         ? queryStorage(cfs, orderGroup)
-                                         : searcher.search(orderGroup);
+        UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, orderGroup) : searcher.search(orderGroup);
 
         try
         {
-            resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
+            iterator = withoutPurgeableTombstones(iterator, cfs);
+            iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
 
             // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
             // no point in checking it again.
-            RowFilter updatedFilter = searcher == null
-                                    ? rowFilter()
-                                    : index.getPostIndexQueryFilter(rowFilter());
-
-            // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
-            // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
-            // would be more efficient (the sooner we discard stuff we know we don't care, the less useless
-            // processing we do on it).
-            return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
+            RowFilter filter = (null == searcher) ? rowFilter() : index.getPostIndexQueryFilter(rowFilter());
+
+            /*
+             * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
+             * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
+             * would be more efficient (the sooner we discard stuff we know we don't care, the less useless
+             * processing we do on it).
+             */
+            iterator = filter.filter(iterator, nowInSec());
+
+            // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
+            // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
+            iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+
+            // because of the above, we need to append an artificial end bound if the source iterator was stopped short by a counter.
+            iterator = Transformation.apply(iterator, new RTBoundCloser());
+
+            return iterator;
         }
         catch (RuntimeException | Error e)
         {
-            resultIterator.close();
+            iterator.close();
             throw e;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 5c27363..f42f675 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -140,6 +140,32 @@ public abstract class UnfilteredRowIterators
         return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, staticRow, partitionDeletion);
     }
 
+    public static UnfilteredRowIterator singleton(Unfiltered unfiltered,
+                                                  CFMetaData metadata,
+                                                  DecoratedKey partitionKey,
+                                                  DeletionTime partitionLevelDeletion,
+                                                  PartitionColumns columns,
+                                                  Row staticRow,
+                                                  boolean isReverseOrder,
+                                                  EncodingStats encodingStats)
+    {
+        return new AbstractUnfilteredRowIterator(metadata, partitionKey, partitionLevelDeletion, columns, staticRow, isReverseOrder, encodingStats)
+        {
+            boolean isDone = false;
+
+            protected Unfiltered computeNext()
+            {
+                if (!isDone)
+                {
+                    isDone = true;
+                    return unfiltered;
+                }
+
+                return endOfData();
+            }
+        };
+    }
+
     /**
      * Digests the partition represented by the provided iterator.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
new file mode 100644
index 0000000..11f0344
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * A transformation that appends an RT bound marker to row iterators in case they don't have one.
+ *
+ * This used to happen, for example, in {@link org.apache.cassandra.db.ReadCommand#executeLocally(ReadOrderGroup)},
+ * if {@link org.apache.cassandra.db.filter.DataLimits} stopped the iterator on a live row that was enclosed in an
+ * older RT.
+ *
+ * If we don't do this, and send a response without the closing bound, we can break read/short read protection read
+ * isolation, and potentially cause data loss.
+ *
+ * See CASSANDRA-14515 for context.
+ */
+public final class RTBoundCloser extends Transformation<UnfilteredRowIterator>
+{
+    @Override
+    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+    {
+        RowsTransformation transformation = new RowsTransformation(partition);
+        return Transformation.apply(MoreRows.extend(partition, transformation, partition.columns()), transformation);
+    }
+
+    private final static class RowsTransformation extends Transformation implements MoreRows<UnfilteredRowIterator>
+    {
+        private final UnfilteredRowIterator partition;
+
+        private Clustering lastRowClustering;
+        private DeletionTime openMarkerDeletionTime;
+
+        private RowsTransformation(UnfilteredRowIterator partition)
+        {
+            this.partition = partition;
+        }
+
+        @Override
+        public Row applyToRow(Row row)
+        {
+            lastRowClustering = row.clustering();
+            return row;
+        }
+
+        @Override
+        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+        {
+            openMarkerDeletionTime =
+                marker.isOpen(partition.isReverseOrder()) ? marker.openDeletionTime(partition.isReverseOrder()) : null;
+            lastRowClustering = null;
+            return marker;
+        }
+
+        @Override
+        public UnfilteredRowIterator moreContents()
+        {
+            // there is no open RT in the stream - nothing for us to do
+            if (null == openMarkerDeletionTime)
+                return null;
+
+            /*
+             * there *is* an open RT in the stream, but there have been no rows after the opening bound - this must
+             * never happen in scenarios where RTBoundCloser is meant to be used; the last encountered clustering
+             * should be either a closing bound marker - if the iterator was exhausted fully - or a live row - if
+             * DataLimits stopped it short in the middle of an RT.
+             */
+            if (null == lastRowClustering)
+            {
+                CFMetaData metadata = partition.metadata();
+                String message =
+                    String.format("UnfilteredRowIterator for %s.%s has an open RT bound as its last item", metadata.ksName, metadata.cfName);
+                throw new IllegalStateException(message);
+            }
+
+            // create an artificial inclusive closing RT bound with bound matching last seen row's clustering
+            RangeTombstoneBoundMarker closingBound =
+                RangeTombstoneBoundMarker.inclusiveClose(partition.isReverseOrder(), lastRowClustering.getRawValues(), openMarkerDeletionTime);
+
+            return UnfilteredRowIterators.singleton(closingBound,
+                                                    partition.metadata(),
+                                                    partition.partitionKey(),
+                                                    partition.partitionLevelDeletion(),
+                                                    partition.columns(),
+                                                    partition.staticRow(),
+                                                    partition.isReverseOrder(),
+                                                    partition.stats());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
new file mode 100644
index 0000000..7866b14
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+/**
+ * A validating transformation that sanity-checks the sequence of RT bounds and boundaries in every partition.
+ *
+ * What we validate, specifically:
+ * - that open markers are only followed by close markers
+ * - that open markers and close markers have equal deletion times
+ * - optionally, that the iterator closes its last RT marker
+ */
+public final class RTBoundValidator extends Transformation<UnfilteredRowIterator>
+{
+    private final boolean enforceIsClosed;
+
+    public RTBoundValidator(boolean enforceIsClosed)
+    {
+        this.enforceIsClosed = enforceIsClosed;
+    }
+
+    @Override
+    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+    {
+        return Transformation.apply(partition, new RowsTransformation(partition.metadata(), partition.isReverseOrder(), enforceIsClosed));
+    }
+
+    private final static class RowsTransformation extends Transformation
+    {
+        private final CFMetaData metadata;
+        private final boolean isReverseOrder;
+        private final boolean enforceIsClosed;
+
+        private DeletionTime openMarkerDeletionTime;
+
+        private RowsTransformation(CFMetaData metadata, boolean isReverseOrder, boolean enforceIsClosed)
+        {
+            this.metadata = metadata;
+            this.isReverseOrder = isReverseOrder;
+            this.enforceIsClosed = enforceIsClosed;
+        }
+
+        @Override
+        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+        {
+            if (null == openMarkerDeletionTime)
+            {
+                 // there is no open RT in the stream - we are expecting a *_START_BOUND
+                if (marker.isClose(isReverseOrder))
+                    throw ise("unexpected end bound or boundary " + marker.toString(metadata));
+            }
+            else
+            {
+                // there is an open RT in the stream - we are expecting a *_BOUNDARY or an *_END_BOUND
+                if (!marker.isClose(isReverseOrder))
+                    throw ise("start bound followed by another start bound " + marker.toString(metadata));
+
+                // deletion times of open/close markers must match
+                DeletionTime deletionTime = marker.closeDeletionTime(isReverseOrder);
+                if (!deletionTime.equals(openMarkerDeletionTime))
+                    throw ise("open marker and close marker have different deletion times");
+
+                openMarkerDeletionTime = null;
+            }
+
+            if (marker.isOpen(isReverseOrder))
+                openMarkerDeletionTime = marker.openDeletionTime(isReverseOrder);
+
+            return marker;
+        }
+
+        @Override
+        public void onPartitionClose()
+        {
+            if (enforceIsClosed && null != openMarkerDeletionTime)
+                throw ise("expected all RTs to be closed, but the last one is open");
+        }
+
+        private IllegalStateException ise(String why)
+        {
+            String message = String.format("UnfilteredRowIterator for %s.%s has an illegal RT bounds sequence: %s",
+                                           metadata.ksName, metadata.cfName, why);
+            throw new IllegalStateException(message);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 2252913..522c57b 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -398,23 +398,12 @@ public class DataResolver extends ResponseResolver
                         if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion))
                         {
                             /*
-                             * Since there is an ongoing merged deletion, the only two ways we don't have an open repair for
-                             * this source are that:
-                             *
-                             * 1) it had a range open with the same deletion as current marker, and the marker is coming from
-                             *    a short read protection response - repeating the open RT bound, or
-                             * 2) it had a range open with the same deletion as current marker, and the marker is closing it.
+                             * Since there is an ongoing merged deletion, the only way we don't have an open repair for
+                             * this source is that it had a range open with the same deletion as current marker,
+                             * and the marker is closing it.
                              */
-                            if (!marker.isBoundary() && marker.isOpen(isReversed)) // (1)
-                            {
-                                assert currentDeletion.equals(marker.openDeletionTime(isReversed))
-                                    : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
-                            }
-                            else // (2)
-                            {
-                                assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
-                                    : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
-                            }
+                            assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
+                                 : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
 
                             // and so unless it's a boundary whose opening deletion time is still equal to the current
                             // deletion (see comment above for why this can actually happen), we have to repair the source

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e23c9e4/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
new file mode 100644
index 0000000..832c5a3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ClusteringPrefix.Kind;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public final class RTTransformationsTest
+{
+    private static final String KEYSPACE = "RTBoundCloserTest";
+    private static final String TABLE = "table";
+
+    private final int nowInSec = FBUtilities.nowInSeconds();
+
+    private CFMetaData metadata;
+    private DecoratedKey key;
+
+    @Before
+    public void setUp()
+    {
+        metadata =
+            CFMetaData.Builder
+                      .create(KEYSPACE, TABLE)
+                      .addPartitionKey("pk", UTF8Type.instance)
+                      .addClusteringColumn("ck0", UTF8Type.instance)
+                      .addClusteringColumn("ck1", UTF8Type.instance)
+                      .addClusteringColumn("ck2", UTF8Type.instance)
+                      .build();
+        key = Murmur3Partitioner.instance.decorateKey(bytes("key"));
+    }
+
+    @Test
+    public void testAddsNothingWhenAlreadyClosed()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        );
+
+        UnfilteredPartitionIterator extended = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        );
+        extended = Transformation.apply(extended, new RTBoundCloser());
+        assertIteratorsEqual(original, extended);
+    }
+
+    @Test
+    public void testAddsNothingWhenAlreadyClosedInReverseOrder()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        );
+
+        UnfilteredPartitionIterator extended = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        );
+        extended = Transformation.apply(extended, new RTBoundCloser());
+        assertIteratorsEqual(original, extended);
+    }
+
+    @Test
+    public void testClosesUnclosedBound()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        );
+        UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1", "")
+        );
+        assertIteratorsEqual(expected, extended);
+    }
+
+    @Test
+    public void testClosesUnclosedBoundary()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0")
+        , row(2, "a", "1", "")
+        );
+        UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0")
+        , row(2, "a", "1", "")
+        , bound(Kind.INCL_END_BOUND, 1, "a", "1", "")
+        );
+        assertIteratorsEqual(expected, extended);
+    }
+
+    @Test
+    public void testClosesUnclosedBoundInReverseOrder()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        );
+        UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+
+        UnfilteredPartitionIterator expected = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1", "")
+        );
+        assertIteratorsEqual(expected, extended);
+    }
+
+    @Test
+    public void testClosesUnclosedBoundaryInReverseOrder()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a")
+        , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+        , row(2, "a", "0", "")
+        );
+        UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+
+        UnfilteredPartitionIterator expected = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a")
+        , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+        , row(2, "a", "0", "")
+        , bound(Kind.INCL_START_BOUND, 1, "a", "0", "")
+        );
+
+        assertIteratorsEqual(expected, extended);
+    }
+
+    @Test
+    public void testFailsWithoutSeeingRows()
+    {
+        UnfilteredPartitionIterator iterator = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundCloser());
+        assertThrowsISEIterated(iterator);
+    }
+
+    @Test
+    public void testValidatesLegalBounds()
+    {
+        UnfilteredPartitionIterator iterator = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+
+        , bound(Kind.INCL_START_BOUND, 0, "a", "2")
+        , row(1, "a", "2", "")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "2")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        drain(iterator);
+    }
+
+    @Test
+    public void testValidatesLegalBoundsInReverseOrder()
+    {
+        UnfilteredPartitionIterator iterator = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a", "2")
+        , row(1, "a", "2", "")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "2")
+
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        drain(iterator);
+    }
+
+    @Test
+    public void testValidatesLegalBoundaries()
+    {
+        UnfilteredPartitionIterator iterator = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a")
+
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1")
+        , row(2, "a", "1", "")
+        , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2")
+        , row(3, "a", "2", "")
+        , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2")
+
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3")
+        , row(4, "a", "3", "")
+        , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3")
+
+        , bound(Kind.INCL_END_BOUND, 0, "a")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        drain(iterator);
+    }
+
+    @Test
+    public void testValidatesLegalBoundariesInReverseOrder()
+    {
+        UnfilteredPartitionIterator iterator = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a")
+
+        , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3")
+        , row(4, "a", "3", "")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3")
+
+        , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2")
+        , row(3, "a", "2", "")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2")
+
+        , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+        , row(2, "a", "1", "")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1")
+
+        , bound(Kind.INCL_START_BOUND, 0, "a")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        drain(iterator);
+    }
+
+    @Test
+    public void testComplainsAboutMismatchedTimestamps()
+    {
+        UnfilteredPartitionIterator iterator = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_END_BOUND, 1, "a", "1")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+    }
+
+    @Test
+    public void testComplainsAboutMismatchedTimestampsInReverseOrder()
+    {
+        UnfilteredPartitionIterator iterator = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_START_BOUND, 1, "a", "1")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+    }
+
+    @Test
+    public void testComplainsAboutInvalidSequence()
+    {
+        // duplicated start bound
+        UnfilteredPartitionIterator iterator = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+
+        // duplicated end bound
+        iterator = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+
+        // absent start bound
+        iterator = iter(false
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+
+        // absent end bound
+        iterator = iter(false
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+    }
+
+    @Test
+    public void testComplainsAboutInvalidSequenceInReveseOrder()
+    {
+        // duplicated start bound
+        UnfilteredPartitionIterator iterator = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+
+        // duplicated end bound
+        iterator = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+
+        // absent start bound
+        iterator = iter(true
+        , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+        , row(1, "a", "1", "")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+
+        // absent end bound
+        iterator = iter(true
+        , row(1, "a", "1", "")
+        , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+        );
+        iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+        assertThrowsISEIterated(iterator);
+    }
+
+    private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind, long timestamp, Object... clusteringValues)
+    {
+        ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+        for (int i = 0; i < clusteringValues.length; i++)
+            clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+
+        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers), new DeletionTime(timestamp, nowInSec));
+    }
+
+    private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind, long closeTimestamp, long openTimestamp, Object... clusteringValues)
+    {
+        ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+        for (int i = 0; i < clusteringValues.length; i++)
+            clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+
+        return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
+                                                new DeletionTime(closeTimestamp, nowInSec),
+                                                new DeletionTime(openTimestamp, nowInSec));
+    }
+
+    private Row row(long timestamp, Object... clusteringValues)
+    {
+        ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+        for (int i = 0; i < clusteringValues.length; i++)
+            clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+
+        return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), LivenessInfo.create(metadata, timestamp, nowInSec));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+    {
+        return ((AbstractType<T>) type).decompose(value);
+    }
+
+    private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+    {
+        Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+
+        UnfilteredRowIterator rowIter =
+            new AbstractUnfilteredRowIterator(metadata,
+                                              key,
+                                              DeletionTime.LIVE,
+                                              metadata.partitionColumns(),
+                                              Rows.EMPTY_STATIC_ROW,
+                                              isReversedOrder,
+                                              EncodingStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return iterator.hasNext() ? iterator.next() : endOfData();
+            }
+        };
+
+        return new SingletonUnfilteredPartitionIterator(rowIter, false);
+    }
+
+    private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+    {
+        while (iter1.hasNext())
+        {
+            assertTrue(iter2.hasNext());
+
+            try (UnfilteredRowIterator partition1 = iter1.next())
+            {
+                try (UnfilteredRowIterator partition2 = iter2.next())
+                {
+                    assertIteratorsEqual(partition1, partition2);
+                }
+            }
+        }
+        assertFalse(iter2.hasNext());
+    }
+
+    private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+    {
+        while (iter1.hasNext())
+        {
+            assertTrue(iter2.hasNext());
+
+            assertEquals(iter1.next(), iter2.next());
+        }
+        assertFalse(iter2.hasNext());
+    }
+
+    private void assertThrowsISEIterated(UnfilteredPartitionIterator iterator)
+    {
+        Throwable t = null;
+        try
+        {
+            drain(iterator);
+        }
+        catch (Throwable e)
+        {
+            t = e;
+        }
+        assertTrue(t instanceof IllegalStateException);
+    }
+
+    private void drain(UnfilteredPartitionIterator iter)
+    {
+        while (iter.hasNext())
+        {
+            try (UnfilteredRowIterator partition = iter.next())
+            {
+                while (partition.hasNext())
+                    partition.next();
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/3] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/97ada5bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/97ada5bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/97ada5bd

Branch: refs/heads/trunk
Commit: 97ada5bdfd1c82978ac9306dee8c279ee704fb91
Parents: d0c2ab5 08a515d
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Wed Jun 20 16:11:33 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Wed Jun 20 16:11:33 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ReadCommand.java    |  44 +-
 .../db/rows/UnfilteredRowIterators.java         |  26 +
 .../cassandra/db/transform/RTBoundCloser.java   | 107 ++++
 .../db/transform/RTBoundValidator.java          | 106 ++++
 .../db/transform/RTTransformationsTest.java     | 482 +++++++++++++++++++
 6 files changed, 751 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/97ada5bd/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97ada5bd/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 064dd77,8bcfcbc..7b554a7
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -26,16 -29,20 +26,18 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.config.*;
 -import org.apache.cassandra.cql3.Operator;
  import org.apache.cassandra.db.filter.*;
  import org.apache.cassandra.db.monitoring.ApproximateTime;
 -import org.apache.cassandra.db.monitoring.MonitorableImpl;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.db.transform.RTBoundCloser;
+ import org.apache.cassandra.db.transform.RTBoundValidator;
  import org.apache.cassandra.db.transform.StoppingTransformation;
  import org.apache.cassandra.db.transform.Transformation;
 -import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.exceptions.UnknownIndexException;
  import org.apache.cassandra.index.Index;
  import org.apache.cassandra.index.IndexNotAvailableException;
 -import org.apache.cassandra.io.ForwardingVersionedSerializer;
 +import org.apache.cassandra.index.IndexRegistry;
  import org.apache.cassandra.io.IVersionedSerializer;
  import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
@@@ -282,12 -416,10 +288,10 @@@ public abstract class ReadCommand exten
                  throw new IndexNotAvailableException(index);
  
              searcher = index.searcherFor(this);
 -            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
 +            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
          }
  
-         UnfilteredPartitionIterator resultIterator = searcher == null
-                                          ? queryStorage(cfs, executionController)
-                                          : searcher.search(executionController);
+         UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
  
          try
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97ada5bd/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 0244531,d6502ec..b125c01
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@@ -146,11 -135,37 +146,37 @@@ public abstract class UnfilteredRowIter
      /**
       * Returns an empty unfiltered iterator for a given partition.
       */
 -    public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
 +    public static UnfilteredRowIterator noRowsIterator(final TableMetadata metadata, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
      {
 -        return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, staticRow, partitionDeletion);
 +        return EmptyIterators.unfilteredRow(metadata, partitionKey, isReverseOrder, staticRow, partitionDeletion);
      }
  
+     public static UnfilteredRowIterator singleton(Unfiltered unfiltered,
 -                                                  CFMetaData metadata,
++                                                  TableMetadata metadata,
+                                                   DecoratedKey partitionKey,
+                                                   DeletionTime partitionLevelDeletion,
 -                                                  PartitionColumns columns,
++                                                  RegularAndStaticColumns columns,
+                                                   Row staticRow,
+                                                   boolean isReverseOrder,
+                                                   EncodingStats encodingStats)
+     {
+         return new AbstractUnfilteredRowIterator(metadata, partitionKey, partitionLevelDeletion, columns, staticRow, isReverseOrder, encodingStats)
+         {
+             boolean isDone = false;
+ 
+             protected Unfiltered computeNext()
+             {
+                 if (!isDone)
+                 {
+                     isDone = true;
+                     return unfiltered;
+                 }
+ 
+                 return endOfData();
+             }
+         };
+     }
+ 
      /**
       * Digests the partition represented by the provided iterator.
       *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97ada5bd/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
index 0000000,413ea71..7a09dab
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
@@@ -1,0 -1,110 +1,107 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.transform;
+ 
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.Clustering;
+ import org.apache.cassandra.db.DeletionTime;
+ import org.apache.cassandra.db.ReadExecutionController;
+ import org.apache.cassandra.db.rows.*;
+ 
+ /**
+  * A transformation that appends a closing RT bound marker to row iterators that end with an RT still open.
+  *
+  * An unclosed RT used to be produced, for example, by {@link org.apache.cassandra.db.ReadCommand#executeLocally(ReadExecutionController)}
+  * if {@link org.apache.cassandra.db.filter.DataLimits} stopped the iterator on a live row that was enclosed in an
+  * older RT.
+  *
+  * If we don't do this and send a response without the closing bound, we can break the isolation of reads and
+  * short read protection reads, and potentially cause data loss.
+  *
+  * See CASSANDRA-14515 for context.
+  */
+ public final class RTBoundCloser extends Transformation<UnfilteredRowIterator>
+ {
+     @Override
+     public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+     {
+         RowsTransformation transformation = new RowsTransformation(partition);
+         return Transformation.apply(MoreRows.extend(partition, transformation, partition.columns()), transformation);
+     }
+ 
+     private final static class RowsTransformation extends Transformation implements MoreRows<UnfilteredRowIterator>
+     {
+         private final UnfilteredRowIterator partition;
+ 
+         private Clustering lastRowClustering;
+         private DeletionTime openMarkerDeletionTime;
+ 
+         private RowsTransformation(UnfilteredRowIterator partition)
+         {
+             this.partition = partition;
+         }
+ 
+         @Override
+         public Row applyToRow(Row row)
+         {
+             lastRowClustering = row.clustering();
+             return row;
+         }
+ 
+         @Override
+         public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+         {
+             openMarkerDeletionTime =
+                 marker.isOpen(partition.isReverseOrder()) ? marker.openDeletionTime(partition.isReverseOrder()) : null;
+             lastRowClustering = null;
+             return marker;
+         }
+ 
+         @Override
+         public UnfilteredRowIterator moreContents()
+         {
+             // there is no open RT in the stream - nothing for us to do
+             if (null == openMarkerDeletionTime)
+                 return null;
+ 
+             /*
+              * there *is* an open RT in the stream, but there have been no rows after the opening bound - this must
+              * never happen in scenarios where RTBoundCloser is meant to be used; the last encountered clustering
+              * should be either a closing bound marker - if the iterator was exhausted fully - or a live row - if
+              * DataLimits stopped it short in the middle of an RT.
+              */
+             if (null == lastRowClustering)
+             {
 -                CFMetaData metadata = partition.metadata();
 -                String message =
 -                    String.format("UnfilteredRowIterator for %s.%s has an open RT bound as its last item", metadata.ksName, metadata.cfName);
++                String message = String.format("UnfilteredRowIterator for %s has an open RT bound as its last item", partition.metadata());
+                 throw new IllegalStateException(message);
+             }
+ 
+             // create an artificial inclusive closing RT bound whose clustering matches that of the last seen row
+             RangeTombstoneBoundMarker closingBound =
+                 RangeTombstoneBoundMarker.inclusiveClose(partition.isReverseOrder(), lastRowClustering.getRawValues(), openMarkerDeletionTime);
+ 
+             return UnfilteredRowIterators.singleton(closingBound,
+                                                     partition.metadata(),
+                                                     partition.partitionKey(),
+                                                     partition.partitionLevelDeletion(),
+                                                     partition.columns(),
+                                                     partition.staticRow(),
+                                                     partition.isReverseOrder(),
+                                                     partition.stats());
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97ada5bd/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
index 0000000,7866b14..644a544
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
@@@ -1,0 -1,106 +1,106 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.transform;
+ 
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.DeletionTime;
+ import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
++import org.apache.cassandra.schema.TableMetadata;
+ 
+ /**
+  * A validating transformation that sanity-checks the sequence of RT bounds and boundaries in every partition.
+  *
+  * What we validate, specifically:
+  * - that open markers are only followed by close markers
+  * - that open markers and close markers have equal deletion times
+  * - optionally, that the iterator closes its last RT marker
+  */
+ public final class RTBoundValidator extends Transformation<UnfilteredRowIterator>
+ {
+     private final boolean enforceIsClosed;
+ 
+     public RTBoundValidator(boolean enforceIsClosed)
+     {
+         this.enforceIsClosed = enforceIsClosed;
+     }
+ 
+     @Override
+     public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+     {
+         return Transformation.apply(partition, new RowsTransformation(partition.metadata(), partition.isReverseOrder(), enforceIsClosed));
+     }
+ 
+     private final static class RowsTransformation extends Transformation
+     {
 -        private final CFMetaData metadata;
++        private final TableMetadata metadata;
+         private final boolean isReverseOrder;
+         private final boolean enforceIsClosed;
+ 
+         private DeletionTime openMarkerDeletionTime;
+ 
 -        private RowsTransformation(CFMetaData metadata, boolean isReverseOrder, boolean enforceIsClosed)
++        private RowsTransformation(TableMetadata metadata, boolean isReverseOrder, boolean enforceIsClosed)
+         {
+             this.metadata = metadata;
+             this.isReverseOrder = isReverseOrder;
+             this.enforceIsClosed = enforceIsClosed;
+         }
+ 
+         @Override
+         public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+         {
+             if (null == openMarkerDeletionTime)
+             {
+                 // there is no open RT in the stream - we are expecting an opening bound (*_START_BOUND, or *_END_BOUND if reversed)
+                 if (marker.isClose(isReverseOrder))
+                     throw ise("unexpected end bound or boundary " + marker.toString(metadata));
+             }
+             else
+             {
+                 // there is an open RT in the stream - we are expecting a *_BOUNDARY or a closing bound (*_END_BOUND, or *_START_BOUND if reversed)
+                 if (!marker.isClose(isReverseOrder))
+                     throw ise("start bound followed by another start bound " + marker.toString(metadata));
+ 
+                 // deletion times of open/close markers must match
+                 DeletionTime deletionTime = marker.closeDeletionTime(isReverseOrder);
+                 if (!deletionTime.equals(openMarkerDeletionTime))
+                     throw ise("open marker and close marker have different deletion times");
+ 
+                 openMarkerDeletionTime = null;
+             }
+ 
+             if (marker.isOpen(isReverseOrder))
+                 openMarkerDeletionTime = marker.openDeletionTime(isReverseOrder);
+ 
+             return marker;
+         }
+ 
+         @Override
+         public void onPartitionClose()
+         {
+             if (enforceIsClosed && null != openMarkerDeletionTime)
+                 throw ise("expected all RTs to be closed, but the last one is open");
+         }
+ 
+         private IllegalStateException ise(String why)
+         {
 -            String message = String.format("UnfilteredRowIterator for %s.%s has an illegal RT bounds sequence: %s",
 -                                           metadata.ksName, metadata.cfName, why);
++            String message =
++                String.format("UnfilteredRowIterator for %s has an illegal RT bounds sequence: %s", metadata, why);
+             throw new IllegalStateException(message);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/97ada5bd/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
index 0000000,ee91135..7b7eaa4
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
+++ b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
@@@ -1,0 -1,483 +1,482 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.transform;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.*;
+ 
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import com.google.common.collect.Iterators;
+ 
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.ClusteringPrefix.Kind;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
+ import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
++import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ public final class RTTransformationsTest
+ {
+     private static final String KEYSPACE = "RTBoundCloserTest";
+     private static final String TABLE = "table";
+ 
+     private final int nowInSec = FBUtilities.nowInSeconds();
+ 
 -    private CFMetaData metadata;
++    private TableMetadata metadata;
+     private DecoratedKey key;
+ 
+     @Before
+     public void setUp()
+     {
+         metadata =
 -            CFMetaData.Builder
 -                      .create(KEYSPACE, TABLE)
 -                      .addPartitionKey("pk", UTF8Type.instance)
 -                      .addClusteringColumn("ck0", UTF8Type.instance)
 -                      .addClusteringColumn("ck1", UTF8Type.instance)
 -                      .addClusteringColumn("ck2", UTF8Type.instance)
 -                      .withPartitioner(Murmur3Partitioner.instance)
 -                      .build();
++            TableMetadata.builder(KEYSPACE, TABLE)
++                         .addPartitionKeyColumn("pk", UTF8Type.instance)
++                         .addClusteringColumn("ck0", UTF8Type.instance)
++                         .addClusteringColumn("ck1", UTF8Type.instance)
++                         .addClusteringColumn("ck2", UTF8Type.instance)
++                         .partitioner(Murmur3Partitioner.instance)
++                         .build();
+         key = Murmur3Partitioner.instance.decorateKey(bytes("key"));
+     }
+ 
+     @Test
+     public void testAddsNothingWhenAlreadyClosed()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         );
+ 
+         UnfilteredPartitionIterator extended = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         );
+         extended = Transformation.apply(extended, new RTBoundCloser());
+         assertIteratorsEqual(original, extended);
+     }
+ 
+     @Test
+     public void testAddsNothingWhenAlreadyClosedInReverseOrder()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         );
+ 
+         UnfilteredPartitionIterator extended = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         );
+         extended = Transformation.apply(extended, new RTBoundCloser());
+         assertIteratorsEqual(original, extended);
+     }
+ 
+     @Test
+     public void testClosesUnclosedBound()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         );
+         UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1", "")
+         );
+         assertIteratorsEqual(expected, extended);
+     }
+ 
+     @Test
+     public void testClosesUnclosedBoundary()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0")
+         , row(2, "a", "1", "")
+         );
+         UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0")
+         , row(2, "a", "1", "")
+         , bound(Kind.INCL_END_BOUND, 1, "a", "1", "")
+         );
+         assertIteratorsEqual(expected, extended);
+     }
+ 
+     @Test
+     public void testClosesUnclosedBoundInReverseOrder()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         );
+         UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+ 
+         UnfilteredPartitionIterator expected = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1", "")
+         );
+         assertIteratorsEqual(expected, extended);
+     }
+ 
+     @Test
+     public void testClosesUnclosedBoundaryInReverseOrder()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a")
+         , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+         , row(2, "a", "0", "")
+         );
+         UnfilteredPartitionIterator extended = Transformation.apply(original, new RTBoundCloser());
+ 
+         UnfilteredPartitionIterator expected = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a")
+         , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+         , row(2, "a", "0", "")
+         , bound(Kind.INCL_START_BOUND, 1, "a", "0", "")
+         );
+ 
+         assertIteratorsEqual(expected, extended);
+     }
+ 
+     @Test
+     public void testFailsWithoutSeeingRows()
+     {
+         UnfilteredPartitionIterator iterator = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundCloser());
+         assertThrowsISEIterated(iterator);
+     }
+ 
+     @Test
+     public void testValidatesLegalBounds()
+     {
+         UnfilteredPartitionIterator iterator = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+ 
+         , bound(Kind.INCL_START_BOUND, 0, "a", "2")
+         , row(1, "a", "2", "")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "2")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         drain(iterator);
+     }
+ 
+     @Test
+     public void testValidatesLegalBoundsInReverseOrder()
+     {
+         UnfilteredPartitionIterator iterator = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a", "2")
+         , row(1, "a", "2", "")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "2")
+ 
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         drain(iterator);
+     }
+ 
+     @Test
+     public void testValidatesLegalBoundaries()
+     {
+         UnfilteredPartitionIterator iterator = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a")
+ 
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1")
+         , row(2, "a", "1", "")
+         , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+ 
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2")
+         , row(3, "a", "2", "")
+         , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2")
+ 
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3")
+         , row(4, "a", "3", "")
+         , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3")
+ 
+         , bound(Kind.INCL_END_BOUND, 0, "a")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         drain(iterator);
+     }
+ 
+     @Test
+     public void testValidatesLegalBoundariesInReverseOrder()
+     {
+         UnfilteredPartitionIterator iterator = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a")
+ 
+         , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3")
+         , row(4, "a", "3", "")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3")
+ 
+         , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2")
+         , row(3, "a", "2", "")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2")
+ 
+         , boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1")
+         , row(2, "a", "1", "")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1")
+ 
+         , bound(Kind.INCL_START_BOUND, 0, "a")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         drain(iterator);
+     }
+ 
+     @Test
+     public void testComplainsAboutMismatchedTimestamps()
+     {
+         UnfilteredPartitionIterator iterator = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_END_BOUND, 1, "a", "1")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+     }
+ 
+     @Test
+     public void testComplainsAboutMismatchedTimestampsInReverseOrder()
+     {
+         UnfilteredPartitionIterator iterator = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_START_BOUND, 1, "a", "1")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+     }
+ 
+     @Test
+     public void testComplainsAboutInvalidSequence()
+     {
+         // duplicated start bound
+         UnfilteredPartitionIterator iterator = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+ 
+         // duplicated end bound
+         iterator = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+ 
+         // absent open bound
+         iterator = iter(false
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+ 
+         // absent end bound
+         iterator = iter(false
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+     }
+ 
+     @Test
+     public void testComplainsAboutInvalidSequenceInReveseOrder()
+     {
+         // duplicated start bound
+         UnfilteredPartitionIterator iterator = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+ 
+         // duplicated end bound
+         iterator = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+ 
+         // absent open bound
+         iterator = iter(true
+         , bound(Kind.INCL_END_BOUND, 0, "a", "1")
+         , row(1, "a", "1", "")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+ 
+         // absent end bound
+         iterator = iter(true
+         , row(1, "a", "1", "")
+         , bound(Kind.INCL_START_BOUND, 0, "a", "1")
+         );
+         iterator = Transformation.apply(iterator, new RTBoundValidator(true));
+         assertThrowsISEIterated(iterator);
+     }
+ 
+     private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind, long timestamp, Object... clusteringValues)
+     {
+         ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+         for (int i = 0; i < clusteringValues.length; i++)
+             clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+ 
+         return new RangeTombstoneBoundMarker(ClusteringBound.create(kind, clusteringByteBuffers), new DeletionTime(timestamp, nowInSec));
+     }
+ 
+     private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind, long closeTimestamp, long openTimestamp, Object... clusteringValues)
+     {
+         ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+         for (int i = 0; i < clusteringValues.length; i++)
+             clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+ 
+         return new RangeTombstoneBoundaryMarker(ClusteringBoundary.create(kind, clusteringByteBuffers),
+                                                 new DeletionTime(closeTimestamp, nowInSec),
+                                                 new DeletionTime(openTimestamp, nowInSec));
+     }
+ 
+     private Row row(long timestamp, Object... clusteringValues)
+     {
+         ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length];
+         for (int i = 0; i < clusteringValues.length; i++)
+             clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+ 
+         return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(timestamp, nowInSec));
+     }
+ 
+     @SuppressWarnings("unchecked")
+     private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+     {
+         return ((AbstractType<T>) type).decompose(value);
+     }
+ 
+     private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+     {
+         Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+ 
+         UnfilteredRowIterator rowIter =
+             new AbstractUnfilteredRowIterator(metadata,
+                                               key,
+                                               DeletionTime.LIVE,
 -                                              metadata.partitionColumns(),
++                                              metadata.regularAndStaticColumns(),
+                                               Rows.EMPTY_STATIC_ROW,
+                                               isReversedOrder,
+                                               EncodingStats.NO_STATS)
+         {
+             protected Unfiltered computeNext()
+             {
+                 return iterator.hasNext() ? iterator.next() : endOfData();
+             }
+         };
+ 
 -        return new SingletonUnfilteredPartitionIterator(rowIter, false);
++        return new SingletonUnfilteredPartitionIterator(rowIter);
+     }
+ 
+     private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             try (UnfilteredRowIterator partition1 = iter1.next())
+             {
+                 try (UnfilteredRowIterator partition2 = iter2.next())
+                 {
+                     assertIteratorsEqual(partition1, partition2);
+                 }
+             }
+         }
+         assertFalse(iter2.hasNext());
+     }
+ 
+     private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             assertEquals(iter1.next(), iter2.next());
+         }
+         assertFalse(iter2.hasNext());
+     }
+ 
+     private void assertThrowsISEIterated(UnfilteredPartitionIterator iterator)
+     {
+         Throwable t = null;
+         try
+         {
+             drain(iterator);
+         }
+         catch (Throwable e)
+         {
+             t = e;
+         }
+         assertTrue(t instanceof IllegalStateException);
+     }
+ 
+     private void drain(UnfilteredPartitionIterator iter)
+     {
+         while (iter.hasNext())
+         {
+             try (UnfilteredRowIterator partition = iter.next())
+             {
+                 while (partition.hasNext())
+                     partition.next();
+             }
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/3] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/08a515dc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/08a515dc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/08a515dc

Branch: refs/heads/trunk
Commit: 08a515dc7440db1530987a480afb27f5faaf8e0f
Parents: 191ad7b 4e23c9e
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Wed Jun 20 16:03:25 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Wed Jun 20 16:03:25 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ReadCommand.java    |  44 +-
 .../db/rows/UnfilteredRowIterators.java         |  26 +
 .../cassandra/db/transform/RTBoundCloser.java   | 110 +++++
 .../db/transform/RTBoundValidator.java          | 106 ++++
 .../apache/cassandra/service/DataResolver.java  |  21 +-
 .../db/transform/RTTransformationsTest.java     | 483 +++++++++++++++++++
 7 files changed, 760 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/08a515dc/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e807340,aeeb0ae..d9c62ea
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
 -3.0.17
 +3.11.3
 + * Remove BTree.Builder Recycler to reduce memory usage (CASSANDRA-13929)
 + * Reduce nodetool GC thread count (CASSANDRA-14475)
 + * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
 + * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
 + * Update metrics to 3.1.5 (CASSANDRA-12924)
 + * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
 + * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
 + * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
 + * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
 + * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
 + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
 + * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
 + * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
 + * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
 + * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
 + * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
 + * Fix wildcard GROUP BY queries (CASSANDRA-14209)
 +Merged from 3.0:
+  * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515)
   * Reverse order queries with range tombstones can cause data loss (CASSANDRA-14513)
   * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
   * Add Missing dependencies in pom-all (CASSANDRA-14422)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08a515dc/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index c8b256a,f8a0795..8bcfcbc
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -31,11 -31,10 +31,13 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.config.*;
  import org.apache.cassandra.cql3.Operator;
  import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.monitoring.ApproximateTime;
 +import org.apache.cassandra.db.monitoring.MonitorableImpl;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.db.transform.RTBoundCloser;
+ import org.apache.cassandra.db.transform.RTBoundValidator;
 +import org.apache.cassandra.db.transform.StoppingTransformation;
  import org.apache.cassandra.db.transform.Transformation;
  import org.apache.cassandra.dht.AbstractBounds;
  import org.apache.cassandra.index.Index;
@@@ -413,14 -407,12 +419,13 @@@ public abstract class ReadCommand exten
              Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
          }
  
-         UnfilteredPartitionIterator resultIterator = searcher == null
-                                          ? queryStorage(cfs, executionController)
-                                          : searcher.search(executionController);
 -        UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, orderGroup) : searcher.search(orderGroup);
++        UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
  
          try
          {
-             resultIterator = withStateTracking(resultIterator);
-             resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
++            iterator = withStateTracking(iterator);
+             iterator = withoutPurgeableTombstones(iterator, cfs);
+             iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
  
              // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
              // no point in checking it again.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08a515dc/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08a515dc/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
index 0000000,11f0344..413ea71
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundCloser.java
@@@ -1,0 -1,110 +1,110 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.transform;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.Clustering;
+ import org.apache.cassandra.db.DeletionTime;
 -import org.apache.cassandra.db.ReadOrderGroup;
++import org.apache.cassandra.db.ReadExecutionController;
+ import org.apache.cassandra.db.rows.*;
+ 
+ /**
+  * A transformation that appends an RT bound marker to row iterators in case they don't have one.
+  *
 - * This used to happen, for example, in {@link org.apache.cassandra.db.ReadCommand#executeLocally(ReadOrderGroup)},
++ * This used to happen, for example, in {@link org.apache.cassandra.db.ReadCommand#executeLocally(ReadExecutionController)}
+  * if {@link org.apache.cassandra.db.filter.DataLimits} stopped the iterator on a live row that was enclosed in an
+  * older RT.
+  *
+  * If we don't do this, and send a response without the closing bound, we can break read isolation for both regular
+  * reads and short read protection reads, and potentially cause data loss.
+  *
+  * See CASSANDRA-14515 for context.
+  */
+ public final class RTBoundCloser extends Transformation<UnfilteredRowIterator>
+ {
+     @Override
+     public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+     {
+         RowsTransformation transformation = new RowsTransformation(partition);
+         return Transformation.apply(MoreRows.extend(partition, transformation, partition.columns()), transformation);
+     }
+ 
+     private final static class RowsTransformation extends Transformation implements MoreRows<UnfilteredRowIterator>
+     {
+         private final UnfilteredRowIterator partition;
+ 
+         private Clustering lastRowClustering;
+         private DeletionTime openMarkerDeletionTime;
+ 
+         private RowsTransformation(UnfilteredRowIterator partition)
+         {
+             this.partition = partition;
+         }
+ 
+         @Override
+         public Row applyToRow(Row row)
+         {
+             lastRowClustering = row.clustering();
+             return row;
+         }
+ 
+         @Override
+         public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+         {
+             openMarkerDeletionTime =
+                 marker.isOpen(partition.isReverseOrder()) ? marker.openDeletionTime(partition.isReverseOrder()) : null;
+             lastRowClustering = null;
+             return marker;
+         }
+ 
+         @Override
+         public UnfilteredRowIterator moreContents()
+         {
+             // there is no open RT in the stream - nothing for us to do
+             if (null == openMarkerDeletionTime)
+                 return null;
+ 
+             /*
+              * there *is* an open RT in the stream, but there have been no rows after the opening bound - this must
+              * never happen in scenarios where RTBoundCloser is meant to be used; the last encountered clustering
+              * should be either a closing bound marker - if the iterator was exhausted fully - or a live row - if
+              * DataLimits stopped it short in the middle of an RT.
+              */
+             if (null == lastRowClustering)
+             {
+                 CFMetaData metadata = partition.metadata();
+                 String message =
+                     String.format("UnfilteredRowIterator for %s.%s has an open RT bound as its last item", metadata.ksName, metadata.cfName);
+                 throw new IllegalStateException(message);
+             }
+ 
+             // create an artificial inclusive closing RT bound with bound matching last seen row's clustering
+             RangeTombstoneBoundMarker closingBound =
+                 RangeTombstoneBoundMarker.inclusiveClose(partition.isReverseOrder(), lastRowClustering.getRawValues(), openMarkerDeletionTime);
+ 
+             return UnfilteredRowIterators.singleton(closingBound,
+                                                     partition.metadata(),
+                                                     partition.partitionKey(),
+                                                     partition.partitionLevelDeletion(),
+                                                     partition.columns(),
+                                                     partition.staticRow(),
+                                                     partition.isReverseOrder(),
+                                                     partition.stats());
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08a515dc/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08a515dc/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
index 0000000,832c5a3..ee91135
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
+++ b/test/unit/org/apache/cassandra/db/transform/RTTransformationsTest.java
@@@ -1,0 -1,482 +1,483 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.transform;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.*;
+ 
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import com.google.common.collect.Iterators;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.ClusteringPrefix.Kind;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
+ import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ public final class RTTransformationsTest
+ {
+     private static final String KEYSPACE = "RTBoundCloserTest";
+     private static final String TABLE = "table";
+ 
+     private final int nowInSec = FBUtilities.nowInSeconds();
+ 
+     private CFMetaData metadata;
+     private DecoratedKey key;
+ 
+     @Before
+     public void setUp() // JUnit fixture: rebuilds the shared table metadata and partition key before each test
+     {
+         metadata =
+             CFMetaData.Builder
+                       .create(KEYSPACE, TABLE)
+                       .addPartitionKey("pk", UTF8Type.instance)
+                       .addClusteringColumn("ck0", UTF8Type.instance) // three UTF8 clustering columns: tests pass clustering prefixes of one to three values
+                       .addClusteringColumn("ck1", UTF8Type.instance)
+                       .addClusteringColumn("ck2", UTF8Type.instance)
++                      .withPartitioner(Murmur3Partitioner.instance)
+                       .build();
+         key = Murmur3Partitioner.instance.decorateKey(bytes("key")); // the single partition key used by every iterator built in iter()
+     }
+ 
+     /**
+      * Checks that applying {@link RTBoundCloser} to a partition whose range tombstone is
+      * already closed yields exactly the same sequence of unfiltereds.
+      */
+     @Test
+     public void testAddsNothingWhenAlreadyClosed()
+     {
+         UnfilteredPartitionIterator expected =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"));
+ 
+         // an identical, independent copy of the input that is run through the transformation
+         UnfilteredPartitionIterator input =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"));
+         UnfilteredPartitionIterator actual = Transformation.apply(input, new RTBoundCloser());
+ 
+         assertIteratorsEqual(expected, actual);
+     }
+ 
+     /**
+      * Reverse-order variant: a partition whose range tombstone is already closed must come out
+      * of {@link RTBoundCloser} with the same sequence of unfiltereds.
+      */
+     @Test
+     public void testAddsNothingWhenAlreadyClosedInReverseOrder()
+     {
+         UnfilteredPartitionIterator expected =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"));
+ 
+         // an identical, independent copy of the input that is run through the transformation
+         UnfilteredPartitionIterator input =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"));
+         UnfilteredPartitionIterator actual = Transformation.apply(input, new RTBoundCloser());
+ 
+         assertIteratorsEqual(expected, actual);
+     }
+ 
+     /**
+      * Checks that {@link RTBoundCloser} appends an inclusive end bound, carrying the open
+      * marker's timestamp and the last row's full clustering, when the input ends with an open bound.
+      */
+     @Test
+     public void testClosesUnclosedBound()
+     {
+         UnfilteredPartitionIterator unclosed =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""));
+         UnfilteredPartitionIterator actual = Transformation.apply(unclosed, new RTBoundCloser());
+ 
+         // same content plus the artificial closing bound at the last row's clustering
+         UnfilteredPartitionIterator expected =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1", ""));
+ 
+         assertIteratorsEqual(expected, actual);
+     }
+ 
+     /**
+      * Checks that {@link RTBoundCloser} closes a deletion left open by a boundary marker: the
+      * appended end bound is expected to carry the boundary's open-side timestamp (1) and the
+      * last row's clustering.
+      */
+     @Test
+     public void testClosesUnclosedBoundary()
+     {
+         UnfilteredPartitionIterator unclosed =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a"),
+                  boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0"),
+                  row(2, "a", "1", ""));
+         UnfilteredPartitionIterator actual = Transformation.apply(unclosed, new RTBoundCloser());
+ 
+         // same content plus the artificial closing bound at the last row's clustering
+         UnfilteredPartitionIterator expected =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a"),
+                  boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "0"),
+                  row(2, "a", "1", ""),
+                  bound(Kind.INCL_END_BOUND, 1, "a", "1", ""));
+ 
+         assertIteratorsEqual(expected, actual);
+     }
+ 
+     /**
+      * Reverse-order variant: an input that opens with an end bound and is never closed is
+      * expected to gain an inclusive start bound at the last row's clustering.
+      */
+     @Test
+     public void testClosesUnclosedBoundInReverseOrder()
+     {
+         UnfilteredPartitionIterator unclosed =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""));
+         UnfilteredPartitionIterator actual = Transformation.apply(unclosed, new RTBoundCloser());
+ 
+         // same content plus the artificial closing bound at the last row's clustering
+         UnfilteredPartitionIterator expected =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1", ""));
+ 
+         assertIteratorsEqual(expected, actual);
+     }
+ 
+     /**
+      * Reverse-order variant of the boundary case: the deletion left open by the boundary marker
+      * is expected to be closed with an inclusive start bound carrying timestamp 1 and the last
+      * row's clustering.
+      */
+     @Test
+     public void testClosesUnclosedBoundaryInReverseOrder()
+     {
+         UnfilteredPartitionIterator unclosed =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a"),
+                  boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1"),
+                  row(2, "a", "0", ""));
+         UnfilteredPartitionIterator actual = Transformation.apply(unclosed, new RTBoundCloser());
+ 
+         // same content plus the artificial closing bound at the last row's clustering
+         UnfilteredPartitionIterator expected =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a"),
+                  boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1"),
+                  row(2, "a", "0", ""),
+                  bound(Kind.INCL_START_BOUND, 1, "a", "0", ""));
+ 
+         assertIteratorsEqual(expected, actual);
+     }
+ 
+     /**
+      * Checks that {@link RTBoundCloser} throws an {@link IllegalStateException} when the
+      * partition ends with an open bound and contains no row to derive a closing bound from.
+      */
+     @Test
+     public void testFailsWithoutSeeingRows()
+     {
+         UnfilteredPartitionIterator openBoundOnly =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a"));
+ 
+         assertThrowsISEIterated(Transformation.apply(openBoundOnly, new RTBoundCloser()));
+     }
+ 
+     /**
+      * Drains two consecutive, properly opened and closed range tombstones through
+      * {@link RTBoundValidator}; the test passes if nothing is thrown.
+      */
+     @Test
+     public void testValidatesLegalBounds()
+     {
+         UnfilteredPartitionIterator wellFormed =
+             iter(false,
+                  // first range tombstone
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  // second range tombstone
+                  bound(Kind.INCL_START_BOUND, 0, "a", "2"),
+                  row(1, "a", "2", ""),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "2"));
+ 
+         drain(Transformation.apply(wellFormed, new RTBoundValidator(true)));
+     }
+ 
+     /**
+      * Reverse-order variant: drains two consecutive, properly opened and closed range
+      * tombstones through {@link RTBoundValidator}; the test passes if nothing is thrown.
+      */
+     @Test
+     public void testValidatesLegalBoundsInReverseOrder()
+     {
+         UnfilteredPartitionIterator wellFormed =
+             iter(true,
+                  // first range tombstone encountered in reverse order
+                  bound(Kind.INCL_END_BOUND, 0, "a", "2"),
+                  row(1, "a", "2", ""),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "2"),
+                  // second range tombstone encountered in reverse order
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"));
+ 
+         drain(Transformation.apply(wellFormed, new RTBoundValidator(true)));
+     }
+ 
+     /**
+      * Drains a sequence in which an outer deletion (timestamp 0) is repeatedly interrupted by
+      * boundary markers switching to a newer deletion around each row and back again; the test
+      * passes if {@link RTBoundValidator} throws nothing.
+      */
+     @Test
+     public void testValidatesLegalBoundaries()
+     {
+         UnfilteredPartitionIterator wellFormed =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a"),
+ 
+                  boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1"),
+                  row(2, "a", "1", ""),
+                  boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1"),
+ 
+                  boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2"),
+                  row(3, "a", "2", ""),
+                  boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2"),
+ 
+                  boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3"),
+                  row(4, "a", "3", ""),
+                  boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3"),
+ 
+                  bound(Kind.INCL_END_BOUND, 0, "a"));
+ 
+         drain(Transformation.apply(wellFormed, new RTBoundValidator(true)));
+     }
+ 
+     /**
+      * Reverse-order variant: an outer deletion (timestamp 0) is repeatedly interrupted by
+      * boundary markers switching to a newer deletion around each row and back again; the test
+      * passes if {@link RTBoundValidator} throws nothing.
+      */
+     @Test
+     public void testValidatesLegalBoundariesInReverseOrder()
+     {
+         UnfilteredPartitionIterator wellFormed =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a"),
+ 
+                  boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 3, 0, "a", "3"),
+                  row(4, "a", "3", ""),
+                  boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 3, "a", "3"),
+ 
+                  boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 2, 0, "a", "2"),
+                  row(3, "a", "2", ""),
+                  boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 2, "a", "2"),
+ 
+                  boundary(Kind.INCL_END_EXCL_START_BOUNDARY, 1, 0, "a", "1"),
+                  row(2, "a", "1", ""),
+                  boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0, 1, "a", "1"),
+ 
+                  bound(Kind.INCL_START_BOUND, 0, "a"));
+ 
+         drain(Transformation.apply(wellFormed, new RTBoundValidator(true)));
+     }
+ 
+     /**
+      * Expects {@link RTBoundValidator} to throw an {@link IllegalStateException} when the end
+      * bound's timestamp (1) differs from that of the start bound it closes (0).
+      */
+     @Test
+     public void testComplainsAboutMismatchedTimestamps()
+     {
+         UnfilteredPartitionIterator mismatched =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_END_BOUND, 1, "a", "1"));
+ 
+         assertThrowsISEIterated(Transformation.apply(mismatched, new RTBoundValidator(true)));
+     }
+ 
+     /**
+      * Reverse-order variant: expects {@link RTBoundValidator} to throw an
+      * {@link IllegalStateException} when the two bounds of one range tombstone carry different
+      * timestamps (0 and 1).
+      */
+     @Test
+     public void testComplainsAboutMismatchedTimestampsInReverseOrder()
+     {
+         UnfilteredPartitionIterator mismatched =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_START_BOUND, 1, "a", "1"));
+ 
+         assertThrowsISEIterated(Transformation.apply(mismatched, new RTBoundValidator(true)));
+     }
+ 
+     /**
+      * Feeds {@link RTBoundValidator} four malformed marker sequences in forward order and
+      * expects each one to fail with an {@link IllegalStateException}: a duplicated start bound,
+      * a duplicated end bound, a missing start bound and a missing end bound.
+      */
+     @Test
+     public void testComplainsAboutInvalidSequence()
+     {
+         // start bound appears twice
+         UnfilteredPartitionIterator duplicatedStart =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"));
+         assertThrowsISEIterated(Transformation.apply(duplicatedStart, new RTBoundValidator(true)));
+ 
+         // end bound appears twice
+         UnfilteredPartitionIterator duplicatedEnd =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"));
+         assertThrowsISEIterated(Transformation.apply(duplicatedEnd, new RTBoundValidator(true)));
+ 
+         // end bound without a preceding start bound
+         UnfilteredPartitionIterator missingStart =
+             iter(false,
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"));
+         assertThrowsISEIterated(Transformation.apply(missingStart, new RTBoundValidator(true)));
+ 
+         // start bound that is never followed by an end bound
+         UnfilteredPartitionIterator missingEnd =
+             iter(false,
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""));
+         assertThrowsISEIterated(Transformation.apply(missingEnd, new RTBoundValidator(true)));
+     }
+ 
+     /**
+      * Reverse-order variant: feeds {@link RTBoundValidator} four malformed marker sequences and
+      * expects each one to fail with an {@link IllegalStateException}: a duplicated start bound,
+      * a duplicated end bound, a missing start bound and a missing end bound.
+      */
+     @Test
+     public void testComplainsAboutInvalidSequenceInReveseOrder()
+     {
+         // start bound appears twice
+         UnfilteredPartitionIterator duplicatedStart =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"));
+         assertThrowsISEIterated(Transformation.apply(duplicatedStart, new RTBoundValidator(true)));
+ 
+         // end bound appears twice
+         UnfilteredPartitionIterator duplicatedEnd =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"));
+         assertThrowsISEIterated(Transformation.apply(duplicatedEnd, new RTBoundValidator(true)));
+ 
+         // end bound that is never followed by a start bound
+         UnfilteredPartitionIterator missingStart =
+             iter(true,
+                  bound(Kind.INCL_END_BOUND, 0, "a", "1"),
+                  row(1, "a", "1", ""));
+         assertThrowsISEIterated(Transformation.apply(missingStart, new RTBoundValidator(true)));
+ 
+         // start bound without a preceding end bound
+         UnfilteredPartitionIterator missingEnd =
+             iter(true,
+                  row(1, "a", "1", ""),
+                  bound(Kind.INCL_START_BOUND, 0, "a", "1"));
+         assertThrowsISEIterated(Transformation.apply(missingEnd, new RTBoundValidator(true)));
+     }
+ 
+     private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind, long timestamp, Object... clusteringValues) // test helper: builds a range tombstone bound marker of the given kind
+     {
+         ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; // one serialized buffer per supplied clustering value
+         for (int i = 0; i < clusteringValues.length; i++)
+             clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); // i-th value is serialized with the i-th clustering column's type
+ 
 -        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers), new DeletionTime(timestamp, nowInSec));
++        return new RangeTombstoneBoundMarker(ClusteringBound.create(kind, clusteringByteBuffers), new DeletionTime(timestamp, nowInSec)); // deletion time is built from the given timestamp and this test's nowInSec
+     }
+ 
+     private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind, long closeTimestamp, long openTimestamp, Object... clusteringValues) // test helper: builds a range tombstone boundary marker of the given kind
+     {
+         ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; // one serialized buffer per supplied clustering value
+         for (int i = 0; i < clusteringValues.length; i++)
+             clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); // i-th value is serialized with the i-th clustering column's type
+ 
 -        return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
++        return new RangeTombstoneBoundaryMarker(ClusteringBoundary.create(kind, clusteringByteBuffers),
+                                                 new DeletionTime(closeTimestamp, nowInSec), // first deletion time is built from closeTimestamp
+                                                 new DeletionTime(openTimestamp, nowInSec)); // second deletion time is built from openTimestamp
+     }
+ 
+     private Row row(long timestamp, Object... clusteringValues) // test helper: builds a row at the given clustering via BTreeRow.noCellLiveRow
+     {
+         ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; // one serialized buffer per supplied clustering value
+         for (int i = 0; i < clusteringValues.length; i++)
+             clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); // i-th value is serialized with the i-th clustering column's type
+ 
 -        return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), LivenessInfo.create(metadata, timestamp, nowInSec));
++        return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(timestamp, nowInSec)); // liveness info is built from the given timestamp and this test's nowInSec
+     }
+ 
+     /**
+      * Serializes {@code value} with the given type.
+      *
+      * @param type  the type used for serialization; cast, unchecked, to {@code AbstractType<T>}
+      * @param value the value to serialize
+      * @return the buffer returned by the type's {@code decompose}
+      */
+     @SuppressWarnings("unchecked")
+     private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+     {
+         // unchecked: the caller is trusted to pass a value the type can handle
+         AbstractType<T> typed = (AbstractType<T>) type;
+         return typed.decompose(value);
+     }
+ 
+     private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+     {
+         Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+ 
+         UnfilteredRowIterator rowIter =
+             new AbstractUnfilteredRowIterator(metadata,
+                                               key,
+                                               DeletionTime.LIVE,
+                                               metadata.partitionColumns(),
+                                               Rows.EMPTY_STATIC_ROW,
+                                               isReversedOrder,
+                                               EncodingStats.NO_STATS)
+         {
+             protected Unfiltered computeNext()
+             {
+                 return iterator.hasNext() ? iterator.next() : endOfData();
+             }
+         };
+ 
+         return new SingletonUnfilteredPartitionIterator(rowIter, false);
+     }
+ 
+     /**
+      * Asserts that both partition iterators yield the same number of partitions and that each
+      * pair of partitions contains equal unfiltereds. Every partition obtained is closed.
+      *
+      * @param iter1 the first iterator; drives the comparison loop
+      * @param iter2 the second iterator; must have a partition whenever {@code iter1} does, and none after
+      */
+     private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             // resources are closed in reverse declaration order: second first, then first
+             try (UnfilteredRowIterator first = iter1.next();
+                  UnfilteredRowIterator second = iter2.next())
+             {
+                 assertIteratorsEqual(first, second);
+             }
+         }
+ 
+         // iter2 must not contain extra partitions
+         assertFalse(iter2.hasNext());
+     }
+ 
+     /**
+      * Asserts that both row iterators yield equal unfiltereds in the same order and have the
+      * same length.
+      *
+      * @param iter1 the first iterator; drives the comparison loop
+      * @param iter2 the second iterator; must have an element whenever {@code iter1} does, and none after
+      */
+     private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             Unfiltered first = iter1.next();
+             Unfiltered second = iter2.next();
+             assertEquals(first, second);
+         }
+ 
+         // iter2 must not contain extra unfiltereds
+         assertFalse(iter2.hasNext());
+     }
+ 
+     /**
+      * Drains {@code iterator} and asserts that doing so throws an {@link IllegalStateException}.
+      * The assertion fails if draining completes normally or throws any other throwable; the
+      * failure message reports what was actually thrown ({@code null} if nothing was).
+      *
+      * @param iterator the partition iterator expected to fail while being consumed
+      */
+     private void assertThrowsISEIterated(UnfilteredPartitionIterator iterator)
+     {
+         Throwable thrown = null;
+         try
+         {
+             drain(iterator);
+         }
+         catch (Throwable e)
+         {
+             // deliberately broad: whatever was thrown is reported by the assertion below
+             thrown = e;
+         }
+ 
+         assertTrue("Expected IllegalStateException while iterating, but got: " + thrown,
+                    thrown instanceof IllegalStateException);
+     }
+ 
+     /**
+      * Consumes every unfiltered of every partition in {@code iter}, discarding the results, and
+      * closes each partition once it has been consumed. Any exception raised while iterating
+      * propagates to the caller.
+      *
+      * @param iter the partition iterator to exhaust
+      */
+     private void drain(UnfilteredPartitionIterator iter)
+     {
+         while (iter.hasNext())
+         {
+             try (UnfilteredRowIterator rows = iter.next())
+             {
+                 while (rows.hasNext())
+                 {
+                     // result is discarded; only the act of iterating matters here
+                     rows.next();
+                 }
+             }
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org