Posted to commits@cassandra.apache.org by al...@apache.org on 2018/09/25 16:46:23 UTC

[1/6] cassandra git commit: Fix purging semi-expired RT boundaries in reversed iterators

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 45937def3 -> d496dca67
  refs/heads/cassandra-3.11 4d3f5a32b -> c34a0f520
  refs/heads/trunk 5069b2c0f -> 210da3dc0


Fix purging semi-expired RT boundaries in reversed iterators

patch by Aleksey Yeschenko; reviewed by Blake Eggleston for
CASSANDRA-14672
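
Context for the change: a RangeTombstoneBoundaryMarker carries two deletion times, endDeletion for the range it closes and startDeletion for the range it opens. Before this patch, createCorrespondingCloseMarker/createCorrespondingOpenMarker used those fields unconditionally, ignoring the iteration direction, so when PurgeFunction dropped the purgeable half of a semi-expired boundary in a reversed (DESC) iterator, the surviving bound marker could be built with the wrong deletion time. The fix below delegates to the direction-aware closeDeletionTime(reversed)/openDeletionTime(reversed) accessors, and the new PurgeFunctionTest exercises both iteration orders.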


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d496dca6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d496dca6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d496dca6

Branch: refs/heads/cassandra-3.0
Commit: d496dca6729853ece49d68c4837fed35149c95d0
Parents: 45937de
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Fri Sep 21 21:26:13 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Tue Sep 25 17:32:56 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/rows/RangeTombstoneBoundaryMarker.java   |   4 +-
 .../db/partitions/PurgeFunctionTest.java        | 294 +++++++++++++++++++
 3 files changed, 297 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d496dca6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 43628b2..2c2f4f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.18
+ * Fix purging semi-expired RT boundaries in reversed iterators (CASSANDRA-14672)
  * DESC order reads can fail to return the last Unfiltered in the partition (CASSANDRA-14766)
  * Fix corrupted collection deletions for dropped columns in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
  * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d496dca6/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index c4bc2f2..f0f5421 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -136,12 +136,12 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
 
     public RangeTombstoneBoundMarker createCorrespondingCloseMarker(boolean reversed)
     {
-        return new RangeTombstoneBoundMarker(closeBound(reversed), endDeletion);
+        return new RangeTombstoneBoundMarker(closeBound(reversed), closeDeletionTime(reversed));
     }
 
     public RangeTombstoneBoundMarker createCorrespondingOpenMarker(boolean reversed)
     {
-        return new RangeTombstoneBoundMarker(openBound(reversed), startDeletion);
+        return new RangeTombstoneBoundMarker(openBound(reversed), openDeletionTime(reversed));
     }
 
     public void digest(MessageDigest digest)
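
For reference, the direction-aware accessors the fix switches to are expected to resolve roughly as sketched below (an illustrative sketch, not a verbatim copy of the class): in a reversed iterator the boundary is reached from the other side, so the close/open roles of the two deletion times swap.

    public DeletionTime closeDeletionTime(boolean reversed)
    {
        // deletion time of the range this boundary closes, given the iteration order
        return reversed ? startDeletion : endDeletion;
    }

    public DeletionTime openDeletionTime(boolean reversed)
    {
        // deletion time of the range this boundary opens, given the iteration order
        return reversed ? endDeletion : startDeletion;
    }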

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d496dca6/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java b/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
new file mode 100644
index 0000000..1dea7f3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Iterators;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ClusteringPrefix.Kind;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public final class PurgeFunctionTest
+{
+    private static final String KEYSPACE = "PurgeFunctionTest";
+    private static final String TABLE = "table";
+
+    private CFMetaData metadata;
+    private DecoratedKey key;
+
+    private static UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, int gcBefore)
+    {
+        class WithoutPurgeableTombstones extends PurgeFunction
+        {
+            private WithoutPurgeableTombstones()
+            {
+                super(iterator.isForThrift(), FBUtilities.nowInSeconds(), gcBefore, Integer.MAX_VALUE, false, false);
+            }
+
+            protected Predicate<Long> getPurgeEvaluator()
+            {
+                return time -> true;
+            }
+        }
+
+        return Transformation.apply(iterator, new WithoutPurgeableTombstones());
+    }
+
+    @Before
+    public void setUp()
+    {
+        metadata =
+            CFMetaData.Builder
+                      .create(KEYSPACE, TABLE)
+                      .addPartitionKey("pk", UTF8Type.instance)
+                      .addClusteringColumn("ck", UTF8Type.instance)
+                      .build();
+        key = Murmur3Partitioner.instance.decorateKey(bytes("key"));
+    }
+
+    @Test
+    public void testNothingIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testNothingIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+
+        UnfilteredPartitionIterator expected = iter(true
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testEverythingIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+
+        assertTrue(!purged.hasNext());
+    }
+
+    @Test
+    public void testEverythingIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+
+        assertTrue(!purged.hasNext());
+    }
+
+    @Test
+    public void testFirstHalfIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testFirstHalfIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testSecondHalfIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+        , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testSecondHalfIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(true
+        , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+    {
+        Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+
+        UnfilteredRowIterator rowIter =
+            new AbstractUnfilteredRowIterator(metadata,
+                                              key,
+                                              DeletionTime.LIVE,
+                                              metadata.partitionColumns(),
+                                              Rows.EMPTY_STATIC_ROW,
+                                              isReversedOrder,
+                                              EncodingStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return iterator.hasNext() ? iterator.next() : endOfData();
+            }
+        };
+
+        return new SingletonUnfilteredPartitionIterator(rowIter, false);
+    }
+
+    private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind,
+                                            long timestamp,
+                                            int localDeletionTime,
+                                            Object clusteringValue)
+    {
+        ByteBuffer[] clusteringByteBuffers =
+            new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+
+        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
+                                             new DeletionTime(timestamp, localDeletionTime));
+    }
+
+    private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind,
+                                                  long closeTimestamp,
+                                                  int closeLocalDeletionTime,
+                                                  long openTimestamp,
+                                                  int openDeletionTime,
+                                                  Object clusteringValue)
+    {
+        ByteBuffer[] clusteringByteBuffers =
+            new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+
+        return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
+                                                new DeletionTime(closeTimestamp, closeLocalDeletionTime),
+                                                new DeletionTime(openTimestamp, openDeletionTime));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+    {
+        return ((AbstractType<T>) type).decompose(value);
+    }
+
+    private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+    {
+        while (iter1.hasNext())
+        {
+            assertTrue(iter2.hasNext());
+
+            try (UnfilteredRowIterator partition1 = iter1.next())
+            {
+                try (UnfilteredRowIterator partition2 = iter2.next())
+                {
+                    assertIteratorsEqual(partition1, partition2);
+                }
+            }
+        }
+
+        assertTrue(!iter2.hasNext());
+    }
+
+    private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+    {
+        while (iter1.hasNext())
+        {
+            assertTrue(iter2.hasNext());
+
+            assertEquals(iter1.next(), iter2.next());
+        }
+        assertTrue(!iter2.hasNext());
+    }
+}




[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c34a0f52
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c34a0f52
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c34a0f52

Branch: refs/heads/cassandra-3.11
Commit: c34a0f5207d228a2b78f6295cd4ab3e0755e56c2
Parents: 4d3f5a3 d496dca
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Tue Sep 25 17:41:10 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Tue Sep 25 17:41:10 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/rows/RangeTombstoneBoundaryMarker.java   |   4 +-
 .../db/partitions/PurgeFunctionTest.java        | 297 +++++++++++++++++++
 3 files changed, 300 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c34a0f52/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a4fa705,2c2f4f5..20cec87
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,5 -1,5 +1,6 @@@
 -3.0.18
 +3.11.4
 +Merged from 3.0:
+  * Fix purging semi-expired RT boundaries in reversed iterators (CASSANDRA-14672)
   * DESC order reads can fail to return the last Unfiltered in the partition (CASSANDRA-14766)
   * Fix corrupted collection deletions for dropped columns in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
   * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c34a0f52/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c34a0f52/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
index 0000000,1dea7f3..7f85aea
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
+++ b/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
@@@ -1,0 -1,294 +1,297 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.partitions;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.Iterator;
+ import java.util.function.Predicate;
+ 
+ import com.google.common.collect.Iterators;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.CFMetaData;
++import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ClusteringPrefix.Kind;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.db.transform.Transformation;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ public final class PurgeFunctionTest
+ {
+     private static final String KEYSPACE = "PurgeFunctionTest";
+     private static final String TABLE = "table";
+ 
+     private CFMetaData metadata;
+     private DecoratedKey key;
+ 
+     private static UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, int gcBefore)
+     {
+         class WithoutPurgeableTombstones extends PurgeFunction
+         {
+             private WithoutPurgeableTombstones()
+             {
+                 super(iterator.isForThrift(), FBUtilities.nowInSeconds(), gcBefore, Integer.MAX_VALUE, false, false);
+             }
+ 
+             protected Predicate<Long> getPurgeEvaluator()
+             {
+                 return time -> true;
+             }
+         }
+ 
+         return Transformation.apply(iterator, new WithoutPurgeableTombstones());
+     }
+ 
+     @Before
+     public void setUp()
+     {
++        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
++
+         metadata =
+             CFMetaData.Builder
+                       .create(KEYSPACE, TABLE)
+                       .addPartitionKey("pk", UTF8Type.instance)
+                       .addClusteringColumn("ck", UTF8Type.instance)
+                       .build();
+         key = Murmur3Partitioner.instance.decorateKey(bytes("key"));
+     }
+ 
+     @Test
+     public void testNothingIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testNothingIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+ 
+         UnfilteredPartitionIterator expected = iter(true
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testEverythingIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+ 
+         assertTrue(!purged.hasNext());
+     }
+ 
+     @Test
+     public void testEverythingIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+ 
+         assertTrue(!purged.hasNext());
+     }
+ 
+     @Test
+     public void testFirstHalfIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testFirstHalfIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testSecondHalfIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+         , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testSecondHalfIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(true
+         , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+     {
+         Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+ 
+         UnfilteredRowIterator rowIter =
+             new AbstractUnfilteredRowIterator(metadata,
+                                               key,
+                                               DeletionTime.LIVE,
+                                               metadata.partitionColumns(),
+                                               Rows.EMPTY_STATIC_ROW,
+                                               isReversedOrder,
+                                               EncodingStats.NO_STATS)
+         {
+             protected Unfiltered computeNext()
+             {
+                 return iterator.hasNext() ? iterator.next() : endOfData();
+             }
+         };
+ 
+         return new SingletonUnfilteredPartitionIterator(rowIter, false);
+     }
+ 
+     private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind,
+                                             long timestamp,
+                                             int localDeletionTime,
+                                             Object clusteringValue)
+     {
+         ByteBuffer[] clusteringByteBuffers =
+             new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+ 
 -        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
++        return new RangeTombstoneBoundMarker(ClusteringBound.create(kind, clusteringByteBuffers),
+                                              new DeletionTime(timestamp, localDeletionTime));
+     }
+ 
+     private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind,
+                                                   long closeTimestamp,
+                                                   int closeLocalDeletionTime,
+                                                   long openTimestamp,
+                                                   int openDeletionTime,
+                                                   Object clusteringValue)
+     {
+         ByteBuffer[] clusteringByteBuffers =
+             new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+ 
 -        return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
++        return new RangeTombstoneBoundaryMarker(ClusteringBoundary.create(kind, clusteringByteBuffers),
+                                                 new DeletionTime(closeTimestamp, closeLocalDeletionTime),
+                                                 new DeletionTime(openTimestamp, openDeletionTime));
+     }
+ 
+     @SuppressWarnings("unchecked")
+     private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+     {
+         return ((AbstractType<T>) type).decompose(value);
+     }
+ 
+     private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             try (UnfilteredRowIterator partition1 = iter1.next())
+             {
+                 try (UnfilteredRowIterator partition2 = iter2.next())
+                 {
+                     assertIteratorsEqual(partition1, partition2);
+                 }
+             }
+         }
+ 
+         assertTrue(!iter2.hasNext());
+     }
+ 
+     private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             assertEquals(iter1.next(), iter2.next());
+         }
+         assertTrue(!iter2.hasNext());
+     }
+ }
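
Note on the 3.11 copy of the test above: relative to the 3.0 version it adds DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance) in setUp() and builds the markers via ClusteringBound.create(...)/ClusteringBoundary.create(...) instead of new RangeTombstone.Bound(...), matching the clustering API on that branch; the test logic itself is unchanged.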




[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/210da3dc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/210da3dc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/210da3dc

Branch: refs/heads/trunk
Commit: 210da3dc003a4107e125d0276476c289f2a2b97c
Parents: 5069b2c c34a0f5
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Tue Sep 25 17:46:05 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Tue Sep 25 17:46:05 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../db/rows/RangeTombstoneBoundaryMarker.java   |   4 +-
 .../db/partitions/PurgeFunctionTest.java        | 296 +++++++++++++++++++
 3 files changed, 300 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/210da3dc/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b39fe03,20cec87..9f7958c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,320 -1,7 +1,322 @@@
 +4.0
 + * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780)
 + * Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763)
 + * Add a check for receiving digest response from transient node (CASSANDRA-14750)
 + * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704)
 + * Remove mentions of transient replication from repair path (CASSANDRA-14698)
 + * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
 + * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
 + * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696)
 + * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700)
 + * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
 + * Add checksumming to the native protocol (CASSANDRA-13304)
 + * Make AuthCache more easily extendable (CASSANDRA-14662)
 + * Extend RolesCache to include detailed role info (CASSANDRA-14497)
 + * Add fqltool compare (CASSANDRA-14619)
 + * Add fqltool replay (CASSANDRA-14618)
 + * Log keyspace in full query log (CASSANDRA-14656)
 + * Transient Replication and Cheap Quorums (CASSANDRA-14404)
 + * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675)
 + * Add diagnostic events for read repairs (CASSANDRA-14668)
 + * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671)
 + * Add sampler for query time and expose with nodetool (CASSANDRA-14436)
 + * Clean up Message.Request implementations (CASSANDRA-14677)
 + * Disable old native protocol versions on demand (CASANDRA-14659)
 + * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664)
 + * Improve BTree build performance by avoiding data copy (CASSANDRA-9989)
 + * Make monotonic read / read repair configurable (CASSANDRA-14635)
 + * Refactor CompactionStrategyManager (CASSANDRA-14621)
 + * Flush netty client messages immediately by default (CASSANDRA-13651)
 + * Improve read repair blocking behavior (CASSANDRA-10726)
 + * Add a virtual table to expose settings (CASSANDRA-14573)
 + * Fix up chunk cache handling of metrics (CASSANDRA-14628)
 + * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652)
 + * Incomplete handling of exceptions when decoding incoming messages (CASSANDRA-14574)
 + * Add diagnostic events for user audit logging (CASSANDRA-13668)
 + * Allow retrieving diagnostic events via JMX (CASSANDRA-14435)
 + * Add base classes for diagnostic events (CASSANDRA-13457)
 + * Clear view system metadata when dropping keyspace (CASSANDRA-14646)
 + * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase (CASSANDRA-14637)
 + * Make all existing virtual tables use LocalPartitioner (CASSANDRA-14640)
 + * Revert 4.0 GC alg back to CMS (CASANDRA-14636)
 + * Remove hardcoded java11 jvm args in idea workspace files (CASSANDRA-14627)
 + * Update netty to 4.1.128 (CASSANDRA-14633)
 + * Add a virtual table to expose thread pools (CASSANDRA-14523)
 + * Add a virtual table to expose caches (CASSANDRA-14538, CASSANDRA-14626)
 + * Fix toDate function for timestamp arguments (CASSANDRA-14502)
 + * Revert running dtests by default in circleci (CASSANDRA-14614)
 + * Stream entire SSTables when possible (CASSANDRA-14556)
 + * Cell reconciliation should not depend on nowInSec (CASSANDRA-14592)
 + * Add experimental support for Java 11 (CASSANDRA-9608)
 + * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable (CASSANDRA-14580)
 + * Improve logging in MessageInHandler's constructor (CASSANDRA-14576)
 + * Set broadcast address in internode messaging handshake (CASSANDRA-14579)
 + * Wait for schema agreement prior to building MVs (CASSANDRA-14571)
 + * Make all DDL statements idempotent and not dependent on global state (CASSANDRA-13426)
 + * Bump the hints messaging version to match the current one (CASSANDRA-14536)
 + * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind errors in CircleCI (CASSANDRA-14546)
 + * Report why native_transport_port fails to bind (CASSANDRA-14544)
 + * Optimize internode messaging protocol (CASSANDRA-14485)
 + * Internode messaging handshake sends wrong messaging version number (CASSANDRA-14540)
 + * Add a virtual table to expose active client connections (CASSANDRA-14458)
 + * Clean up and refactor client metrics (CASSANDRA-14524)
 + * Nodetool import row cache invalidation races with adding sstables to tracker (CASSANDRA-14529)
 + * Fix assertions in LWTs after TableMetadata was made immutable (CASSANDRA-14356)
 + * Abort compactions quicker (CASSANDRA-14397)
 + * Support light-weight transactions in cassandra-stress (CASSANDRA-13529)
 + * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509)
 + * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467)
 + * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457)
 + * Let nodetool import take a list of directories (CASSANDRA-14442)
 + * Avoid unneeded memory allocations / cpu for disabled log levels (CASSANDRA-14488)
 + * Implement virtual keyspace interface (CASSANDRA-7622)
 + * nodetool import cleanup and improvements (CASSANDRA-14417)
 + * Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
 + * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)
 + * Audit logging for database activity (CASSANDRA-12151)
 + * Clean up build artifacts in docs container (CASSANDRA-14432)
 + * Minor network authz improvements (Cassandra-14413)
 + * Automatic sstable upgrades (CASSANDRA-14197)
 + * Replace deprecated junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431)
 + * Cassandra-stress throws NPE if insert section isn't specified in user profile (CASSSANDRA-14426)
 + * List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335)
 + * Improve LatencyMetrics performance by reducing write path processing (CASSANDRA-14281)
 + * Add network authz (CASSANDRA-13985)
 + * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDRA-14389)
 + * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381)
 + * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)
 + * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392)
 + * Client TOPOLOGY_CHANGE messages have wrong port. (CASSANDRA-14398)
 + * Add ability to load new SSTables from a separate directory (CASSANDRA-6719)
 + * Eliminate background repair and probablistic read_repair_chance table options
 +   (CASSANDRA-13910)
 + * Bind to correct local address in 4.0 streaming (CASSANDRA-14362)
 + * Use standard Amazon naming for datacenter and rack in Ec2Snitch (CASSANDRA-7839)
 + * Fix junit failure for SSTableReaderTest (CASSANDRA-14387)
 + * Abstract write path for pluggable storage (CASSANDRA-14118)
 + * nodetool describecluster should be more informative (CASSANDRA-13853)
 + * Compaction performance improvements (CASSANDRA-14261) 
 + * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260)
 + * Add options to nodetool tablestats to sort and limit output (CASSANDRA-13889)
 + * Rename internals to reflect CQL vocabulary (CASSANDRA-14354)
 + * Add support for hybrid MIN(), MAX() speculative retry policies
 +   (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352)
 + * Fix some regressions caused by 14058 (CASSANDRA-14353)
 + * Abstract repair for pluggable storage (CASSANDRA-14116)
 + * Add meaningful toString() impls (CASSANDRA-13653)
 + * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)
 + * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713)
 + * Add coordinator write metric per CF (CASSANDRA-14232)
 + * Correct and clarify SSLFactory.getSslContext method and call sites (CASSANDRA-14314)
 + * Handle static and partition deletion properly on ThrottledUnfilteredIterator (CASSANDRA-14315)
 + * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322)
 + * Add ability to specify driver name and version (CASSANDRA-14275)
 + * Abstract streaming for pluggable storage (CASSANDRA-14115)
 + * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294)
 + * Use Murmur3 for validation compactions (CASSANDRA-14002)
 + * Comma at the end of the seed list is interpretated as localhost (CASSANDRA-14285)
 + * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058)
 + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
 + * Add a few options to nodetool verify (CASSANDRA-14201)
 + * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)
 + * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259)
 + * Better document in code InetAddressAndPort usage post 7544, incorporate port into UUIDGen node (CASSANDRA-14226)
 + * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132)
 + * Make it possible to change neverPurgeTombstones during runtime (CASSANDRA-14214)
 + * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174)
 + * Add nodetool clientlist (CASSANDRA-13665)
 + * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211)
 + * Non-disruptive seed node list reload (CASSANDRA-14190)
 + * Nodetool tablehistograms to print statics for all the tables (CASSANDRA-14185)
 + * Migrate dtests to use pytest and python3 (CASSANDRA-14134)
 + * Allow storage port to be configurable per node (CASSANDRA-7544)
 + * Make sub-range selection for non-frozen collections return null instead of empty (CASSANDRA-14182)
 + * BloomFilter serialization format should not change byte ordering (CASSANDRA-9067)
 + * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152)
 + * Delete temp test files on exit (CASSANDRA-14153)
 + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867)
 + * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066)
 + * Fix cassandra-stress startup failure (CASSANDRA-14106)
 + * Remove initialDirectories from CFS (CASSANDRA-13928)
 + * Fix trivial log format error (CASSANDRA-14015)
 + * Allow sstabledump to do a json object per partition (CASSANDRA-13848)
 + * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200)
 + * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081)
 + * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
 + * Support a means of logging all queries as they were invoked (CASSANDRA-13983)
 + * Presize collections (CASSANDRA-13760)
 + * Add GroupCommitLogService (CASSANDRA-13530)
 + * Parallelize initial materialized view build (CASSANDRA-12245)
 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
 + * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
 + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
 + * Introduce leaf-only iterator (CASSANDRA-9988)
 + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
 + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
 + * Refactoring to specialised functional interfaces (CASSANDRA-13982)
 + * Speculative retry should allow more friendly params (CASSANDRA-13876)
 + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)
 + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
 + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
 + * Fix some alerts raised by static analysis (CASSANDRA-13799)
 + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
 + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
 + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
 + * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
 + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899)
 + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906)
 + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
 + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961)
 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
 + * Update lz4 to 1.4.0 (CASSANDRA-13741)
 + * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
 + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
 + * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
 + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
 + * Add extra information to SASI timeout exception (CASSANDRA-13677)
 + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
 + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
 + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
 + * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
 + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)
 + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
 + * Race condition when closing stream sessions (CASSANDRA-13852)
 + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
 + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696)
 + * Add stress profile yaml with LWT (CASSANDRA-7960)
 + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789)
 + * Simplify mx4j configuration (Cassandra-13578)
 + * Fix trigger example on 4.0 (CASSANDRA-13796)
 + * Force minumum timeout value (CASSANDRA-9375)
 + * Use netty for streaming (CASSANDRA-12229)
 + * Use netty for internode messaging (CASSANDRA-8457)
 + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
 + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
 + * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
 + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
 + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
 + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
 + * Fix race / ref leak in anticompaction (CASSANDRA-13688)
 + * Expose tasks queue length via JMX (CASSANDRA-12758)
 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
 + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
 + * Improve sstablemetadata output (CASSANDRA-11483)
 + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
 + * Introduce error metrics for repair (CASSANDRA-13387)
 + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732)
 + * Update metrics to 3.1.5 (CASSANDRA-13648)
 + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
 + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
 + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
 + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
 + * Default for start_native_transport now true if not set in config (CASSANDRA-13656)
 + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583)
 + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
 + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271)
 + * Use common nowInSec for validation compactions (CASSANDRA-13671)
 + * Improve handling of IR prepare failures (CASSANDRA-13672)
 + * Send IR coordinator messages synchronously (CASSANDRA-13673)
 + * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
 + * Fix column filter creation for wildcard queries (CASSANDRA-13650)
 + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
 + * fix race condition in PendingRepairManager (CASSANDRA-13659)
 + * Allow noop incremental repair state transitions (CASSANDRA-13658)
 + * Run repair with down replicas (CASSANDRA-10446)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
 + * Improve calculation of available disk space for compaction (CASSANDRA-13068)
 + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
 + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
 + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
 + * Fix Randomness of stress values (CASSANDRA-12744)
 + * Allow selecting Map values and Set elements (CASSANDRA-7396)
 + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
 + * Update repairTime for keyspaces on completion (CASSANDRA-13539)
 + * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
 + * Bring back maxHintTTL propery (CASSANDRA-12982)
 + * Add testing guidelines (CASSANDRA-13497)
 + * Add more repair metrics (CASSANDRA-13531)
 + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
 + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
 + * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
 + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
 + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
 + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
 + * Improve incremental repair logging (CASSANDRA-13468)
 + * Start compaction when incremental repair finishes (CASSANDRA-13454)
 + * Add repair streaming preview (CASSANDRA-13257)
 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
 + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
 + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
 + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
 + * Skip building views during base table streams on range movements (CASSANDRA-13065)
 + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
 + * Remove deprecated repair JMX APIs (CASSANDRA-11530)
 + * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
 + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
 + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
 + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
 + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
 + * Incremental repair not streaming correct sstables (CASSANDRA-13328)
 + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
 + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
 + * Remove config option index_interval (CASSANDRA-10671)
 + * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
 + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
 + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
 + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
 + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
 + * Remove unused method (CASSANDRA-13227)
 + * Fix minor bugs related to #9143 (CASSANDRA-13217)
 + * Output warning if user increases RF (CASSANDRA-13079)
 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
 + * Add support for + and - operations on dates (CASSANDRA-11936)
 + * Fix consistency of incrementally repaired data (CASSANDRA-9143)
 + * Increase commitlog version (CASSANDRA-13161)
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
 + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
 + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
 + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
 + * Add histogram for delay to deliver hints (CASSANDRA-13234)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
 + * Trivial format error in StorageProxy (CASSANDRA-13551)
 + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480)
 + * Anticompaction can cause noisy log messages (CASSANDRA-13684)
 + * Switch to client init for sstabledump (CASSANDRA-13683)
 + * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
 + * nodetool clearsnapshot requires --all to clear all snapshots (CASSANDRA-13391)
 + * Correctly count range tombstones in traces and tombstone thresholds (CASSANDRA-8527)
 + * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243)
 +
 +
  3.11.4
  Merged from 3.0:
+  * Fix purging semi-expired RT boundaries in reversed iterators (CASSANDRA-14672)
+  * DESC order reads can fail to return the last Unfiltered in the partition (CASSANDRA-14766)
   * Fix corrupted collection deletions for dropped columns in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
   * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
   * Handle failures in parallelAllSSTableOperation (cleanup/upgradesstables/etc) (CASSANDRA-14657)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/210da3dc/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 79d5b1a,ad71784..1cbf31c
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@@ -147,17 -142,23 +147,17 @@@ public class RangeTombstoneBoundaryMark
  
      public RangeTombstoneBoundMarker createCorrespondingOpenMarker(boolean reversed)
      {
-         return new RangeTombstoneBoundMarker(openBound(reversed), startDeletion);
+         return new RangeTombstoneBoundMarker(openBound(reversed), openDeletionTime(reversed));
      }
  
 -    public void digest(MessageDigest digest)
 -    {
 -        bound.digest(digest);
 -        endDeletion.digest(digest);
 -        startDeletion.digest(digest);
 -    }
 -
 -    @Override
 -    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
 +    public void digest(Hasher hasher)
      {
 -        digest(digest);
 +        bound.digest(hasher);
 +        endDeletion.digest(hasher);
 +        startDeletion.digest(hasher);
      }
  
 -    public String toString(CFMetaData metadata)
 +    public String toString(TableMetadata metadata)
      {
          return String.format("Marker %s@%d/%d-%d/%d",
                               bound.toString(metadata),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/210da3dc/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
index 0000000,7f85aea..ce569b5
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
+++ b/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
@@@ -1,0 -1,297 +1,296 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.partitions;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.Iterator;
 -import java.util.function.Predicate;
++import java.util.function.LongPredicate;
+ 
+ import com.google.common.collect.Iterators;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ClusteringPrefix.Kind;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.db.transform.Transformation;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
++import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ public final class PurgeFunctionTest
+ {
+     private static final String KEYSPACE = "PurgeFunctionTest";
+     private static final String TABLE = "table";
+ 
 -    private CFMetaData metadata;
++    private TableMetadata metadata;
+     private DecoratedKey key;
+ 
+     private static UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, int gcBefore)
+     {
+         class WithoutPurgeableTombstones extends PurgeFunction
+         {
+             private WithoutPurgeableTombstones()
+             {
 -                super(iterator.isForThrift(), FBUtilities.nowInSeconds(), gcBefore, Integer.MAX_VALUE, false, false);
++                super(FBUtilities.nowInSeconds(), gcBefore, Integer.MAX_VALUE, false, false);
+             }
+ 
 -            protected Predicate<Long> getPurgeEvaluator()
++            protected LongPredicate getPurgeEvaluator()
+             {
+                 return time -> true;
+             }
+         }
+ 
+         return Transformation.apply(iterator, new WithoutPurgeableTombstones());
+     }
+ 
+     @Before
+     public void setUp()
+     {
+         DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ 
+         metadata =
 -            CFMetaData.Builder
 -                      .create(KEYSPACE, TABLE)
 -                      .addPartitionKey("pk", UTF8Type.instance)
 -                      .addClusteringColumn("ck", UTF8Type.instance)
 -                      .build();
++            TableMetadata.builder(KEYSPACE, TABLE)
++                         .addPartitionKeyColumn("pk", UTF8Type.instance)
++                         .addClusteringColumn("ck", UTF8Type.instance)
++                         .build();
+         key = Murmur3Partitioner.instance.decorateKey(bytes("key"));
+     }
+ 
+     @Test
+     public void testNothingIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testNothingIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+ 
+         UnfilteredPartitionIterator expected = iter(true
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testEverythingIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+ 
+         assertTrue(!purged.hasNext());
+     }
+ 
+     @Test
+     public void testEverythingIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+ 
+         assertTrue(!purged.hasNext());
+     }
+ 
+     @Test
+     public void testFirstHalfIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testFirstHalfIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testSecondHalfIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+         , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testSecondHalfIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(true
+         , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+     {
+         Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+ 
+         UnfilteredRowIterator rowIter =
+             new AbstractUnfilteredRowIterator(metadata,
+                                               key,
+                                               DeletionTime.LIVE,
 -                                              metadata.partitionColumns(),
++                                              metadata.regularAndStaticColumns(),
+                                               Rows.EMPTY_STATIC_ROW,
+                                               isReversedOrder,
+                                               EncodingStats.NO_STATS)
+         {
+             protected Unfiltered computeNext()
+             {
+                 return iterator.hasNext() ? iterator.next() : endOfData();
+             }
+         };
+ 
 -        return new SingletonUnfilteredPartitionIterator(rowIter, false);
++        return new SingletonUnfilteredPartitionIterator(rowIter);
+     }
+ 
+     private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind,
+                                             long timestamp,
+                                             int localDeletionTime,
+                                             Object clusteringValue)
+     {
+         ByteBuffer[] clusteringByteBuffers =
+             new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+ 
+         return new RangeTombstoneBoundMarker(ClusteringBound.create(kind, clusteringByteBuffers),
+                                              new DeletionTime(timestamp, localDeletionTime));
+     }
+ 
+     private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind,
+                                                   long closeTimestamp,
+                                                   int closeLocalDeletionTime,
+                                                   long openTimestamp,
+                                                   int openDeletionTime,
+                                                   Object clusteringValue)
+     {
+         ByteBuffer[] clusteringByteBuffers =
+             new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+ 
+         return new RangeTombstoneBoundaryMarker(ClusteringBoundary.create(kind, clusteringByteBuffers),
+                                                 new DeletionTime(closeTimestamp, closeLocalDeletionTime),
+                                                 new DeletionTime(openTimestamp, openDeletionTime));
+     }
+ 
+     @SuppressWarnings("unchecked")
+     private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+     {
+         return ((AbstractType<T>) type).decompose(value);
+     }
+ 
+     private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             try (UnfilteredRowIterator partition1 = iter1.next())
+             {
+                 try (UnfilteredRowIterator partition2 = iter2.next())
+                 {
+                     assertIteratorsEqual(partition1, partition2);
+                 }
+             }
+         }
+ 
+         assertTrue(!iter2.hasNext());
+     }
+ 
+     private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             assertEquals(iter1.next(), iter2.next());
+         }
+         assertTrue(!iter2.hasNext());
+     }
+ }
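
A note on how the numbers in these tests line up: withoutPurgeableTombstones installs an
always-true purge evaluator, so the only gate left is gcBefore compared against each marker's local
deletion time. The fixture only uses local deletion times 0 and 1, which is why gcBefore = 0 purges
nothing, gcBefore = 1 drops only the older half of the boundary, and gcBefore = 2 empties the
partition. The sketch below models that decision under the assumption that a deletion is purgeable
when its local deletion time is below gcBefore and the evaluator accepts its timestamp; that
condition and the PurgeGateSketch/isPurgeable names are illustrative assumptions, not code taken
from PurgeFunction.

    import java.util.function.LongPredicate;

    // Standalone sketch of the purge gate assumed by the tests above (not Cassandra code).
    public final class PurgeGateSketch
    {
        static boolean isPurgeable(long timestamp, int localDeletionTime, int gcBefore, LongPredicate evaluator)
        {
            // Purgeable only if old enough and the evaluator accepts the write timestamp.
            return localDeletionTime < gcBefore && evaluator.test(timestamp);
        }

        public static void main(String[] args)
        {
            LongPredicate alwaysPurgeable = time -> true; // same evaluator the test installs

            // Fixture deletions: (timestamp 0, local deletion time 0) and (timestamp 1, local deletion time 1).
            long[][] deletions = { { 0L, 0L }, { 1L, 1L } };

            for (int gcBefore : new int[] { 0, 1, 2 })
            {
                for (long[] d : deletions)
                {
                    System.out.printf("gcBefore=%d ldt=%d -> purgeable=%b%n",
                                      gcBefore, d[1], isPurgeable(d[0], (int) d[1], gcBefore, alwaysPurgeable));
                }
            }
        }
    }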




[3/6] cassandra git commit: Fix purging semi-expired RT boundaries in reversed iterators

Posted by al...@apache.org.
Fix purging semi-expired RT boundaries in reversed iterators

patch by Aleksey Yeschenko; reviewed by Blake Eggleston for
CASSANDRA-14672


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d496dca6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d496dca6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d496dca6

Branch: refs/heads/trunk
Commit: d496dca6729853ece49d68c4837fed35149c95d0
Parents: 45937de
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Fri Sep 21 21:26:13 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Tue Sep 25 17:32:56 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/rows/RangeTombstoneBoundaryMarker.java   |   4 +-
 .../db/partitions/PurgeFunctionTest.java        | 294 +++++++++++++++++++
 3 files changed, 297 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d496dca6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 43628b2..2c2f4f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.18
+ * Fix purging semi-expired RT boundaries in reversed iterators (CASSANDRA-14672)
  * DESC order reads can fail to return the last Unfiltered in the partition (CASSANDRA-14766)
  * Fix corrupted collection deletions for dropped columns in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
  * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d496dca6/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index c4bc2f2..f0f5421 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -136,12 +136,12 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
 
     public RangeTombstoneBoundMarker createCorrespondingCloseMarker(boolean reversed)
     {
-        return new RangeTombstoneBoundMarker(closeBound(reversed), endDeletion);
+        return new RangeTombstoneBoundMarker(closeBound(reversed), closeDeletionTime(reversed));
     }
 
     public RangeTombstoneBoundMarker createCorrespondingOpenMarker(boolean reversed)
     {
-        return new RangeTombstoneBoundMarker(openBound(reversed), startDeletion);
+        return new RangeTombstoneBoundMarker(openBound(reversed), openDeletionTime(reversed));
     }
 
     public void digest(MessageDigest digest)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d496dca6/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java b/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
new file mode 100644
index 0000000..1dea7f3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Iterators;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ClusteringPrefix.Kind;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public final class PurgeFunctionTest
+{
+    private static final String KEYSPACE = "PurgeFunctionTest";
+    private static final String TABLE = "table";
+
+    private CFMetaData metadata;
+    private DecoratedKey key;
+
+    private static UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, int gcBefore)
+    {
+        class WithoutPurgeableTombstones extends PurgeFunction
+        {
+            private WithoutPurgeableTombstones()
+            {
+                super(iterator.isForThrift(), FBUtilities.nowInSeconds(), gcBefore, Integer.MAX_VALUE, false, false);
+            }
+
+            protected Predicate<Long> getPurgeEvaluator()
+            {
+                return time -> true;
+            }
+        }
+
+        return Transformation.apply(iterator, new WithoutPurgeableTombstones());
+    }
+
+    @Before
+    public void setUp()
+    {
+        metadata =
+            CFMetaData.Builder
+                      .create(KEYSPACE, TABLE)
+                      .addPartitionKey("pk", UTF8Type.instance)
+                      .addClusteringColumn("ck", UTF8Type.instance)
+                      .build();
+        key = Murmur3Partitioner.instance.decorateKey(bytes("key"));
+    }
+
+    @Test
+    public void testNothingIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testNothingIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+
+        UnfilteredPartitionIterator expected = iter(true
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testEverythingIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+
+        assertTrue(!purged.hasNext());
+    }
+
+    @Test
+    public void testEverythingIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+
+        assertTrue(!purged.hasNext());
+    }
+
+    @Test
+    public void testFirstHalfIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testFirstHalfIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testSecondHalfIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+        , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testSecondHalfIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(true
+        , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+    {
+        Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+
+        UnfilteredRowIterator rowIter =
+            new AbstractUnfilteredRowIterator(metadata,
+                                              key,
+                                              DeletionTime.LIVE,
+                                              metadata.partitionColumns(),
+                                              Rows.EMPTY_STATIC_ROW,
+                                              isReversedOrder,
+                                              EncodingStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return iterator.hasNext() ? iterator.next() : endOfData();
+            }
+        };
+
+        return new SingletonUnfilteredPartitionIterator(rowIter, false);
+    }
+
+    private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind,
+                                            long timestamp,
+                                            int localDeletionTime,
+                                            Object clusteringValue)
+    {
+        ByteBuffer[] clusteringByteBuffers =
+            new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+
+        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
+                                             new DeletionTime(timestamp, localDeletionTime));
+    }
+
+    private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind,
+                                                  long closeTimestamp,
+                                                  int closeLocalDeletionTime,
+                                                  long openTimestamp,
+                                                  int openDeletionTime,
+                                                  Object clusteringValue)
+    {
+        ByteBuffer[] clusteringByteBuffers =
+            new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+
+        return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
+                                                new DeletionTime(closeTimestamp, closeLocalDeletionTime),
+                                                new DeletionTime(openTimestamp, openDeletionTime));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+    {
+        return ((AbstractType<T>) type).decompose(value);
+    }
+
+    private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+    {
+        while (iter1.hasNext())
+        {
+            assertTrue(iter2.hasNext());
+
+            try (UnfilteredRowIterator partition1 = iter1.next())
+            {
+                try (UnfilteredRowIterator partition2 = iter2.next())
+                {
+                    assertIteratorsEqual(partition1, partition2);
+                }
+            }
+        }
+
+        assertTrue(!iter2.hasNext());
+    }
+
+    private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+    {
+        while (iter1.hasNext())
+        {
+            assertTrue(iter2.hasNext());
+
+            assertEquals(iter1.next(), iter2.next());
+        }
+        assertTrue(!iter2.hasNext());
+    }
+}
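
Read as data rather than markers, the three-marker fixture used by most of these tests encodes two
adjacent range tombstones over the single clustering column: one covering [a, b) with deletion
(timestamp 0, local deletion time 0) and one covering [b, c] with deletion (timestamp 1, local
deletion time 1), meeting at the EXCL_END_INCL_START boundary on "b". When only the older half is
purgeable, the expected output is that the boundary collapses into a plain inclusive start bound at
"b" carrying the surviving deletion, which is what the FirstHalfIsPurgeable tests assert. The sketch
below just spells that encoding out; the Range and Deletion types and the FixtureSketch class are
hypothetical, introduced only for illustration.

    // Standalone illustration of what the three-marker fixture encodes (not Cassandra code).
    public final class FixtureSketch
    {
        record Deletion(long timestamp, int localDeletionTime) {}
        record Range(String start, boolean startInclusive, String end, boolean endInclusive, Deletion deletion) {}

        public static void main(String[] args)
        {
            // INCL_START_BOUND at "a", EXCL_END_INCL_START_BOUNDARY at "b", INCL_END_BOUND at "c":
            Range older = new Range("a", true, "b", false, new Deletion(0L, 0)); // covers [a, b)
            Range newer = new Range("b", true, "c", true,  new Deletion(1L, 1)); // covers [b, c]

            System.out.println(older);
            System.out.println(newer);

            // With gcBefore = 1 the older range is purgeable, so the boundary at "b" is expected
            // to collapse into a single inclusive start bound at "b" keeping the newer deletion,
            // the FirstHalfIsPurgeable expectation in both ASC and DESC order.
        }
    }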




[2/6] cassandra git commit: Fix purging semi-expired RT boundaries in reversed iterators

Posted by al...@apache.org.
Fix purging semi-expired RT boundaries in reversed iterators

patch by Aleksey Yeschenko; reviewed by Blake Eggleston for
CASSANDRA-14672


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d496dca6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d496dca6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d496dca6

Branch: refs/heads/cassandra-3.11
Commit: d496dca6729853ece49d68c4837fed35149c95d0
Parents: 45937de
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Fri Sep 21 21:26:13 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Tue Sep 25 17:32:56 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/rows/RangeTombstoneBoundaryMarker.java   |   4 +-
 .../db/partitions/PurgeFunctionTest.java        | 294 +++++++++++++++++++
 3 files changed, 297 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d496dca6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 43628b2..2c2f4f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.18
+ * Fix purging semi-expired RT boundaries in reversed iterators (CASSANDRA-14672)
  * DESC order reads can fail to return the last Unfiltered in the partition (CASSANDRA-14766)
  * Fix corrupted collection deletions for dropped columns in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
  * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d496dca6/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index c4bc2f2..f0f5421 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -136,12 +136,12 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
 
     public RangeTombstoneBoundMarker createCorrespondingCloseMarker(boolean reversed)
     {
-        return new RangeTombstoneBoundMarker(closeBound(reversed), endDeletion);
+        return new RangeTombstoneBoundMarker(closeBound(reversed), closeDeletionTime(reversed));
     }
 
     public RangeTombstoneBoundMarker createCorrespondingOpenMarker(boolean reversed)
     {
-        return new RangeTombstoneBoundMarker(openBound(reversed), startDeletion);
+        return new RangeTombstoneBoundMarker(openBound(reversed), openDeletionTime(reversed));
     }
 
     public void digest(MessageDigest digest)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d496dca6/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java b/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
new file mode 100644
index 0000000..1dea7f3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Iterators;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ClusteringPrefix.Kind;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public final class PurgeFunctionTest
+{
+    private static final String KEYSPACE = "PurgeFunctionTest";
+    private static final String TABLE = "table";
+
+    private CFMetaData metadata;
+    private DecoratedKey key;
+
+    private static UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, int gcBefore)
+    {
+        class WithoutPurgeableTombstones extends PurgeFunction
+        {
+            private WithoutPurgeableTombstones()
+            {
+                super(iterator.isForThrift(), FBUtilities.nowInSeconds(), gcBefore, Integer.MAX_VALUE, false, false);
+            }
+
+            protected Predicate<Long> getPurgeEvaluator()
+            {
+                return time -> true;
+            }
+        }
+
+        return Transformation.apply(iterator, new WithoutPurgeableTombstones());
+    }
+
+    @Before
+    public void setUp()
+    {
+        metadata =
+            CFMetaData.Builder
+                      .create(KEYSPACE, TABLE)
+                      .addPartitionKey("pk", UTF8Type.instance)
+                      .addClusteringColumn("ck", UTF8Type.instance)
+                      .build();
+        key = Murmur3Partitioner.instance.decorateKey(bytes("key"));
+    }
+
+    @Test
+    public void testNothingIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testNothingIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+
+        UnfilteredPartitionIterator expected = iter(true
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testEverythingIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+
+        assertTrue(!purged.hasNext());
+    }
+
+    @Test
+    public void testEverythingIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+
+        assertTrue(!purged.hasNext());
+    }
+
+    @Test
+    public void testFirstHalfIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testFirstHalfIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testSecondHalfIsPurgeableASC()
+    {
+        UnfilteredPartitionIterator original = iter(false
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+        , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(false
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    @Test
+    public void testSecondHalfIsPurgeableDESC()
+    {
+        UnfilteredPartitionIterator original = iter(true
+        , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+        , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        );
+        UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+
+        UnfilteredPartitionIterator expected = iter(true
+        , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+        , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+        );
+        assertIteratorsEqual(expected, purged);
+    }
+
+    private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+    {
+        Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+
+        UnfilteredRowIterator rowIter =
+            new AbstractUnfilteredRowIterator(metadata,
+                                              key,
+                                              DeletionTime.LIVE,
+                                              metadata.partitionColumns(),
+                                              Rows.EMPTY_STATIC_ROW,
+                                              isReversedOrder,
+                                              EncodingStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return iterator.hasNext() ? iterator.next() : endOfData();
+            }
+        };
+
+        return new SingletonUnfilteredPartitionIterator(rowIter, false);
+    }
+
+    private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind,
+                                            long timestamp,
+                                            int localDeletionTime,
+                                            Object clusteringValue)
+    {
+        ByteBuffer[] clusteringByteBuffers =
+            new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+
+        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
+                                             new DeletionTime(timestamp, localDeletionTime));
+    }
+
+    private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind,
+                                                  long closeTimestamp,
+                                                  int closeLocalDeletionTime,
+                                                  long openTimestamp,
+                                                  int openDeletionTime,
+                                                  Object clusteringValue)
+    {
+        ByteBuffer[] clusteringByteBuffers =
+            new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+
+        return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
+                                                new DeletionTime(closeTimestamp, closeLocalDeletionTime),
+                                                new DeletionTime(openTimestamp, openDeletionTime));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+    {
+        return ((AbstractType<T>) type).decompose(value);
+    }
+
+    private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+    {
+        while (iter1.hasNext())
+        {
+            assertTrue(iter2.hasNext());
+
+            try (UnfilteredRowIterator partition1 = iter1.next())
+            {
+                try (UnfilteredRowIterator partition2 = iter2.next())
+                {
+                    assertIteratorsEqual(partition1, partition2);
+                }
+            }
+        }
+
+        assertTrue(!iter2.hasNext());
+    }
+
+    private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+    {
+        while (iter1.hasNext())
+        {
+            assertTrue(iter2.hasNext());
+
+            assertEquals(iter1.next(), iter2.next());
+        }
+        assertTrue(!iter2.hasNext());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c34a0f52
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c34a0f52
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c34a0f52

Branch: refs/heads/trunk
Commit: c34a0f5207d228a2b78f6295cd4ab3e0755e56c2
Parents: 4d3f5a3 d496dca
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Tue Sep 25 17:41:10 2018 +0100
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Tue Sep 25 17:41:10 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/rows/RangeTombstoneBoundaryMarker.java   |   4 +-
 .../db/partitions/PurgeFunctionTest.java        | 297 +++++++++++++++++++
 3 files changed, 300 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c34a0f52/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a4fa705,2c2f4f5..20cec87
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,5 -1,5 +1,6 @@@
 -3.0.18
 +3.11.4
 +Merged from 3.0:
+  * Fix purging semi-expired RT boundaries in reversed iterators (CASSANDRA-14672)
   * DESC order reads can fail to return the last Unfiltered in the partition (CASSANDRA-14766)
   * Fix corrupted collection deletions for dropped columns in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
   * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c34a0f52/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c34a0f52/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
index 0000000,1dea7f3..7f85aea
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
+++ b/test/unit/org/apache/cassandra/db/partitions/PurgeFunctionTest.java
@@@ -1,0 -1,294 +1,297 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.partitions;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.Iterator;
+ import java.util.function.Predicate;
+ 
+ import com.google.common.collect.Iterators;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.CFMetaData;
++import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ClusteringPrefix.Kind;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.db.transform.Transformation;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ public final class PurgeFunctionTest
+ {
+     private static final String KEYSPACE = "PurgeFunctionTest";
+     private static final String TABLE = "table";
+ 
+     private CFMetaData metadata;
+     private DecoratedKey key;
+ 
+     private static UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, int gcBefore)
+     {
+         class WithoutPurgeableTombstones extends PurgeFunction
+         {
+             private WithoutPurgeableTombstones()
+             {
+                 super(iterator.isForThrift(), FBUtilities.nowInSeconds(), gcBefore, Integer.MAX_VALUE, false, false);
+             }
+ 
+             protected Predicate<Long> getPurgeEvaluator()
+             {
+                 return time -> true;
+             }
+         }
+ 
+         return Transformation.apply(iterator, new WithoutPurgeableTombstones());
+     }
+ 
+     @Before
+     public void setUp()
+     {
++        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
++
+         metadata =
+             CFMetaData.Builder
+                       .create(KEYSPACE, TABLE)
+                       .addPartitionKey("pk", UTF8Type.instance)
+                       .addClusteringColumn("ck", UTF8Type.instance)
+                       .build();
+         key = Murmur3Partitioner.instance.decorateKey(bytes("key"));
+     }
+ 
+     @Test
+     public void testNothingIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testNothingIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 0);
+ 
+         UnfilteredPartitionIterator expected = iter(true
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testEverythingIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+ 
+         assertTrue(!purged.hasNext());
+     }
+ 
+     @Test
+     public void testEverythingIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 2);
+ 
+         assertTrue(!purged.hasNext());
+     }
+ 
+     @Test
+     public void testFirstHalfIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testFirstHalfIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 0L, 0, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 0L, 0, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_END_BOUND, 1L, 1, "c")
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "b")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testSecondHalfIsPurgeableASC()
+     {
+         UnfilteredPartitionIterator original = iter(false
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+         , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(false
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     @Test
+     public void testSecondHalfIsPurgeableDESC()
+     {
+         UnfilteredPartitionIterator original = iter(true
+         , bound(Kind.INCL_END_BOUND, 0L, 0, "c")
+         , boundary(Kind.EXCL_END_INCL_START_BOUNDARY, 1L, 1, 0L, 0, "b")
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         );
+         UnfilteredPartitionIterator purged = withoutPurgeableTombstones(original, 1);
+ 
+         UnfilteredPartitionIterator expected = iter(true
+         , bound(Kind.EXCL_END_BOUND, 1L, 1, "b")
+         , bound(Kind.INCL_START_BOUND, 1L, 1, "a")
+         );
+         assertIteratorsEqual(expected, purged);
+     }
+ 
+     private UnfilteredPartitionIterator iter(boolean isReversedOrder, Unfiltered... unfiltereds)
+     {
+         Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
+ 
+         UnfilteredRowIterator rowIter =
+             new AbstractUnfilteredRowIterator(metadata,
+                                               key,
+                                               DeletionTime.LIVE,
+                                               metadata.partitionColumns(),
+                                               Rows.EMPTY_STATIC_ROW,
+                                               isReversedOrder,
+                                               EncodingStats.NO_STATS)
+         {
+             protected Unfiltered computeNext()
+             {
+                 return iterator.hasNext() ? iterator.next() : endOfData();
+             }
+         };
+ 
+         return new SingletonUnfilteredPartitionIterator(rowIter, false);
+     }
+ 
+     private RangeTombstoneBoundMarker bound(ClusteringPrefix.Kind kind,
+                                             long timestamp,
+                                             int localDeletionTime,
+                                             Object clusteringValue)
+     {
+         ByteBuffer[] clusteringByteBuffers =
+             new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+ 
 -        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
++        return new RangeTombstoneBoundMarker(ClusteringBound.create(kind, clusteringByteBuffers),
+                                              new DeletionTime(timestamp, localDeletionTime));
+     }
+ 
+     private RangeTombstoneBoundaryMarker boundary(ClusteringPrefix.Kind kind,
+                                                   long closeTimestamp,
+                                                   int closeLocalDeletionTime,
+                                                   long openTimestamp,
+                                                   int openDeletionTime,
+                                                   Object clusteringValue)
+     {
+         ByteBuffer[] clusteringByteBuffers =
+             new ByteBuffer[] { decompose(metadata.clusteringColumns().get(0).type, clusteringValue) };
+ 
 -        return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, clusteringByteBuffers),
++        return new RangeTombstoneBoundaryMarker(ClusteringBoundary.create(kind, clusteringByteBuffers),
+                                                 new DeletionTime(closeTimestamp, closeLocalDeletionTime),
+                                                 new DeletionTime(openTimestamp, openDeletionTime));
+     }
+ 
+     @SuppressWarnings("unchecked")
+     private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+     {
+         return ((AbstractType<T>) type).decompose(value);
+     }
+ 
+     private void assertIteratorsEqual(UnfilteredPartitionIterator iter1, UnfilteredPartitionIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             try (UnfilteredRowIterator partition1 = iter1.next())
+             {
+                 try (UnfilteredRowIterator partition2 = iter2.next())
+                 {
+                     assertIteratorsEqual(partition1, partition2);
+                 }
+             }
+         }
+ 
+         assertTrue(!iter2.hasNext());
+     }
+ 
+     private void assertIteratorsEqual(UnfilteredRowIterator iter1, UnfilteredRowIterator iter2)
+     {
+         while (iter1.hasNext())
+         {
+             assertTrue(iter2.hasNext());
+ 
+             assertEquals(iter1.next(), iter2.next());
+         }
+         assertTrue(!iter2.hasNext());
+     }
+ }

