You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/01/14 11:55:40 UTC
[3/4] cassandra git commit: Fix potentially returning deleted row
with range tombstones (2.1 version)
Fix potentially returning deleted row with range tombstones (2.1 version)
patch by slebresne; reviewed by thobbs for CASSANDRA-8558
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c9c47d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c9c47d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c9c47d2
Branch: refs/heads/trunk
Commit: 1c9c47d26f2da98d1a923254858bcde550122445
Parents: 19c54a5
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 14 11:40:57 2015 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 14 11:40:57 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/SelectStatement.java | 6 +--
.../apache/cassandra/db/AtomDeserializer.java | 34 +++++++++++----
.../db/columniterator/IndexedSliceReader.java | 46 ++++++++++++++++++--
.../db/columniterator/SSTableNamesIterator.java | 25 ++++++++---
.../cassandra/db/composites/CellNameType.java | 2 +-
.../cassandra/cql3/RangeDeletionTest.java | 35 +++++++++++++++
7 files changed, 129 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e070eaf..175a78a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -52,6 +52,7 @@
* Log failed host when preparing incremental repair (CASSANDRA-8228)
* Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
Merged from 2.0:
+ * Fix potentially returning deleted rows with range tombstones (CASSANDRA-8558)
* Check for available disk space before starting a compaction (CASSANDRA-8562)
* Fix DISTINCT queries with LIMITs or paging when some partitions
contain only tombstones (CASSANDRA-8490)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 92a9579..4ef554d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -844,7 +844,7 @@ public class SelectStatement implements CQLStatement
// For composites, if there was preceding component and we're computing the end, we must change the last component
// End-Of-Component, otherwise we would be selecting only one record.
Composite prefix = builder.build();
- return Collections.singletonList(!prefix.isEmpty() && eocBound == Bound.END ? prefix.end() : prefix);
+ return Collections.singletonList(!prefix.isEmpty() && eocBound == Bound.END ? prefix.end() : prefix.start());
}
if (r.isSlice())
{
@@ -869,7 +869,7 @@ public class SelectStatement implements CQLStatement
throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
Composite prefix = builder.buildWith(val);
// See below for why this
- s.add((eocBound == Bound.END && builder.remainingCount() > 0) ? prefix.end() : prefix);
+ s.add((eocBound == Bound.END && builder.remainingCount() > 0) ? prefix.end() : prefix.start());
}
return new ArrayList<>(s);
}
@@ -887,7 +887,7 @@ public class SelectStatement implements CQLStatement
// case using the eoc would be bad, since for the random partitioner we have no guarantee that
// prefix.end() will sort after prefix (see #5240).
Composite prefix = builder.build();
- return Collections.singletonList(eocBound == Bound.END && builder.remainingCount() > 0 ? prefix.end() : prefix);
+ return Collections.singletonList(eocBound == Bound.END && builder.remainingCount() > 0 ? prefix.end() : prefix.start());
}
private static Composite.EOC eocForRelation(Operator op)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/AtomDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomDeserializer.java b/src/java/org/apache/cassandra/db/AtomDeserializer.java
index 799ed0e..a103647 100644
--- a/src/java/org/apache/cassandra/db/AtomDeserializer.java
+++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java
@@ -42,6 +42,10 @@ public class AtomDeserializer
private final int expireBefore;
private final Descriptor.Version version;
+ // The flags byte for the next atom (which corresponds to the "masks" in ColumnSerializer) if it has been
+ // read already, Integer.MIN_VALUE otherwise.
+ private int nextFlags = Integer.MIN_VALUE;
+
public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
{
this.type = type;
@@ -82,17 +86,30 @@ public class AtomDeserializer
}
/**
+ * Returns whether the next atom is a range tombstone or not.
+ *
+ * Please note that this should only be called after compareNextTo() has been called, and at most once per atom since it consumes the flags byte.
+ */
+ public boolean nextIsRangeTombstone() throws IOException
+ {
+ nextFlags = in.readUnsignedByte();
+ return (nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0;
+ }
+
+ /**
* Returns the next atom.
*/
public OnDiskAtom readNext() throws IOException
{
Composite name = nameDeserializer.readNext();
assert !name.isEmpty(); // This would imply hasNext() hasn't been called
- int b = in.readUnsignedByte();
- if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
- return type.rangeTombstoneSerializer().deserializeBody(in, name, version);
- else
- return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore);
+
+ nextFlags = nextFlags == Integer.MIN_VALUE ? in.readUnsignedByte() : nextFlags;
+ OnDiskAtom atom = (nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0
+ ? type.rangeTombstoneSerializer().deserializeBody(in, name, version)
+ : type.columnSerializer().deserializeColumnBody(in, (CellName)name, nextFlags, flag, expireBefore);
+ nextFlags = Integer.MIN_VALUE;
+ return atom;
}
/**
@@ -101,10 +118,11 @@ public class AtomDeserializer
public void skipNext() throws IOException
{
nameDeserializer.skipNext();
- int b = in.readUnsignedByte();
- if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
+ nextFlags = nextFlags == Integer.MIN_VALUE ? in.readUnsignedByte() : nextFlags;
+ if ((nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
type.rangeTombstoneSerializer().skipBody(in, version);
else
- type.columnSerializer().skipColumnBody(in, b);
+ type.columnSerializer().skipColumnBody(in, nextFlags);
+ nextFlags = Integer.MIN_VALUE;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index 7012321..924e9bc 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -290,8 +290,13 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
// If we read blocks in reversed disk order, we may have columns from the previous block to handle.
// Note that prefetched keeps columns in reversed disk order.
+ // Also note that Range Tombstone handling is a bit tricky, because we may run into range tombstones
+ // that cover a slice *after* we've moved to the previous slice. To keep it simple, we simply include
+ // every RT in prefetched: it's only slightly inefficient to do so and there are only so many RTs that
+ // can be mistakenly added this way.
if (reversed && !prefetched.isEmpty())
{
+ // Whether we've found anything to return in prefetched
boolean gotSome = false;
// Avoids some comparison when we know it's not useful
boolean inSlice = false;
@@ -303,8 +308,22 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
if (isColumnBeforeSliceStart(prefetchedCol))
{
inSlice = false;
- if (!setNextSlice())
- return false;
+
+ // As explained above, we add RTs unconditionally
+ if (prefetchedCol instanceof RangeTombstone)
+ {
+ blockColumns.addLast(prefetched.poll());
+ gotSome = true;
+ continue;
+ }
+
+ // Otherwise, we either move to the next slice or, if we have none (which can happen
+ // because we unwind prefetched no matter what due to RTs), we skip the cell.
+ if (hasMoreSlice())
+ setNextSlice();
+ else
+ prefetched.poll();
+
}
// col is within slice, all columns
// (we go in reverse, so as soon as we are in a slice, no need to check
@@ -374,6 +393,16 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
Composite start = currentStart();
if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
{
+ // If it's a range tombstone, then we need to read it and include it unless its end
+ // stops before our slice start.
+ if (deserializer.nextIsRangeTombstone())
+ {
+ RangeTombstone rt = (RangeTombstone)deserializer.readNext();
+ if (comparator.compare(rt.max, start) >= 0)
+ addColumn(rt);
+ continue;
+ }
+
if (reversed)
{
// the next slice select columns that are before the current one, so it may
@@ -451,7 +480,18 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
Composite start = currentStart();
if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
{
- deserializer.skipNext();
+ // If it's a range tombstone, then we need to read it and include it unless its end
+ // stops before our slice start. Otherwise, we can skip it.
+ if (deserializer.nextIsRangeTombstone())
+ {
+ RangeTombstone rt = (RangeTombstone)deserializer.readNext();
+ if (comparator.compare(rt.max, start) >= 0)
+ addColumn(rt);
+ }
+ else
+ {
+ deserializer.skipNext();
+ }
continue;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index 224b63f..221f499 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -214,16 +214,31 @@ public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implement
while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null)
{
int cmp = deserializer.compareNextTo(nextToFetch);
- if (cmp == 0)
+ if (cmp < 0)
+ {
+ // If it's a rangeTombstone, then we need to read it and include
+ // it if it includes our target. Otherwise, we can skip it.
+ if (deserializer.nextIsRangeTombstone())
+ {
+ RangeTombstone rt = (RangeTombstone)deserializer.readNext();
+ if (comparator.compare(rt.max, nextToFetch) >= 0)
+ result.add(rt);
+ }
+ else
+ {
+ deserializer.skipNext();
+ }
+ }
+ else if (cmp == 0)
{
nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
result.add(deserializer.readNext());
- continue;
}
-
- deserializer.skipNext();
- if (cmp > 0)
+ else
+ {
+ // Don't consume the atom: it sorts after nextToFetch and may match (or, if an RT, cover) a later name
nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/composites/CellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/CellNameType.java b/src/java/org/apache/cassandra/db/composites/CellNameType.java
index 1e87296..7b4fd36 100644
--- a/src/java/org/apache/cassandra/db/composites/CellNameType.java
+++ b/src/java/org/apache/cassandra/db/composites/CellNameType.java
@@ -197,7 +197,7 @@ public interface CellNameType extends CType
public boolean hasUnprocessed() throws IOException;
/**
- * Comparare the next name to read to the provided Composite.
+ * Compare the next name to read to the provided Composite.
* This does not consume the next name.
*/
public int compareNextTo(Composite composite) throws IOException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java b/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java
new file mode 100644
index 0000000..b31d0c2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import org.junit.Test;
+
+public class RangeDeletionTest extends CQLTester
+{
+ @Test
+ public void testCassandra8558() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b, c))");
+
+ execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 1, 1);
+ flush();
+ execute("DELETE FROM %s WHERE a=? AND b=?", 1, 1);
+ flush();
+ assertEmpty(execute("SELECT * FROM %s WHERE a=? AND b=? AND c=?", 1, 1, 1));
+ }
+}