You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2017/03/23 16:19:47 UTC
[2/3] cassandra git commit: Bugs handling range tombstones in the
sstable iterators
Bugs handling range tombstones in the sstable iterators
patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13340
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a85eeefe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a85eeefe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a85eeefe
Branch: refs/heads/trunk
Commit: a85eeefe88eb036a9cd9fa85a1c8c31c2bfad78a
Parents: 3e95c5b
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Mar 16 17:05:15 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Mar 23 17:17:16 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ClusteringPrefix.java | 2 +-
.../cassandra/db/UnfilteredDeserializer.java | 1 -
.../db/columniterator/SSTableIterator.java | 11 +-
.../columniterator/SSTableReversedIterator.java | 126 +++++++++++++++----
.../cql3/validation/operations/DeleteTest.java | 70 +++++++++++
6 files changed, 181 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c58fad8..728e3e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,7 @@
* NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
* Address message coalescing regression (CASSANDRA-12676)
Merged from 3.0:
+ * Bugs handling range tombstones in the sstable iterators (CASSANDRA-13340)
* Fix CONTAINS filtering for null collections (CASSANDRA-13246)
* Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
* Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 340e237..1ecc92d 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -482,7 +482,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
}
if (bound.size() == nextSize)
- return nextKind.compareTo(bound.kind());
+ return Kind.compare(nextKind, bound.kind());
// We know that we'll have exited already if nextSize < bound.size
return -bound.kind().comparedToClustering;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 79b8636..b977907 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -690,6 +690,5 @@ public abstract class UnfilteredDeserializer
}
}
}
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index b3c2e94..e21bd72 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -138,7 +138,14 @@ public class SSTableIterator extends AbstractSSTableIterator
{
assert deserializer != null;
- if (!deserializer.hasNext() || deserializer.compareNextTo(end) > 0)
+ // We use the same reasoning as in handlePreSliceData regarding the strictness of the inequality below.
+ // We want to exclude deserialized unfiltered equal to end, because 1) we won't miss any rows since those
+ // wouldn't be equal to a slice bound and 2) an end bound can be equal to a start bound
+ // (EXCL_END(x) == INCL_START(x) for instance) and in that case we don't want to return start bound because
+ // it's fundamentally excluded. And if the bound is an end (for a range tombstone), it means it's exactly
+ // our slice end, but in that case we will properly close the range tombstone anyway as part of our "close
+ // an open marker" code in hasNextInternal
+ if (!deserializer.hasNext() || deserializer.compareNextTo(end) >= 0)
return null;
Unfiltered next = deserializer.readNext();
@@ -281,7 +288,7 @@ public class SSTableIterator extends AbstractSSTableIterator
if (indexState.isDone()
|| indexState.currentBlockIdx() > lastBlockIdx
|| !deserializer.hasNext()
- || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) > 0))
+ || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) >= 0))
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index c74b5db..c4bcd9e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.btree.BTree;
/**
@@ -81,6 +82,11 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
protected ReusablePartitionData buffer;
protected Iterator<Unfiltered> iterator;
+ // Set in loadFromDisk() and used in setIterator to handle range tombstones extending over multiple index blocks. See
+ // loadFromDisk for details. Note that those are always false for non-indexed readers.
+ protected boolean skipFirstIteratedItem;
+ protected boolean skipLastIteratedItem;
+
private ReverseReader(FileDataInput file, boolean shouldCloseFile)
{
super(file, shouldCloseFile);
@@ -123,8 +129,8 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
buffer = createBuffer(1);
// Note that we can reuse that buffer between slices (we could alternatively re-read from disk
// every time, but that feels more wasteful) so we want to include everything from the beginning.
- // We can stop at the last slice end however since any following slice will be before that.
- loadFromDisk(null, slice.end(), true);
+ // We can stop at the slice end however since any following slice will be before that.
+ loadFromDisk(null, slice.end(), true, false, false);
}
setIterator(slice);
}
@@ -133,6 +139,15 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
{
assert buffer != null;
iterator = buffer.built.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+
+ if (!iterator.hasNext())
+ return;
+
+ if (skipFirstIteratedItem)
+ iterator.next();
+
+ if (skipLastIteratedItem)
+ iterator = new SkipLastIterator(iterator);
}
protected boolean hasNextInternal() throws IOException
@@ -158,9 +173,18 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
// Reads the unfiltered from disk and load them into the reader buffer. It stops reading when either the partition
// is fully read, or when stopReadingDisk() returns true.
- protected void loadFromDisk(ClusteringBound start, ClusteringBound end, boolean includeFirst) throws IOException
+ protected void loadFromDisk(ClusteringBound start,
+ ClusteringBound end,
+ boolean includeFirst,
+ boolean hasPreviousBlock,
+ boolean hasNextBlock) throws IOException
{
+ // start != null means it's the block covering the beginning of the slice, so it has to be the last block for this slice.
+ assert start == null || !hasNextBlock;
+
buffer.reset();
+ skipFirstIteratedItem = false;
+ skipLastIteratedItem = false;
boolean isFirst = true;
@@ -177,16 +201,30 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
}
}
- // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
+ // If we have an open marker, it's either one from what we just skipped or it's one that opened in the next (or
+ // one of the next) index block (if openMarker == openMarkerAtStartOfBlock).
if (openMarker != null)
{
+ // We have to feed a marker to the buffer, because that marker is likely to be closed later and ImmutableBTreePartition
+ // doesn't take kindly to markers that come without their counterpart. If that's the last block we're gonna read (for
+ // the current slice at least) it's easy because we'll want to return that open marker at the end of the data in this
+ // block anyway, so we have nothing more to do than adding it to the buffer.
+ // If it's not the last block however, in which case we know we'll have start == null, it means this marker is really
+ // open in a next block and so while we do need to add it to the buffer for the reason mentioned above, we don't
+ // want to "return" it just yet, we'll wait until we reach it in the next blocks. That's why we trigger
+ // skipLastIteratedItem in that case (this is the first item of the block, but we're iterating in reverse order
+ // so it will be last returned by the iterator).
ClusteringBound markerStart = start == null ? ClusteringBound.BOTTOM : start;
buffer.add(new RangeTombstoneBoundMarker(markerStart, openMarker));
+ if (hasNextBlock)
+ skipLastIteratedItem = true;
}
// Now deserialize everything until we reach our requested end (if we have one)
+ // See SSTableIterator.ForwardRead.computeNext() for why this is a strict inequality below: this is the same
+ // reasoning here.
while (deserializer.hasNext()
- && (end == null || deserializer.compareNextTo(end) <= 0)
+ && (end == null || deserializer.compareNextTo(end) < 0)
&& !stopReadingDisk())
{
Unfiltered unfiltered = deserializer.readNext();
@@ -202,9 +240,18 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
// If we have an open marker, we should close it before finishing
if (openMarker != null)
{
- // If we have no end and still an openMarker, this means we're indexed and the marker is closed in a following block.
+ // This is the reverse of the problem at the start of the block. Namely, if it's the first block
+ // we deserialize for the slice (the one covering the slice end basically), then it's easy, we just want
+ // to add the close marker to the buffer and return it normally.
+ // If it's not our first block (for the slice) however, it means that marker closed in a previously read
+ // block and we have already returned it. So while we should still add it to the buffer for the sake of
+ // not breaking ImmutableBTreePartition, we should skip it when returning from the iterator, hence the
+ // skipFirstIteratedItem (this is the last item of the block, but we're iterating in reverse order so it will
+ // be the first returned by the iterator).
ClusteringBound markerEnd = end == null ? ClusteringBound.TOP : end;
buffer.add(new RangeTombstoneBoundMarker(markerEnd, getAndClearOpenMarker()));
+ if (hasPreviousBlock)
+ skipFirstIteratedItem = true;
}
buffer.build();
@@ -267,13 +314,13 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
if (startIdx >= indexState.blocksCount())
startIdx = indexState.blocksCount() - 1;
- if (startIdx != indexState.currentBlockIdx())
- {
- indexState.setToBlock(startIdx);
- readCurrentBlock(true);
- }
+ // Note that even if we were already set on the proper block (which would happen if the previous slice
+ // requested ended on the same block this one starts), we can't reuse it because when reading the previous
+ // slice we've only read that block from the previous slice start. Re-reading also handles
+ // skipFirstIteratedItem/skipLastIteratedItem that we would need to handle otherwise.
+ indexState.setToBlock(startIdx);
- setIterator(slice);
+ readCurrentBlock(false, startIdx != lastBlockIdx);
}
@Override
@@ -282,15 +329,14 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
if (super.hasNextInternal())
return true;
- // We have nothing more for our current block, move the previous one.
- int previousBlockIdx = indexState.currentBlockIdx() - 1;
- if (previousBlockIdx < 0 || previousBlockIdx < lastBlockIdx)
+ // We have nothing more for our current block, move to the next one (so the one before it on disk).
+ int nextBlockIdx = indexState.currentBlockIdx() - 1;
+ if (nextBlockIdx < 0 || nextBlockIdx < lastBlockIdx)
return false;
// The slice start can be in
- indexState.setToBlock(previousBlockIdx);
- readCurrentBlock(false);
- setIterator(slice);
+ indexState.setToBlock(nextBlockIdx);
+ readCurrentBlock(true, nextBlockIdx != lastBlockIdx);
// since that new block is within the bounds we've computed in setToSlice(), we know there will
// always be something matching the slice unless we're on the lastBlockIdx (in which case there
// may or may not be results, but if there isn't, we're done for the slice).
@@ -300,33 +346,42 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
/**
* Reads the current block, the last one we've set.
*
- * @param canIncludeSliceEnd whether the block can include the slice end.
+ * @param hasPreviousBlock is whether we have already read a previous block for the current slice.
+ * @param hasNextBlock is whether we have more blocks to read for the current slice.
*/
- private void readCurrentBlock(boolean canIncludeSliceEnd) throws IOException
+ private void readCurrentBlock(boolean hasPreviousBlock, boolean hasNextBlock) throws IOException
{
if (buffer == null)
buffer = createBuffer(indexState.blocksCount());
int currentBlock = indexState.currentBlockIdx();
- boolean canIncludeSliceStart = currentBlock == lastBlockIdx;
+ // The slice start (resp. slice end) is only meaningful on the last (resp. first) block read (since again,
+ // we read blocks in reverse order).
+ boolean canIncludeSliceStart = !hasNextBlock;
+ boolean canIncludeSliceEnd = !hasPreviousBlock;
// When dealing with old format sstable, we have the problem that a row can span 2 index block, i.e. it can
// start at the end of a block and end at the beginning of the next one. That's not a problem per se for
// UnfilteredDeserializer.OldFormatSerializer, since it always read rows entirely, even if they span index
// blocks, but as we reading index block in reverse we must be careful to not read the end of the row at
// beginning of a block before we're reading the beginning of that row. So what we do is that if we detect
- // that the row starting this block is also the row ending the previous one, we skip that first result and
- // let it be read when we'll read the previous block.
+ // that the row starting this block is also the row ending the next one we've read (previous on disk), then
+ // we'll skip that first result and let it be read with the next block.
boolean includeFirst = true;
if (!sstable.descriptor.version.storeRows() && currentBlock > 0)
{
- ClusteringPrefix lastOfPrevious = indexState.index(currentBlock - 1).lastName;
+ ClusteringPrefix lastOfNext = indexState.index(currentBlock - 1).lastName;
ClusteringPrefix firstOfCurrent = indexState.index(currentBlock).firstName;
- includeFirst = metadata().comparator.compare(lastOfPrevious, firstOfCurrent) != 0;
+ includeFirst = metadata().comparator.compare(lastOfNext, firstOfCurrent) != 0;
}
- loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null, includeFirst);
+ loadFromDisk(canIncludeSliceStart ? slice.start() : null,
+ canIncludeSliceEnd ? slice.end() : null,
+ includeFirst,
+ hasPreviousBlock,
+ hasNextBlock);
+ setIterator(slice);
}
@Override
@@ -382,4 +437,23 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
deletionBuilder = null;
}
}
+
+ private static class SkipLastIterator extends AbstractIterator<Unfiltered>
+ {
+ private final Iterator<Unfiltered> iterator;
+
+ private SkipLastIterator(Iterator<Unfiltered> iterator)
+ {
+ this.iterator = iterator;
+ }
+
+ protected Unfiltered computeNext()
+ {
+ if (!iterator.hasNext())
+ return endOfData();
+
+ Unfiltered next = iterator.next();
+ return iterator.hasNext() ? next : endOfData();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
index 4694ffc..6edca38 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
@@ -1345,6 +1345,76 @@ public class DeleteTest extends CQLTester
assertTrue("The memtable should be empty but is not", isMemtableEmpty());
}
+ @Test
+ public void testQueryingOnRangeTombstoneBoundForward() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k text, i int, PRIMARY KEY (k, i))");
+
+ execute("INSERT INTO %s (k, i) VALUES (?, ?)", "a", 0);
+
+ execute("DELETE FROM %s WHERE k = ? AND i > ? AND i <= ?", "a", 0, 1);
+ execute("DELETE FROM %s WHERE k = ? AND i > ?", "a", 1);
+
+ flush();
+
+ assertEmpty(execute("SELECT i FROM %s WHERE k = ? AND i = ?", "a", 1));
+ }
+
+ @Test
+ public void testQueryingOnRangeTombstoneBoundReverse() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k text, i int, PRIMARY KEY (k, i))");
+
+ execute("INSERT INTO %s (k, i) VALUES (?, ?)", "a", 0);
+
+ execute("DELETE FROM %s WHERE k = ? AND i > ? AND i <= ?", "a", 0, 1);
+ execute("DELETE FROM %s WHERE k = ? AND i > ?", "a", 1);
+
+ flush();
+
+ assertRows(execute("SELECT i FROM %s WHERE k = ? AND i <= ? ORDER BY i DESC", "a", 1), row(0));
+ }
+
+ @Test
+ public void testReverseQueryWithRangeTombstoneOnMultipleBlocks() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k text, i int, v text, PRIMARY KEY (k, i))");
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 1200; i++)
+ sb.append('a');
+ String longText = sb.toString();
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 3", "a", i*2, longText);
+
+ execute("DELETE FROM %s USING TIMESTAMP 1 WHERE k = ? AND i >= ? AND i <= ?", "a", 12, 16);
+
+ flush();
+
+ execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 0", "a", 3, longText);
+ execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 3", "a", 11, longText);
+ execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 0", "a", 15, longText);
+ execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 0", "a", 17, longText);
+
+ flush();
+
+ assertRows(execute("SELECT i FROM %s WHERE k = ? ORDER BY i DESC", "a"),
+ row(18),
+ row(17),
+ row(16),
+ row(14),
+ row(12),
+ row(11),
+ row(10),
+ row(8),
+ row(6),
+ row(4),
+ row(3),
+ row(2),
+ row(0));
+ }
+
/**
* Test for CASSANDRA-13305
*/