Posted to commits@cassandra.apache.org by sl...@apache.org on 2017/02/23 14:22:05 UTC

[2/5] cassandra git commit: Legacy deserializer can create unexpected boundary range tombstones

Legacy deserializer can create unexpected boundary range tombstones

patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13237


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ab717484
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ab717484
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ab717484

Branch: refs/heads/cassandra-3.11
Commit: ab7174849599c62f4bef3cb719c644bae13e9321
Parents: 42977db
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Feb 23 14:32:03 2017 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Feb 23 14:32:34 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/UnfilteredDeserializer.java    | 343 ++++++++++---------
 .../cassandra/db/rows/RangeTombstoneMarker.java |   2 +-
 .../apache/cassandra/service/DataResolver.java  |  31 +-
 .../cassandra/db/OldFormatDeserializerTest.java | 110 ++++++
 .../cassandra/service/DataResolverTest.java     | 129 ++++++-
 6 files changed, 436 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e978a5c..386029e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.12
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
  * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
  * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 Merged from 2.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index a2d51e13..42a806a 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -20,7 +20,9 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.io.IOError;
 import java.util.*;
+import java.util.function.Supplier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.PeekingIterator;
 
@@ -265,11 +267,23 @@ public abstract class UnfilteredDeserializer
                                       boolean readAllAsDynamic)
         {
             super(metadata, in, helper);
-            this.iterator = new UnfilteredIterator(partitionDeletion);
+            this.iterator = new UnfilteredIterator(metadata, partitionDeletion, helper, this::readAtom);
             this.readAllAsDynamic = readAllAsDynamic;
             this.lastConsumedPosition = currentPosition();
         }
 
+        private LegacyLayout.LegacyAtom readAtom()
+        {
+            try
+            {
+                return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
         public void setSkipStatic()
         {
             this.skipStatic = true;
@@ -317,15 +331,6 @@ public abstract class UnfilteredDeserializer
             }
         }
 
-        private boolean isRow(LegacyLayout.LegacyAtom atom)
-        {
-            if (atom.isCell())
-                return true;
-
-            LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
-            return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
-        }
-
         public int compareNextTo(Slice.Bound bound) throws IOException
         {
             if (!hasNext())
@@ -389,19 +394,36 @@ public abstract class UnfilteredDeserializer
         // Groups atoms from the input into proper Unfiltered.
         // Note: this could use guava AbstractIterator except that we want to be able to clear
         // the internal state of the iterator so it's cleaner to do it ourselves.
-        private class UnfilteredIterator implements PeekingIterator<Unfiltered>
+        @VisibleForTesting
+        static class UnfilteredIterator implements PeekingIterator<Unfiltered>
         {
             private final AtomIterator atoms;
             private final LegacyLayout.CellGrouper grouper;
             private final TombstoneTracker tombstoneTracker;
+            private final CFMetaData metadata;
+            private final SerializationHelper helper;
 
             private Unfiltered next;
 
-            private UnfilteredIterator(DeletionTime partitionDeletion)
+            UnfilteredIterator(CFMetaData metadata,
+                               DeletionTime partitionDeletion,
+                               SerializationHelper helper,
+                               Supplier<LegacyLayout.LegacyAtom> atomReader)
             {
+                this.metadata = metadata;
+                this.helper = helper;
                 this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
                 this.tombstoneTracker = new TombstoneTracker(partitionDeletion);
-                this.atoms = new AtomIterator(tombstoneTracker);
+                this.atoms = new AtomIterator(atomReader);
+            }
+
+            private boolean isRow(LegacyLayout.LegacyAtom atom)
+            {
+                if (atom.isCell())
+                    return true;
+
+                LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
+                return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
             }
 
             public boolean hasNext()
@@ -478,195 +500,200 @@ public abstract class UnfilteredDeserializer
             {
                 throw new UnsupportedOperationException();
             }
-        }
-
-        // Wraps the input of the deserializer to provide an iterator (and skip shadowed atoms).
-        // Note: this could use guava AbstractIterator except that we want to be able to clear
-        // the internal state of the iterator so it's cleaner to do it ourselves.
-        private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom>
-        {
-            private final TombstoneTracker tombstoneTracker;
-            private boolean isDone;
-            private LegacyLayout.LegacyAtom next;
 
-            private AtomIterator(TombstoneTracker tombstoneTracker)
+            // Wraps the input of the deserializer to provide an iterator (and skip shadowed atoms).
+            // Note: this could use guava AbstractIterator except that we want to be able to clear
+            // the internal state of the iterator so it's cleaner to do it ourselves.
+            private class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom>
             {
-                this.tombstoneTracker = tombstoneTracker;
-            }
+                private final Supplier<LegacyLayout.LegacyAtom> atomReader;
+                private boolean isDone;
+                private LegacyLayout.LegacyAtom next;
 
-            public boolean hasNext()
-            {
-                if (isDone)
-                    return false;
+                private AtomIterator(Supplier<LegacyLayout.LegacyAtom> atomReader)
+                {
+                    this.atomReader = atomReader;
+                }
 
-                if (next == null)
+                public boolean hasNext()
                 {
-                    next = readAtom();
+                    if (isDone)
+                        return false;
+
                     if (next == null)
                     {
-                        isDone = true;
-                        return false;
+                        next = atomReader.get();
+                        if (next == null)
+                        {
+                            isDone = true;
+                            return false;
+                        }
                     }
+                    return true;
                 }
-                return true;
-            }
 
-            private LegacyLayout.LegacyAtom readAtom()
-            {
-                try
+                public LegacyLayout.LegacyAtom next()
                 {
-                    return LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
+                    if (!hasNext())
+                        throw new UnsupportedOperationException();
+                    LegacyLayout.LegacyAtom toReturn = next;
+                    next = null;
+                    return toReturn;
                 }
-                catch (IOException e)
+
+                public LegacyLayout.LegacyAtom peek()
                 {
-                    throw new IOError(e);
+                    if (!hasNext())
+                        throw new UnsupportedOperationException();
+                    return next;
                 }
-            }
 
-            public LegacyLayout.LegacyAtom next()
-            {
-                if (!hasNext())
-                    throw new UnsupportedOperationException();
-                LegacyLayout.LegacyAtom toReturn = next;
-                next = null;
-                return toReturn;
-            }
+                public void clearState()
+                {
+                    this.next = null;
+                    this.isDone = false;
+                }
 
-            public LegacyLayout.LegacyAtom peek()
-            {
-                if (!hasNext())
+                public void remove()
+                {
                     throw new UnsupportedOperationException();
-                return next;
+                }
             }
 
-            public void clearState()
+            /**
+             * Tracks which range tombstones are open when deserializing the old format.
+             */
+            private class TombstoneTracker
             {
-                this.next = null;
-                this.isDone = false;
-            }
+                private final DeletionTime partitionDeletion;
 
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-        }
+                // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close).
+                // As we only track non-fully-shadowed ranges, the first range is necessarily the currently
+                // open tombstone (the one with the higher timestamp).
+                private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones;
 
-        /**
-         * Tracks which range tombstones are open when deserializing the old format.
-         */
-        private class TombstoneTracker
-        {
-            private final DeletionTime partitionDeletion;
+                public TombstoneTracker(DeletionTime partitionDeletion)
+                {
+                    this.partitionDeletion = partitionDeletion;
+                    this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound));
+                }
 
-            // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close).
-            // As we only track non-fully-shadowed ranges, the first range is necessarily the currently
-            // open tombstone (the one with the higher timestamp).
-            private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones;
+                /**
+                 * Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion).
+                 */
+                public boolean isShadowed(LegacyLayout.LegacyAtom atom)
+                {
+                    assert !hasClosingMarkerBefore(atom);
+                    long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt();
 
-            public TombstoneTracker(DeletionTime partitionDeletion)
-            {
-                this.partitionDeletion = partitionDeletion;
-                this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound));
-            }
+                    if (partitionDeletion.deletes(timestamp))
+                        return true;
 
-            /**
-             * Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion).
-             */
-            public boolean isShadowed(LegacyLayout.LegacyAtom atom)
-            {
-                assert !hasClosingMarkerBefore(atom);
-                long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt();
+                    SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone());
+                    return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp));
+                }
 
-                if (partitionDeletion.deletes(timestamp))
-                    return true;
+                /**
+                 * Whether the currently open marker closes strictly before the provided row/RT.
+                 */
+                public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom)
+                {
+                    return !openTombstones.isEmpty()
+                           && metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0;
+                }
 
-                SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone());
-                return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp));
-            }
+                /**
+                 * Returns the unfiltered corresponding to closing the currently open marker (and updates the tracker accordingly).
+                 */
+                public Unfiltered popClosingMarker()
+                {
+                    assert !openTombstones.isEmpty();
 
-            /**
-             * Whether the currently open marker closes stricly before the provided row/RT.
-             */
-            public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom)
-            {
-                return !openTombstones.isEmpty()
-                    && metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0;
-            }
+                    Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
+                    LegacyLayout.LegacyRangeTombstone first = iter.next();
+                    iter.remove();
 
-            /**
-             * Returns the unfiltered corresponding to closing the currently open marker (and update the tracker accordingly).
-             */
-            public Unfiltered popClosingMarker()
-            {
-                assert !openTombstones.isEmpty();
+                    // If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the
+                    // next tombstone
+                    if (!iter.hasNext())
+                        return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime);
 
-                Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
-                LegacyLayout.LegacyRangeTombstone first = iter.next();
-                iter.remove();
+                    LegacyLayout.LegacyRangeTombstone next = iter.next();
+                    return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime);
+                }
 
-                // If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the
-                // next tombstone
-                if (!iter.hasNext())
-                    return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime);
+                /**
+                 * Updates the tracker given the provided newly opened tombstone. This returns the Unfiltered corresponding to the opening
+                 * of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one)
+                 * or even null (if the new tombstone start is superseded by the currently open tombstone).
+                 *
+                 * Note that this method assumes the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also
+                 * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)).
+                 */
+                public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone)
+                {
+                    if (openTombstones.isEmpty())
+                    {
+                        openTombstones.add(tombstone);
+                        return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime);
+                    }
 
-                LegacyLayout.LegacyRangeTombstone next = iter.next();
-                return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime);
-            }
+                    // Add the new tombstone, and then check if it changes the currently open deletion or not.
+                    // Note: we grab the first tombstone (which represents the currently open deletion time) before adding
+                    // because add() may remove that first tombstone.
+                    Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
+                    LegacyLayout.LegacyRangeTombstone first = iter.next();
 
-            /**
-             * Update the tracker given the provided newly open tombstone. This return the Unfiltered corresponding to the opening
-             * of said tombstone: this can be a simple open mark, a boundary (if there was an open tombstone superseded by this new one)
-             * or even null (if the new tombston start is supersedes by the currently open tombstone).
-             *
-             * Note that this method assume the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also
-             * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)).
-             */
-            public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone)
-            {
-                if (openTombstones.isEmpty())
-                {
-                    openTombstones.add(tombstone);
-                    return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime);
+                    add(tombstone);
+
+                    // If the newly opened tombstone supersedes the currently open one, we have to produce a boundary to change
+                    // the currently open deletion time, otherwise we have nothing to do.
+                    return tombstone.deletionTime.supersedes(first.deletionTime)
+                           ? RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime)
+                           : null;
                 }
 
-                Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
-                LegacyLayout.LegacyRangeTombstone first = iter.next();
-                if (tombstone.deletionTime.supersedes(first.deletionTime))
+                /**
+                 * Adds a new tombstone to openTombstones, removing anything that would be shadowed by this new tombstone.
+                 */
+                private void add(LegacyLayout.LegacyRangeTombstone tombstone)
                 {
-                    // We're supperseding the currently open tombstone, so we should produce a boundary that close the currently open
-                    // one and open the new one. We should also add the tombstone, but if it stop after the first one, we should
-                    // also remove that first tombstone as it won't be useful anymore.
-                    if (metadata.comparator.compare(tombstone.stop.bound, first.stop.bound) >= 0)
-                        iter.remove();
+                    // First, remove any existing tombstones that are shadowed by this tombstone.
+                    Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
+                    while (iter.hasNext())
+                    {
 
+                        LegacyLayout.LegacyRangeTombstone existing = iter.next();
+                        // openTombstones is ordered by stop bound and the new tombstone can't be shadowing anything that
+                        // stops after it.
+                        if (metadata.comparator.compare(tombstone.stop.bound, existing.stop.bound) < 0)
+                            break;
+
+                        // Note that we remove an existing tombstone even if it is equal to the new one because in that case,
+                        // either the existing strictly stops before the new one and we don't want it, or it stops exactly
+                        // like the new one but we're going to unconditionally add the new one anyway.
+                        if (!existing.deletionTime.supersedes(tombstone.deletionTime))
+                            iter.remove();
+                    }
                     openTombstones.add(tombstone);
-                    return RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime);
                 }
-                else
+
+                public boolean hasOpenTombstones()
                 {
-                    // If the new tombstone don't supersedes the currently open tombstone, we don't have anything to return, we
-                    // just add the new tombstone (because we know tombstone is not fully shadowed, this imply the new tombstone
-                    // simply extend after the first one and we'll deal with it later)
-                    assert metadata.comparator.compare(tombstone.start.bound, first.stop.bound) <= 0;
-                    openTombstones.add(tombstone);
-                    return null;
+                    return !openTombstones.isEmpty();
                 }
-            }
 
-            public boolean hasOpenTombstones()
-            {
-                return !openTombstones.isEmpty();
-            }
-
-            private boolean formBoundary(LegacyLayout.LegacyRangeTombstone close, LegacyLayout.LegacyRangeTombstone open)
-            {
-                return metadata.comparator.compare(close.stop.bound, open.start.bound) == 0;
-            }
+                private boolean formBoundary(LegacyLayout.LegacyRangeTombstone close, LegacyLayout.LegacyRangeTombstone open)
+                {
+                    return metadata.comparator.compare(close.stop.bound, open.start.bound) == 0;
+                }
 
-            public void clearState()
-            {
-                openTombstones.clear();
+                public void clearState()
+                {
+                    openTombstones.clear();
+                }
             }
         }
+
     }
 }
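
For illustration only: the heart of the change above is the reworked TombstoneTracker.add(), which drops any already-open legacy range tombstone that a newly opened one shadows, so two overlapping ranges with equal deletion times no longer force a boundary marker between them. The following standalone sketch models just that logic with simplified stand-in types (plain int bounds and a long timestamp instead of LegacyLayout bounds and DeletionTime); it is not Cassandra code, only the idea, and mirrors the scenario exercised by the new OldFormatDeserializerTest (two tombstones [0,10] and [5,15] with the same deletion time).

    import java.util.Iterator;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class TombstoneTrackerSketch
    {
        // Simplified legacy range tombstone: clustering range [start, stop] deleted at 'timestamp'.
        static final class RT
        {
            final int start, stop;
            final long timestamp;
            RT(int start, int stop, long timestamp) { this.start = start; this.stop = stop; this.timestamp = timestamp; }
            @Override public String toString() { return "[" + start + ", " + stop + "]@" + timestamp; }
        }

        // Open tombstones ordered by their stop bound, as in the patched tracker.
        static final SortedSet<RT> openTombstones = new TreeSet<>((a, b) -> Integer.compare(a.stop, b.stop));

        // Mirrors the new add(): remove anything the incoming tombstone shadows, then add it.
        static void add(RT tombstone)
        {
            Iterator<RT> iter = openTombstones.iterator();
            while (iter.hasNext())
            {
                RT existing = iter.next();
                if (tombstone.stop < existing.stop)
                    break;                        // nothing stopping after 'tombstone' can be shadowed
                if (existing.timestamp <= tombstone.timestamp)
                    iter.remove();                // 'existing' does not supersede the new one, so drop it
            }
            openTombstones.add(tombstone);
        }

        public static void main(String[] args)
        {
            add(new RT(0, 10, 42));  // an open marker at 0 would have been emitted at this point
            add(new RT(5, 15, 42));  // equal deletion time: [0, 10] is dropped and no boundary is produced
            // Prints [[5, 15]@42]: a single plain close marker at 15 remains to be emitted.
            System.out.println(openTombstones);
        }
    }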

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
index 5771a86..1cd5fb4 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
 /**
  * A marker for a range tombstone bound.
  * <p>
- * There is 2 types of markers: bounds (see {@link RangeTombstoneBound}) and boundaries (see {@link RangeTombstoneBoundary}).
+ * There are 2 types of markers: bounds (see {@link RangeTombstoneBoundMarker}) and boundaries (see {@link RangeTombstoneBoundaryMarker}).
  */
 public interface RangeTombstoneMarker extends Unfiltered
 {
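
As background for the javadoc fix above (and for the rest of this patch): a bound marker opens or closes a single range deletion, while a boundary marker sits at one clustering position and closes one deletion while opening another in the same step. A minimal model of the distinction, using made-up stand-in types rather than the real RangeTombstoneBoundMarker/RangeTombstoneBoundaryMarker classes:

    // Hypothetical, simplified model of the two marker kinds.
    final class Deletion
    {
        final long markedForDeleteAt;
        Deletion(long markedForDeleteAt) { this.markedForDeleteAt = markedForDeleteAt; }
    }

    // A bound marker: either opens or closes exactly one range deletion.
    final class BoundModel
    {
        final int clustering;      // position of the bound
        final boolean isStart;     // true = opens the deletion, false = closes it
        final Deletion deletion;
        BoundModel(int clustering, boolean isStart, Deletion deletion)
        { this.clustering = clustering; this.isStart = isStart; this.deletion = deletion; }
    }

    // A boundary marker: at one position, closes one deletion and opens another.
    // CASSANDRA-13237 is about the legacy read path emitting these even when
    // 'closed' and 'opened' are equal, where a single continued deletion is expected.
    final class BoundaryModel
    {
        final int clustering;
        final Deletion closed;     // deletion in effect before this position
        final Deletion opened;     // deletion in effect after this position
        BoundaryModel(int clustering, Deletion closed, Deletion opened)
        { this.clustering = clustering; this.closed = closed; this.opened = opened; }
    }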

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 01953e1..60cfbba 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -297,26 +297,33 @@ public class DataResolver extends ResponseResolver
                         // active after that point. Further whatever deletion was open or is open by this marker on the
                         // source, that deletion cannot supersedes the current one.
                         //
-                        // What we want to know here is if the source deletion and merged deletion was or will be equal,
-                        // because in that case we don't want to include any repair for the source, and otherwise we do.
+                        // But while the marker deletion (before and/or after this point) cannot supersede the current
+                        // deletion, we want to know if it's equal to it (both before and after), because in that case
+                        // the source is up to date and we don't want to include repair.
                         //
-                        // Note further that if the marker is a boundary, as both side of that boundary will have a
-                        // different deletion time, only one side might be equal to the merged deletion. This means we
-                        // can only be in one of 2 cases:
-                        //   1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null), and then
-                        //      it won't be from that point on.
+                        // So in practice we have 2 possible cases:
+                        //  1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null). Then
+                        //     it won't be from that point on unless it's a boundary and the new opened deletion time
+                        //     is also equal to the current deletion (note that this implies the boundary has the same
+                        //     closing and opening deletion time, which should generally not happen but can, because the
+                        //     legacy reading code did not prevent it for a while; see CASSANDRA-13237).
                         //   2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and
                         //      it may now be (if it isn't we just have nothing to do for that marker).
-                        assert !currentDeletion.isLive();
+                        assert !currentDeletion.isLive() : currentDeletion.toString();
 
                         if (markerToRepair[i] == null)
                         {
                             // Since there is an ongoing merged deletion, the only way we don't have an open repair for
                             // this source is that it had a range open with the same deletion as current and it's
-                            // closing it. This imply we need to open a deletion for the source from that point.
-                            assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed));
-                            assert !marker.isOpen(isReversed) || currentDeletion.supersedes(marker.openDeletionTime(isReversed));
-                            markerToRepair[i] = marker.closeBound(isReversed).invert();
+                            // closing it.
+                            assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
+                                 : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
+
+                            // and so unless it's a boundary whose opening deletion time is still equal to the current
+                            // deletion (see comment above for why this can actually happen), we have to repair the source
+                            // from that point on.
+                            if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))))
+                                markerToRepair[i] = marker.closeBound(isReversed).invert();
                         }
                         // In case 2) above, we only have something to do if the source is up-to-date after that point
                         else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
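
For illustration: the changed branch above turns part of what used to be an assertion into a check, so a boundary that closes and re-opens a deletion equal to the merged one (the degenerate shape a pre-13237 legacy deserializer could produce) no longer fails the assert and, correctly, does not start a repair. A standalone sketch of just that decision, with simplified stand-in types rather than the real DataResolver/RangeTombstoneMarker APIs:

    // Hypothetical, simplified types: a deletion time and a marker as seen from one source.
    final class SketchDeletion
    {
        final long markedForDeleteAt;
        SketchDeletion(long markedForDeleteAt) { this.markedForDeleteAt = markedForDeleteAt; }
        boolean sameAs(SketchDeletion other) { return markedForDeleteAt == other.markedForDeleteAt; }
    }

    final class SketchMarker
    {
        final boolean isClose, isOpen;              // a boundary is both a close and an open
        final SketchDeletion closeTime, openTime;   // null when that side is absent
        SketchMarker(boolean isClose, SketchDeletion closeTime, boolean isOpen, SketchDeletion openTime)
        { this.isClose = isClose; this.closeTime = closeTime; this.isOpen = isOpen; this.openTime = openTime; }
    }

    public class RepairDecisionSketch
    {
        // The "source was up to date so far" case: open a repair range at this marker
        // unless it immediately re-opens a deletion equal to the merged one.
        static boolean mustOpenRepair(SketchDeletion currentDeletion, SketchMarker marker)
        {
            assert marker.isClose && currentDeletion.sameAs(marker.closeTime);
            return !(marker.isOpen && currentDeletion.sameAs(marker.openTime));
        }

        public static void main(String[] args)
        {
            SketchDeletion current = new SketchDeletion(42);
            // Degenerate boundary: closes and re-opens the same deletion, nothing to repair.
            SketchMarker degenerate = new SketchMarker(true, new SketchDeletion(42), true, new SketchDeletion(42));
            // Boundary re-opening an older deletion: the source must be repaired from here on.
            SketchMarker stale = new SketchMarker(true, new SketchDeletion(42), true, new SketchDeletion(41));
            System.out.println(mustOpenRepair(current, degenerate)); // false
            System.out.println(mustOpenRepair(current, stale));      // true
        }
    }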

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java b/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java
new file mode 100644
index 0000000..1060569
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/OldFormatDeserializerTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.UnfilteredDeserializer.OldFormatDeserializer.UnfilteredIterator;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.*;
+
+public class OldFormatDeserializerTest
+{
+    @Test
+    public void testRangeTombstones() throws Exception
+    {
+        CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
+                                                .withPartitioner(Murmur3Partitioner.instance)
+                                                .addPartitionKey("k", Int32Type.instance)
+                                                .addClusteringColumn("v", Int32Type.instance)
+                                                .build();
+
+        Supplier<LegacyLayout.LegacyAtom> atomSupplier = supplier(rt(0, 10, 42),
+                                                                  rt(5, 15, 42));
+
+        UnfilteredIterator iterator = new UnfilteredIterator(metadata,
+                                                             DeletionTime.LIVE,
+                                                             new SerializationHelper(metadata, MessagingService.current_version, SerializationHelper.Flag.LOCAL),
+                                                             atomSupplier);
+
+        // As the deletion times are the same, we want this to produce a single range tombstone covering from 0 to 15.
+
+        assertTrue(iterator.hasNext());
+
+        Unfiltered first = iterator.next();
+        assertTrue(first.isRangeTombstoneMarker());
+        RangeTombstoneMarker start = (RangeTombstoneMarker)first;
+        assertTrue(start.isOpen(false));
+        assertFalse(start.isClose(false));
+        assertEquals(0, toInt(start.openBound(false)));
+        assertEquals(42, start.openDeletionTime(false).markedForDeleteAt());
+
+        Unfiltered second = iterator.next();
+        assertTrue(second.isRangeTombstoneMarker());
+        RangeTombstoneMarker end = (RangeTombstoneMarker)second;
+        assertTrue(end.isClose(false));
+        assertFalse(end.isOpen(false));
+        assertEquals(15, toInt(end.closeBound(false)));
+        assertEquals(42, end.closeDeletionTime(false).markedForDeleteAt());
+
+        assertFalse(iterator.hasNext());
+    }
+
+    private static int toInt(ClusteringPrefix prefix)
+    {
+        assertTrue(prefix.size() == 1);
+        return ByteBufferUtil.toInt(prefix.get(0));
+    }
+
+    private static Supplier<LegacyLayout.LegacyAtom> supplier(LegacyLayout.LegacyAtom... atoms)
+    {
+        return new Supplier<LegacyLayout.LegacyAtom>()
+        {
+            int i = 0;
+
+            public LegacyLayout.LegacyAtom get()
+            {
+                return i >= atoms.length ? null : atoms[i++];
+            }
+        };
+    }
+
+    private static LegacyLayout.LegacyAtom rt(int start, int end, int deletion)
+    {
+        return new LegacyLayout.LegacyRangeTombstone(bound(start, true), bound(end, false), new DeletionTime(deletion, FBUtilities.nowInSeconds()));
+    }
+
+    private static LegacyLayout.LegacyBound bound(int b, boolean isStart)
+    {
+        return new LegacyLayout.LegacyBound(isStart ? Slice.Bound.inclusiveStartOf(ByteBufferUtil.bytes(b)) : Slice.Bound.inclusiveEndOf(ByteBufferUtil.bytes(b)),
+                                            false,
+                                            null);
+    }
+}
\ No newline at end of file
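
If you want to run this new test locally, the usual single-test invocation of the Cassandra build should work (assuming a checked-out source tree and the standard ant targets): ant test -Dtest.name=OldFormatDeserializerTest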

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab717484/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index fd1e54e..2f72093 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -554,6 +554,73 @@ public class DataResolverTest
         assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
     }
 
+    /**
+     * Test cases where a boundary of a source is covered by another source deletion and timestamps on one or both sides
+     * of the boundary are equal to the "merged" deletion.
+     * This is a test for CASSANDRA-13237 to make sure we handle this case properly.
+     */
+    @Test
+    public void testRepairRangeTombstoneBoundary() throws UnknownHostException
+    {
+        testRepairRangeTombstoneBoundary(1, 0, 1);
+        messageRecorder.sent.clear();
+        testRepairRangeTombstoneBoundary(1, 1, 0);
+        messageRecorder.sent.clear();
+        testRepairRangeTombstoneBoundary(1, 1, 1);
+    }
+
+    /**
+     * Test for CASSANDRA-13237, checking we don't fail (and handle correctly) the case where a RT boundary has the
+     * same deletion on both sides (which is useless but could be created by legacy code pre-CASSANDRA-13237 and could
+     * thus still be sent).
+     */
+    public void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
+    {
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        InetAddress peer1 = peer();
+        InetAddress peer2 = peer();
+
+        // 1st "stream"
+        RangeTombstone one_nine = tombstone("0", true , "9", true, timestamp1, nowInSec);
+        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
+                                                 .addRangeTombstone(one_nine)
+                                                 .buildUpdate());
+
+        // 2nd "stream" (built more manually to ensure we have the boundary we want)
+        RangeTombstoneBoundMarker open_one = marker("0", true, true, timestamp2, nowInSec);
+        RangeTombstoneBoundaryMarker boundary_five = boundary("5", false, timestamp2, nowInSec, timestamp3, nowInSec);
+        RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec);
+        UnfilteredPartitionIterator iter2 = iter(dk, open_one, boundary_five, close_nine);
+
+        resolver.preprocess(readResponseMessage(peer1, iter1));
+        resolver.preprocess(readResponseMessage(peer2, iter2));
+
+        boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3;
+
+        // No results, we've only reconciled tombstones.
+        try (PartitionIterator data = resolver.resolve())
+        {
+            assertFalse(data.hasNext());
+            assertRepairFuture(resolver, shouldHaveRepair ? 1 : 0);
+        }
+
+        assertEquals(shouldHaveRepair? 1 : 0, messageRecorder.sent.size());
+
+        if (!shouldHaveRepair)
+            return;
+
+        MessageOut msg = getSentMessage(peer2);
+        assertRepairMetadata(msg);
+        assertRepairContainsNoColumns(msg);
+
+        RangeTombstone expected = timestamp1 != timestamp2
+                                  // We've repaired the 1st part
+                                  ? tombstone("0", true, "5", false, timestamp1, nowInSec)
+                                  // We've repaired the 2nd part
+                                  : tombstone("5", true, "9", true, timestamp1, nowInSec);
+        assertRepairContainsDeletions(msg, null, expected);
+    }
+
     // Forces the start to be exclusive if the condition holds
     private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
     {
@@ -873,18 +940,40 @@ public class DataResolverTest
 
     private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
     {
-        RangeTombstone.Bound.Kind startKind = inclusiveStart
-                                            ? Kind.INCL_START_BOUND
-                                            : Kind.EXCL_START_BOUND;
-        RangeTombstone.Bound.Kind endKind = inclusiveEnd
-                                          ? Kind.INCL_END_BOUND
-                                          : Kind.EXCL_END_BOUND;
-
-        RangeTombstone.Bound startBound = new RangeTombstone.Bound(startKind, cfm.comparator.make(start).getRawValues());
-        RangeTombstone.Bound endBound = new RangeTombstone.Bound(endKind, cfm.comparator.make(end).getRawValues());
+        RangeTombstone.Bound startBound = rtBound(start, true, inclusiveStart);
+        RangeTombstone.Bound endBound = rtBound(end, false, inclusiveEnd);
         return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
     }
 
+    private RangeTombstone.Bound rtBound(Object value, boolean isStart, boolean inclusive)
+    {
+        RangeTombstone.Bound.Kind kind = isStart
+                                         ? (inclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND)
+                                         : (inclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND);
+
+        return new RangeTombstone.Bound(kind, cfm.comparator.make(value).getRawValues());
+    }
+
+    private RangeTombstone.Bound rtBoundary(Object value, boolean inclusiveOnEnd)
+    {
+        RangeTombstone.Bound.Kind kind = inclusiveOnEnd
+                                         ? Kind.INCL_END_EXCL_START_BOUNDARY
+                                         : Kind.EXCL_END_INCL_START_BOUNDARY;
+        return new RangeTombstone.Bound(kind, cfm.comparator.make(value).getRawValues());
+    }
+
+    private RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime)
+    {
+        return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime));
+    }
+
+    private RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2)
+    {
+        return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd),
+                                                new DeletionTime(markedForDeleteAt1, localDeletionTime1),
+                                                new DeletionTime(markedForDeleteAt2, localDeletionTime2));
+    }
+
     private UnfilteredPartitionIterator fullPartitionDelete(CFMetaData cfm, DecoratedKey dk, long timestamp, int nowInSec)
     {
         return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(cfm, dk, timestamp, nowInSec).unfilteredIterator(), false);
@@ -909,4 +998,26 @@ public class DataResolverTest
     {
         return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator(), false);
     }
+
+    private UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds)
+    {
+        SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator);
+        Collections.addAll(s, unfiltereds);
+        final Iterator<Unfiltered> iterator = s.iterator();
+
+        UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm,
+                                                                          key,
+                                                                          DeletionTime.LIVE,
+                                                                          cfm.partitionColumns(),
+                                                                          Rows.EMPTY_STATIC_ROW,
+                                                                          false,
+                                                                          EncodingStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return iterator.hasNext() ? iterator.next() : endOfData();
+            }
+        };
+        return new SingletonUnfilteredPartitionIterator(rowIter, false);
+    }
 }