You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2017/02/23 14:22:05 UTC
[2/5] cassandra git commit: Legacy deserializer can create unexpected
boundary range tombstones
Legacy deserializer can create unexpected boundary range tombstones
patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13237
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ab717484
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ab717484
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ab717484
Branch: refs/heads/cassandra-3.11
Commit: ab7174849599c62f4bef3cb719c644bae13e9321
Parents: 42977db
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Feb 23 14:32:03 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Feb 23 14:32:34 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/UnfilteredDeserializer.java | 343 ++++++++++---------
.../cassandra/db/rows/RangeTombstoneMarker.java | 2 +-
.../apache/cassandra/service/DataResolver.java | 31 +-
.../cassandra/db/OldFormatDeserializerTest.java | 110 ++++++
.../cassandra/service/DataResolverTest.java | 129 ++++++-
6 files changed, 436 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e978a5c..386029e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.12
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
* Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
* Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
Merged from 2.2
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index a2d51e13..42a806a 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -20,7 +20,9 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.io.IOError;
import java.util.*;
+import java.util.function.Supplier;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.PeekingIterator;
@@ -265,11 +267,23 @@ public abstract class UnfilteredDeserializer
boolean readAllAsDynamic)
{
super(metadata, in, helper);
- this.iterator = new UnfilteredIterator(partitionDeletion);
+ this.iterator = new UnfilteredIterator(metadata, partitionDeletion, helper, this::readAtom);
this.readAllAsDynamic = readAllAsDynamic;
this.lastConsumedPosition = currentPosition();
}
+ private LegacyLayout.LegacyAtom readAtom()
+ {
+ try
+ {
+ return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
public void setSkipStatic()
{
this.skipStatic = true;
@@ -317,15 +331,6 @@ public abstract class UnfilteredDeserializer
}
}
- private boolean isRow(LegacyLayout.LegacyAtom atom)
- {
- if (atom.isCell())
- return true;
-
- LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
- return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
- }
-
public int compareNextTo(Slice.Bound bound) throws IOException
{
if (!hasNext())
@@ -389,19 +394,36 @@ public abstract class UnfilteredDeserializer
// Groups atoms from the input into proper Unfiltered.
// Note: this could use guava AbstractIterator except that we want to be able to clear
// the internal state of the iterator so it's cleaner to do it ourselves.
- private class UnfilteredIterator implements PeekingIterator<Unfiltered>
+ @VisibleForTesting
+ static class UnfilteredIterator implements PeekingIterator<Unfiltered>
{
private final AtomIterator atoms;
private final LegacyLayout.CellGrouper grouper;
private final TombstoneTracker tombstoneTracker;
+ private final CFMetaData metadata;
+ private final SerializationHelper helper;
private Unfiltered next;
- private UnfilteredIterator(DeletionTime partitionDeletion)
+ UnfilteredIterator(CFMetaData metadata,
+ DeletionTime partitionDeletion,
+ SerializationHelper helper,
+ Supplier<LegacyLayout.LegacyAtom> atomReader)
{
+ this.metadata = metadata;
+ this.helper = helper;
this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
this.tombstoneTracker = new TombstoneTracker(partitionDeletion);
- this.atoms = new AtomIterator(tombstoneTracker);
+ this.atoms = new AtomIterator(atomReader);
+ }
+
+ private boolean isRow(LegacyLayout.LegacyAtom atom)
+ {
+ if (atom.isCell())
+ return true;
+
+ LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
+ return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
}
public boolean hasNext()
@@ -478,195 +500,200 @@ public abstract class UnfilteredDeserializer
{
throw new UnsupportedOperationException();
}
- }
-
- // Wraps the input of the deserializer to provide an iterator (and skip shadowed atoms).
- // Note: this could use guava AbstractIterator except that we want to be able to clear
- // the internal state of the iterator so it's cleaner to do it ourselves.
- private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom>
- {
- private final TombstoneTracker tombstoneTracker;
- private boolean isDone;
- private LegacyLayout.LegacyAtom next;
- private AtomIterator(TombstoneTracker tombstoneTracker)
+ // Wraps the input of the deserializer to provide an iterator (and skip shadowed atoms).
+ // Note: this could use guava AbstractIterator except that we want to be able to clear
+ // the internal state of the iterator so it's cleaner to do it ourselves.
+ private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom>
{
- this.tombstoneTracker = tombstoneTracker;
- }
+ private final Supplier<LegacyLayout.LegacyAtom> atomReader;
+ private boolean isDone;
+ private LegacyLayout.LegacyAtom next;
- public boolean hasNext()
- {
- if (isDone)
- return false;
+ private AtomIterator(Supplier<LegacyLayout.LegacyAtom> atomReader)
+ {
+ this.atomReader = atomReader;
+ }
- if (next == null)
+ public boolean hasNext()
{
- next = readAtom();
+ if (isDone)
+ return false;
+
if (next == null)
{
- isDone = true;
- return false;
+ next = atomReader.get();
+ if (next == null)
+ {
+ isDone = true;
+ return false;
+ }
}
+ return true;
}
- return true;
- }
- private LegacyLayout.LegacyAtom readAtom()
- {
- try
+ public LegacyLayout.LegacyAtom next()
{
- return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
+ if (!hasNext())
+ throw new UnsupportedOperationException();
+ LegacyLayout.LegacyAtom toReturn = next;
+ next = null;
+ return toReturn;
}
- catch (IOException e)
+
+ public LegacyLayout.LegacyAtom peek()
{
- throw new IOError(e);
+ if (!hasNext())
+ throw new UnsupportedOperationException();
+ return next;
}
- }
- public LegacyLayout.LegacyAtom next()
- {
- if (!hasNext())
- throw new UnsupportedOperationException();
- LegacyLayout.LegacyAtom toReturn = next;
- next = null;
- return toReturn;
- }
+ public void clearState()
+ {
+ this.next = null;
+ this.isDone = false;
+ }
- public LegacyLayout.LegacyAtom peek()
- {
- if (!hasNext())
+ public void remove()
+ {
throw new UnsupportedOperationException();
- return next;
+ }
}
- public void clearState()
+ /**
+ * Tracks which range tombstones are open when deserializing the old format.
+ */
+ private class TombstoneTracker
{
- this.next = null;
- this.isDone = false;
- }
+ private final DeletionTime partitionDeletion;
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
+ // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close).
+ // As we only track non-fully-shadowed ranges, the first range is necessarily the currently
+ // open tombstone (the one with the higher timestamp).
+ private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones;
- /**
- * Tracks which range tombstones are open when deserializing the old format.
- */
- private class TombstoneTracker
- {
- private final DeletionTime partitionDeletion;
+ public TombstoneTracker(DeletionTime partitionDeletion)
+ {
+ this.partitionDeletion = partitionDeletion;
+ this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound));
+ }
- // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close).
- // As we only track non-fully-shadowed ranges, the first range is necessarily the currently
- // open tombstone (the one with the higher timestamp).
- private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones;
+ /**
+ * Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion).
+ */
+ public boolean isShadowed(LegacyLayout.LegacyAtom atom)
+ {
+ assert !hasClosingMarkerBefore(atom);
+ long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt();
- public TombstoneTracker(DeletionTime partitionDeletion)
- {
- this.partitionDeletion = partitionDeletion;
- this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound));
- }
+ if (partitionDeletion.deletes(timestamp))
+ return true;
- /**
- * Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion).
- */
- public boolean isShadowed(LegacyLayout.LegacyAtom atom)
- {
- assert !hasClosingMarkerBefore(atom);
- long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt();
+ SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone());
+ return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp));
+ }
- if (partitionDeletion.deletes(timestamp))
- return true;
+ /**
+ * Whether the currently open marker closes stricly before the provided row/RT.
+ */
+ public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom)
+ {
+ return !openTombstones.isEmpty()
+ && metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0;
+ }
- SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone());
- return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp));
- }
+ /**
+ * Returns the unfiltered corresponding to closing the currently open marker (and update the tracker accordingly).
+ */
+ public Unfiltered popClosingMarker()
+ {
+ assert !openTombstones.isEmpty();
- /**
- * Whether the currently open marker closes stricly before the provided row/RT.
- */
- public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom)
- {
- return !openTombstones.isEmpty()
- && metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0;
- }
+ Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
+ LegacyLayout.LegacyRangeTombstone first = iter.next();
+ iter.remove();
- /**
- * Returns the unfiltered corresponding to closing the currently open marker (and update the tracker accordingly).
- */
- public Unfiltered popClosingMarker()
- {
- assert !openTombstones.isEmpty();
+ // If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the
+ // next tombstone
+ if (!iter.hasNext())
+ return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime);
- Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
- LegacyLayout.LegacyRangeTombstone first = iter.next();
- iter.remove();
+ LegacyLayout.LegacyRangeTombstone next = iter.next();
+ return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime);
+ }
- // If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the
- // next tombstone
- if (!iter.hasNext())
- return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime);
+ /**
+ * Update the tracker given the provided newly open tombstone. This return the Unfiltered corresponding to the opening
+ * of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one)
+ * or even null (if the new tombstone start is supersedes by the currently open tombstone).
+ *
+ * Note that this method assume the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also
+ * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)).
+ */
+ public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone)
+ {
+ if (openTombstones.isEmpty())
+ {
+ openTombstones.add(tombstone);
+ return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime);
+ }
- LegacyLayout.LegacyRangeTombstone next = iter.next();
- return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime);
- }
+ // Add the new tombstone, and then check if it changes the currently open deletion or not.
+ // Note: we grab the first tombstone (which represents the currently open deletion time) before adding
+ // because add() can remove that first.
+ Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
+ LegacyLayout.LegacyRangeTombstone first = iter.next();
- /**
- * Update the tracker given the provided newly open tombstone. This return the Unfiltered corresponding to the opening
- * of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one)
- * or even null (if the new tombston start is supersedes by the currently open tombstone).
- *
- * Note that this method assume the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also
- * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)).
- */
- public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone)
- {
- if (openTombstones.isEmpty())
- {
- openTombstones.add(tombstone);
- return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime);
+ add(tombstone);
+
+ // If the newly opened tombstone superseds the currently open one, we have to produce a boundary to change
+ // the currently open deletion time, otherwise we have nothing to do.
+ return tombstone.deletionTime.supersedes(first.deletionTime)
+ ? RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime)
+ : null;
}
- Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
- LegacyLayout.LegacyRangeTombstone first = iter.next();
- if (tombstone.deletionTime.supersedes(first.deletionTime))
+ /**
+ * Adds a new tombstone to openTombstones, removing anything that would be shadowed by this new tombstone.
+ */
+ private void add(LegacyLayout.LegacyRangeTombstone tombstone)
{
- // We're supperseding the currently open tombstone, so we should produce a boundary that close the currently open
- // one and open the new one. We should also add the tombstone, but if it stop after the first one, we should
- // also remove that first tombstone as it won't be useful anymore.
- if (metadata.comparator.compare(tombstone.stop.bound, first.stop.bound) >= 0)
- iter.remove();
+ // First, remove existing tombstone that is shadowed by this tombstone.
+ Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
+ while (iter.hasNext())
+ {
+ LegacyLayout.LegacyRangeTombstone existing = iter.next();
+ // openTombstones is ordered by stop bound and the new tombstone can't be shadowing anything that
+ // stop after it.
+ if (metadata.comparator.compare(tombstone.stop.bound, existing.stop.bound) < 0)
+ break;
+
+ // Note that we remove an existing tombstone even if it is equal to the new one because in that case,
+ // either the existing strictly stops before the new one and we don't want it, or it stops exactly
+ // like the new one but we're going to inconditionally add the new one anyway.
+ if (!existing.deletionTime.supersedes(tombstone.deletionTime))
+ iter.remove();
+ }
openTombstones.add(tombstone);
- return RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime);
}
- else
+
+ public boolean hasOpenTombstones()
{
- // If the new tombstone don't supersedes the currently open tombstone, we don't have anything to return, we
- // just add the new tombstone (because we know tombstone is not fully shadowed, this imply the new tombstone
- // simply extend after the first one and we'll deal with it later)
- assert metadata.comparator.compare(tombstone.start.bound, first.stop.bound) <= 0;
- openTombstones.add(tombstone);
- return null;
+ return !openTombstones.isEmpty();
}
- }
- public boolean hasOpenTombstones()
- {
- return !openTombstones.isEmpty();
- }
-
- private boolean formBoundary(LegacyLayout.LegacyRangeTombstone close, LegacyLayout.LegacyRangeTombstone open)
- {
- return metadata.comparator.compare(close.stop.bound, open.start.bound) == 0;
- }
+ private boolean formBoundary(LegacyLayout.LegacyRangeTombstone close, LegacyLayout.LegacyRangeTombstone open)
+ {
+ return metadata.comparator.compare(close.stop.bound, open.start.bound) == 0;
+ }
- public void clearState()
- {
- openTombstones.clear();
+ public void clearState()
+ {
+ openTombstones.clear();
+ }
}
}
+
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
index 5771a86..1cd5fb4 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
/**
* A marker for a range tombstone bound.
* <p>
- * There is 2 types of markers: bounds (see {@link RangeTombstoneBound}) and boundaries (see {@link RangeTombstoneBoundary}).
+ * There is 2 types of markers: bounds (see {@link RangeTombstoneBoundMarker}) and boundaries (see {@link RangeTombstoneBoundaryMarker}).
*/
public interface RangeTombstoneMarker extends Unfiltered
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 01953e1..60cfbba 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -297,26 +297,33 @@ public class DataResolver extends ResponseResolver
// active after that point. Further whatever deletion was open or is open by this marker on the
// source, that deletion cannot supersedes the current one.
//
- // What we want to know here is if the source deletion and merged deletion was or will be equal,
- // because in that case we don't want to include any repair for the source, and otherwise we do.
+ // But while the marker deletion (before and/or after this point) cannot supersed the current
+ // deletion, we want to know if it's equal to it (both before and after), because in that case
+ // the source is up to date and we don't want to include repair.
//
- // Note further that if the marker is a boundary, as both side of that boundary will have a
- // different deletion time, only one side might be equal to the merged deletion. This means we
- // can only be in one of 2 cases:
- // 1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null), and then
- // it won't be from that point on.
+ // So in practice we have 2 possible case:
+ // 1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null). Then
+ // it won't be from that point on unless it's a boundary and the new opened deletion time
+ // is also equal to the current deletion (note that this implies the boundary has the same
+ // closing and opening deletion time, which should generally not happen, but can due to legacy
+ // reading code not avoiding this for a while, see CASSANDRA-13237).
// 2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and
// it may now be (if it isn't we just have nothing to do for that marker).
- assert !currentDeletion.isLive();
+ assert !currentDeletion.isLive() : currentDeletion.toString();
if (markerToRepair[i] == null)
{
// Since there is an ongoing merged deletion, the only way we don't have an open repair for
// this source is that it had a range open with the same deletion as current and it's
- // closing it. This imply we need to open a deletion for the source from that point.
- assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed));
- assert !marker.isOpen(isReversed) || currentDeletion.supersedes(marker.openDeletionTime(isReversed));
- markerToRepair[i] = marker.closeBound(isReversed).invert();
+ // closing it.
+ assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
+ : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
+
+ // and so unless it's a boundary whose opening deletion time is still equal to the current
+ // deletion (see comment above for why this can actually happen), we have to repair the source
+ // from that point on.
+ if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))))
+ markerToRepair[i] = marker.closeBound(isReversed).invert();
}
// In case 2) above, we only have something to do if the source is up-to-date after that point
else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java b/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java
new file mode 100644
index 0000000..1060569
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.UnfilteredDeserializer.OldFormatDeserializer.UnfilteredIterator;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.*;
+
+public class OldFormatDeserializerTest
+{
+ @Test
+ public void testRangeTombstones() throws Exception
+ {
+ CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
+ .withPartitioner(Murmur3Partitioner.instance)
+ .addPartitionKey("k", Int32Type.instance)
+ .addClusteringColumn("v", Int32Type.instance)
+ .build();
+
+ Supplier<LegacyLayout.LegacyAtom> atomSupplier = supplier(rt(0, 10, 42),
+ rt(5, 15, 42));
+
+ UnfilteredIterator iterator = new UnfilteredIterator(metadata,
+ DeletionTime.LIVE,
+ new SerializationHelper(metadata, MessagingService.current_version, SerializationHelper.Flag.LOCAL),
+ atomSupplier);
+
+ // As the deletion time are the same, we want this to produce a single range tombstone covering from 0 to 15.
+
+ assertTrue(iterator.hasNext());
+
+ Unfiltered first = iterator.next();
+ assertTrue(first.isRangeTombstoneMarker());
+ RangeTombstoneMarker start = (RangeTombstoneMarker)first;
+ assertTrue(start.isOpen(false));
+ assertFalse(start.isClose(false));
+ assertEquals(0, toInt(start.openBound(false)));
+ assertEquals(42, start.openDeletionTime(false).markedForDeleteAt());
+
+ Unfiltered second = iterator.next();
+ assertTrue(second.isRangeTombstoneMarker());
+ RangeTombstoneMarker end = (RangeTombstoneMarker)second;
+ assertTrue(end.isClose(false));
+ assertFalse(end.isOpen(false));
+ assertEquals(15, toInt(end.closeBound(false)));
+ assertEquals(42, end.closeDeletionTime(false).markedForDeleteAt());
+
+ assertFalse(iterator.hasNext());
+ }
+
+ private static int toInt(ClusteringPrefix prefix)
+ {
+ assertTrue(prefix.size() == 1);
+ return ByteBufferUtil.toInt(prefix.get(0));
+ }
+
+ private static Supplier<LegacyLayout.LegacyAtom> supplier(LegacyLayout.LegacyAtom... atoms)
+ {
+ return new Supplier<LegacyLayout.LegacyAtom>()
+ {
+ int i = 0;
+
+ public LegacyLayout.LegacyAtom get()
+ {
+ return i >= atoms.length ? null : atoms[i++];
+ }
+ };
+ }
+
+ private static LegacyLayout.LegacyAtom rt(int start, int end, int deletion)
+ {
+ return new LegacyLayout.LegacyRangeTombstone(bound(start, true), bound(end, false), new DeletionTime(deletion, FBUtilities.nowInSeconds()));
+ }
+
+ private static LegacyLayout.LegacyBound bound(int b, boolean isStart)
+ {
+ return new LegacyLayout.LegacyBound(isStart ? Slice.Bound.inclusiveStartOf(ByteBufferUtil.bytes(b)) : Slice.Bound.inclusiveEndOf(ByteBufferUtil.bytes(b)),
+ false,
+ null);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index fd1e54e..2f72093 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -554,6 +554,73 @@ public class DataResolverTest
assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
}
+ /**
+ * Test cases where a boundary of a source is covered by another source deletion and timestamp on one or both side
+ * of the boundary are equal to the "merged" deletion.
+ * This is a test for CASSANDRA-13237 to make sure we handle this case properly.
+ */
+ @Test
+ public void testRepairRangeTombstoneBoundary() throws UnknownHostException
+ {
+ testRepairRangeTombstoneBoundary(1, 0, 1);
+ messageRecorder.sent.clear();
+ testRepairRangeTombstoneBoundary(1, 1, 0);
+ messageRecorder.sent.clear();
+ testRepairRangeTombstoneBoundary(1, 1, 1);
+ }
+
+ /**
+ * Test for CASSANDRA-13237, checking we don't fail (and handle correctly) the case where a RT boundary has the
+ * same deletion on both side (while is useless but could be created by legacy code pre-CASSANDRA-13237 and could
+ * thus still be sent).
+ */
+ public void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
+ {
+ DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+ InetAddress peer1 = peer();
+ InetAddress peer2 = peer();
+
+ // 1st "stream"
+ RangeTombstone one_nine = tombstone("0", true , "9", true, timestamp1, nowInSec);
+ UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+ .addRangeTombstone(one_nine)
+ .buildUpdate());
+
+ // 2nd "stream" (build more manually to ensure we have the boundary we want)
+ RangeTombstoneBoundMarker open_one = marker("0", true, true, timestamp2, nowInSec);
+ RangeTombstoneBoundaryMarker boundary_five = boundary("5", false, timestamp2, nowInSec, timestamp3, nowInSec);
+ RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec);
+ UnfilteredPartitionIterator iter2 = iter(dk, open_one, boundary_five, close_nine);
+
+ resolver.preprocess(readResponseMessage(peer1, iter1));
+ resolver.preprocess(readResponseMessage(peer2, iter2));
+
+ boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3;
+
+ // No results, we've only reconciled tombstones.
+ try (PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ assertRepairFuture(resolver, shouldHaveRepair ? 1 : 0);
+ }
+
+ assertEquals(shouldHaveRepair? 1 : 0, messageRecorder.sent.size());
+
+ if (!shouldHaveRepair)
+ return;
+
+ MessageOut msg = getSentMessage(peer2);
+ assertRepairMetadata(msg);
+ assertRepairContainsNoColumns(msg);
+
+ RangeTombstone expected = timestamp1 != timestamp2
+ // We've repaired the 1st part
+ ? tombstone("0", true, "5", false, timestamp1, nowInSec)
+ // We've repaired the 2nd part
+ : tombstone("5", true, "9", true, timestamp1, nowInSec);
+ assertRepairContainsDeletions(msg, null, expected);
+ }
+
// Forces the start to be exclusive if the condition holds
private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
{
@@ -873,18 +940,40 @@ public class DataResolverTest
private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
{
- RangeTombstone.Bound.Kind startKind = inclusiveStart
- ? Kind.INCL_START_BOUND
- : Kind.EXCL_START_BOUND;
- RangeTombstone.Bound.Kind endKind = inclusiveEnd
- ? Kind.INCL_END_BOUND
- : Kind.EXCL_END_BOUND;
-
- RangeTombstone.Bound startBound = new RangeTombstone.Bound(startKind, cfm.comparator.make(start).getRawValues());
- RangeTombstone.Bound endBound = new RangeTombstone.Bound(endKind, cfm.comparator.make(end).getRawValues());
+ RangeTombstone.Bound startBound = rtBound(start, true, inclusiveStart);
+ RangeTombstone.Bound endBound = rtBound(end, false, inclusiveEnd);
return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
}
+ private RangeTombstone.Bound rtBound(Object value, boolean isStart, boolean inclusive)
+ {
+ RangeTombstone.Bound.Kind kind = isStart
+ ? (inclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND)
+ : (inclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND);
+
+ return new RangeTombstone.Bound(kind, cfm.comparator.make(value).getRawValues());
+ }
+
+ private RangeTombstone.Bound rtBoundary(Object value, boolean inclusiveOnEnd)
+ {
+ RangeTombstone.Bound.Kind kind = inclusiveOnEnd
+ ? Kind.INCL_END_EXCL_START_BOUNDARY
+ : Kind.EXCL_END_INCL_START_BOUNDARY;
+ return new RangeTombstone.Bound(kind, cfm.comparator.make(value).getRawValues());
+ }
+
+ private RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime)
+ {
+ return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime));
+ }
+
+ private RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2)
+ {
+ return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd),
+ new DeletionTime(markedForDeleteAt1, localDeletionTime1),
+ new DeletionTime(markedForDeleteAt2, localDeletionTime2));
+ }
+
private UnfilteredPartitionIterator fullPartitionDelete(CFMetaData cfm, DecoratedKey dk, long timestamp, int nowInSec)
{
return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(cfm, dk, timestamp, nowInSec).unfilteredIterator(), false);
@@ -909,4 +998,26 @@ public class DataResolverTest
{
return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator(), false);
}
+
+ private UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds)
+ {
+ SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator);
+ Collections.addAll(s, unfiltereds);
+ final Iterator<Unfiltered> iterator = s.iterator();
+
+ UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm,
+ key,
+ DeletionTime.LIVE,
+ cfm.partitionColumns(),
+ Rows.EMPTY_STATIC_ROW,
+ false,
+ EncodingStats.NO_STATS)
+ {
+ protected Unfiltered computeNext()
+ {
+ return iterator.hasNext() ? iterator.next() : endOfData();
+ }
+ };
+ return new SingletonUnfilteredPartitionIterator(rowIter, false);
+ }
}