Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/15 18:51:05 UTC

[2/3] cassandra git commit: Introduce unit tests for Rows, Cells, and DataResolver; Fix Rows.diff complex deletion resolution

Introduce unit tests for Rows, Cells, and DataResolver
Fix Rows.diff complex deletion resolution

patch by blake; reviewed by benedict for CASSANDRA-10266


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b263af93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b263af93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b263af93

Branch: refs/heads/trunk
Commit: b263af93a2899cf12ee5f35f0518460683fdac18
Parents: d68325b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Sep 11 08:32:02 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Sep 15 17:49:45 2015 +0100

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/rows/Rows.java |  32 +-
 test/unit/org/apache/cassandra/db/CellTest.java |  48 +-
 .../apache/cassandra/db/rows/RowBuilder.java    |  85 +++
 .../org/apache/cassandra/db/rows/RowsTest.java  | 548 +++++++++++++++++++
 .../cassandra/service/DataResolverTest.java     | 225 +++++++-
 5 files changed, 927 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index c3b4a92..ea2ca06 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -80,7 +80,7 @@ public abstract class Rows
             {
                 ++columnCount;
                 ++cellCount;
-                Cells.collectStats((Cell)cd, collector);
+                Cells.collectStats((Cell) cd, collector);
             }
             else
             {
@@ -105,11 +105,13 @@ public abstract class Rows
     /**
      * Given the result ({@code merged}) of merging multiple {@code inputs}, signals the difference between
      * each input and {@code merged} to {@code diffListener}.
+     * <p>
+     * Note that this method doesn't only signal cells (and other row data) where there's a difference. The listener is informed
+     * of every corresponding entity between the merged and input rows, including those that are equal.
      *
+     * @param diffListener the listener to which to signal the differences between the inputs and the merged result.
      * @param merged the result of merging {@code inputs}.
      * @param inputs the inputs whose merge yielded {@code merged}.
-     * @param diffListener the listener to which to signal the differences between the inputs and the merged
-     * result.
      */
     public static void diff(RowDiffListener diffListener, Row merged, Row...inputs)
     {
@@ -179,6 +181,10 @@ public abstract class Rows
                             }
                             else
                             {
+
+                                if (!mergedData.complexDeletion().isLive() || !inputData.complexDeletion().isLive())
+                                    diffListener.onComplexDeletion(i, clustering, column, mergedData.complexDeletion(), inputData.complexDeletion());
+
                                 PeekingIterator<Cell> mergedCells = Iterators.peekingIterator(mergedData.iterator());
                                 PeekingIterator<Cell> inputCells = Iterators.peekingIterator(inputData.iterator());
                                 while (mergedCells.hasNext() && inputCells.hasNext())
@@ -221,8 +227,24 @@ public abstract class Rows
         return builder.build();
     }
 
-    // Merge rows in memtable
-    // Return the minimum timestamp delta between existing and update
+    /**
+     * Merges two rows into the given builder, mainly for merging memtable rows. In addition to reconciling the cells
+     * in each row, the liveness info and the deletion times for the row and its complex columns are also merged.
+     * <p>
+     * Note that this method assumes that the provided rows can meaningfully be reconciled together. That is,
+     * that the rows share the same clustering value, and belong to the same partition.
+     *
+     * @param existing the existing row to reconcile (e.g. the version currently in the memtable).
+     * @param update the incoming row to merge into {@code existing}.
+     * @param builder the row builder to which the result of the reconciliation is written.
+     * @param nowInSec the current time in seconds (which plays a role during reconciliation
+     * because deleted cells always have precedence on timestamp equality and deciding if a
+     * cell is live or not depends on the current time due to expiring cells).
+     *
+     * @return the smallest timestamp delta between corresponding rows from existing and update. A
+     * timestamp delta is computed as the difference between the cells and DeletionTimes from {@code existing}
+     * and those in {@code update}.
+     */
     public static long merge(Row existing,
                              Row update,
                              Row.Builder builder,

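For reference, the Rows.diff fix means a RowDiffListener is now told about complex (collection) deletions whenever either the merged row or an input row carries a non-live complex deletion, including inputs where the two are equal. A minimal sketch of a listener that watches only those callbacks, assuming the RowDiffListener signatures exercised by the new RowsTest below (not part of this commit):

    import org.apache.cassandra.config.ColumnDefinition;
    import org.apache.cassandra.db.Clustering;
    import org.apache.cassandra.db.DeletionTime;
    import org.apache.cassandra.db.LivenessInfo;
    import org.apache.cassandra.db.rows.Cell;
    import org.apache.cassandra.db.rows.Row;
    import org.apache.cassandra.db.rows.RowDiffListener;
    import org.apache.cassandra.db.rows.Rows;

    public class ComplexDeletionDiffSketch
    {
        // Prints, for each input row, the complex deletion it contributed next to the merged one.
        public static void dumpComplexDeletions(Row mergedRow, Row... inputs)
        {
            RowDiffListener listener = new RowDiffListener()
            {
                public void onPrimaryKeyLivenessInfo(int i, Clustering c, LivenessInfo merged, LivenessInfo original) {}
                public void onDeletion(int i, Clustering c, DeletionTime merged, DeletionTime original) {}
                public void onCell(int i, Clustering c, Cell merged, Cell original) {}
                public void onComplexDeletion(int i, Clustering c, ColumnDefinition column, DeletionTime merged, DeletionTime original)
                {
                    // With the fix, this fires for input i whenever either side's complex
                    // deletion is non-live, even when the merged and input values are equal.
                    System.out.println("input " + i + " " + column.name + ": merged=" + merged + ", input=" + original);
                }
            };
            Rows.diff(listener, mergedRow, inputs);
        }
    }
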
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/db/CellTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CellTest.java b/test/unit/org/apache/cassandra/db/CellTest.java
index e8cb1cb..5953255 100644
--- a/test/unit/org/apache/cassandra/db/CellTest.java
+++ b/test/unit/org/apache/cassandra/db/CellTest.java
@@ -19,6 +19,9 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.collect.Lists;
 
 import junit.framework.Assert;
 import org.junit.BeforeClass;
@@ -26,6 +29,9 @@ import org.junit.Test;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.SchemaLoader;
@@ -37,16 +43,21 @@ public class CellTest
 {
     private static final String KEYSPACE1 = "CellTest";
     private static final String CF_STANDARD1 = "Standard1";
+    private static final String CF_COLLECTION = "Collection1";
 
-    private CFMetaData cfm = SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1);
+    private static final CFMetaData cfm = SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1);
+    private static final CFMetaData cfm2 = CFMetaData.Builder.create(KEYSPACE1, CF_COLLECTION)
+                                                             .addPartitionKey("k", IntegerType.instance)
+                                                             .addClusteringColumn("c", IntegerType.instance)
+                                                             .addRegularColumn("v", IntegerType.instance)
+                                                             .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
+                                                             .build();
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), cfm, cfm2);
     }
 
     @Test
@@ -90,6 +101,35 @@ public class CellTest
         Assert.assertEquals(-1, testExpiring("val", "b", 2, 1, null, "a", null, 2));
     }
 
+    private static ByteBuffer bb(int i)
+    {
+        return ByteBufferUtil.bytes(i);
+    }
+
+    @Test
+    public void testComplexCellReconcile()
+    {
+        ColumnDefinition m = cfm2.getColumnDefinition(new ColumnIdentifier("m", false));
+        int now1 = FBUtilities.nowInSeconds();
+        long ts1 = now1 * 1000000L;
+
+
+        Cell r1m1 = BufferCell.live(cfm2, m, ts1, bb(1), CellPath.create(bb(1)));
+        Cell r1m2 = BufferCell.live(cfm2, m, ts1, bb(2), CellPath.create(bb(2)));
+        List<Cell> cells1 = Lists.newArrayList(r1m1, r1m2);
+
+        int now2 = now1 + 1;
+        long ts2 = now2 * 1000000L;
+        Cell r2m2 = BufferCell.live(cfm2, m, ts2, bb(1), CellPath.create(bb(2)));
+        Cell r2m3 = BufferCell.live(cfm2, m, ts2, bb(2), CellPath.create(bb(3)));
+        Cell r2m4 = BufferCell.live(cfm2, m, ts2, bb(3), CellPath.create(bb(4)));
+        List<Cell> cells2 = Lists.newArrayList(r2m2, r2m3, r2m4);
+
+        RowBuilder builder = new RowBuilder();
+        Cells.reconcileComplex(m, cells1.iterator(), cells2.iterator(), DeletionTime.LIVE, builder, now2 + 1);
+        Assert.assertEquals(Lists.newArrayList(r1m1, r2m2, r2m3, r2m4), builder.cells);
+    }
+
     private int testExpiring(String n1, String v1, long t1, int et1, String n2, String v2, Long t2, Integer et2)
     {
         if (n2 == null)

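The expected list in testComplexCellReconcile follows from how complex (collection) cells reconcile: cells from the two inputs are matched up by cell path, and where both inputs contain a cell for the same path, the one with the higher write timestamp wins. A simplified, self-contained model of that rule (plain Java with hypothetical names; not the Cassandra implementation, which has further tie-break rules on timestamp equality):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MapReconcileModel
    {
        // A toy map "cell": a value plus the write timestamp that decides conflicts.
        static final class Entry
        {
            final int value;
            final long timestamp;
            Entry(int value, long timestamp) { this.value = value; this.timestamp = timestamp; }
            public String toString() { return value + "@" + timestamp; }
        }

        // Per map key, keep whichever write carries the higher timestamp.
        static Map<Integer, Entry> reconcile(Map<Integer, Entry> existing, Map<Integer, Entry> update)
        {
            Map<Integer, Entry> result = new LinkedHashMap<>(existing);
            for (Map.Entry<Integer, Entry> e : update.entrySet())
            {
                Entry current = result.get(e.getKey());
                if (current == null || e.getValue().timestamp > current.timestamp)
                    result.put(e.getKey(), e.getValue());
            }
            return result;
        }

        public static void main(String[] args)
        {
            // Mirrors the shape of testComplexCellReconcile: m[1] and m[2] written at ts1,
            // then m[2], m[3], m[4] written at ts2 > ts1; the newer m[2] wins and m[1] survives.
            Map<Integer, Entry> cells1 = new LinkedHashMap<>();
            cells1.put(1, new Entry(1, 1000000L));
            cells1.put(2, new Entry(2, 1000000L));
            Map<Integer, Entry> cells2 = new LinkedHashMap<>();
            cells2.put(2, new Entry(1, 2000000L));
            cells2.put(3, new Entry(2, 2000000L));
            cells2.put(4, new Entry(3, 2000000L));
            System.out.println(reconcile(cells1, cells2)); // {1=1@1000000, 2=1@2000000, 3=2@2000000, 4=3@2000000}
        }
    }
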
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/db/rows/RowBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowBuilder.java b/test/unit/org/apache/cassandra/db/rows/RowBuilder.java
new file mode 100644
index 0000000..caa5c40
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/RowBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Instrumented Builder implementation for testing the
+ * behavior of Cells and Rows static methods
+ */
+public class RowBuilder implements Row.Builder
+{
+    public List<Cell> cells = new LinkedList<>();
+    public Clustering clustering = null;
+    public LivenessInfo livenessInfo = null;
+    public DeletionTime deletionTime = null;
+    public List<Pair<ColumnDefinition, DeletionTime>> complexDeletions = new LinkedList<>();
+
+    public void addCell(Cell cell)
+    {
+        cells.add(cell);
+    }
+
+    public boolean isSorted()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void newRow(Clustering clustering)
+    {
+        assert this.clustering == null;
+        this.clustering = clustering;
+    }
+
+    public Clustering clustering()
+    {
+        return clustering;
+    }
+
+    public void addPrimaryKeyLivenessInfo(LivenessInfo info)
+    {
+        assert livenessInfo == null;
+        livenessInfo = info;
+    }
+
+    public void addRowDeletion(DeletionTime deletion)
+    {
+        assert deletionTime == null;
+        deletionTime = deletion;
+    }
+
+
+    public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
+    {
+        complexDeletions.add(Pair.create(column, complexDeletion));
+    }
+
+    public Row build()
+    {
+        throw new UnsupportedOperationException();
+    }
+}

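Typical use of this instrumented builder, as the new RowsTest below does: hand it to the static method under test, then assert directly on its public fields instead of building a Row. A short sketch (r1, r2 and nowInSec are assumed to come from the surrounding test):

    RowBuilder builder = new RowBuilder();
    long delta = Rows.merge(r1, r2, builder, nowInSec);

    // The builder just records whatever Rows.merge fed it.
    Assert.assertEquals(r1.clustering(), builder.clustering);
    Assert.assertFalse(builder.cells.isEmpty());
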
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/db/rows/RowsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java
new file mode 100644
index 0000000..306d687
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+public class RowsTest
+{
+    private static final String KEYSPACE = "rows_test";
+    private static final String KCVM_TABLE = "kcvm";
+    private static final CFMetaData kcvm;
+    private static final ColumnDefinition v;
+    private static final ColumnDefinition m;
+    private static final Clustering c1;
+
+    static
+    {
+        kcvm = CFMetaData.Builder.create(KEYSPACE, KCVM_TABLE)
+                                 .addPartitionKey("k", IntegerType.instance)
+                                 .addClusteringColumn("c", IntegerType.instance)
+                                 .addRegularColumn("v", IntegerType.instance)
+                                 .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
+                                 .build();
+
+        v = kcvm.getColumnDefinition(new ColumnIdentifier("v", false));
+        m = kcvm.getColumnDefinition(new ColumnIdentifier("m", false));
+        c1 = kcvm.comparator.make(BigInteger.valueOf(1));
+    }
+
+    private static final ByteBuffer BB1 = ByteBufferUtil.bytes(1);
+    private static final ByteBuffer BB2 = ByteBufferUtil.bytes(2);
+    private static final ByteBuffer BB3 = ByteBufferUtil.bytes(3);
+    private static final ByteBuffer BB4 = ByteBufferUtil.bytes(4);
+
+    private static class MergedPair<T>
+    {
+        public final int idx;
+        public final T merged;
+        public final T original;
+
+        private MergedPair(int idx, T merged, T original)
+        {
+            this.idx = idx;
+            this.merged = merged;
+            this.original = original;
+        }
+
+        static <T> MergedPair<T> create(int i, T m, T o)
+        {
+            return new MergedPair<>(i, m, o);
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            MergedPair<?> that = (MergedPair<?>) o;
+
+            if (idx != that.idx) return false;
+            if (merged != null ? !merged.equals(that.merged) : that.merged != null) return false;
+            return !(original != null ? !original.equals(that.original) : that.original != null);
+        }
+
+        public int hashCode()
+        {
+            int result = idx;
+            result = 31 * result + (merged != null ? merged.hashCode() : 0);
+            result = 31 * result + (original != null ? original.hashCode() : 0);
+            return result;
+        }
+
+        public String toString()
+        {
+            return "MergedPair{" +
+                   "idx=" + idx +
+                   ", merged=" + merged +
+                   ", original=" + original +
+                   '}';
+        }
+    }
+
+    private static class DiffListener implements RowDiffListener
+    {
+        int updates = 0;
+        Clustering clustering = null;
+
+        private void updateClustering(Clustering c)
+        {
+            assert clustering == null || clustering == c;
+            clustering = c;
+        }
+
+        List<MergedPair<Cell>> cells = new LinkedList<>();
+        public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+        {
+            updateClustering(clustering);
+            cells.add(MergedPair.create(i, merged, original));
+            updates++;
+        }
+
+        List<MergedPair<LivenessInfo>> liveness = new LinkedList<>();
+        public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+        {
+            updateClustering(clustering);
+            liveness.add(MergedPair.create(i, merged, original));
+            updates++;
+        }
+
+        List<MergedPair<DeletionTime>> deletions = new LinkedList<>();
+        public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+        {
+            updateClustering(clustering);
+            deletions.add(MergedPair.create(i, merged, original));
+            updates++;
+        }
+
+        Map<ColumnDefinition, List<MergedPair<DeletionTime>>> complexDeletions = new HashMap<>();
+        public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+        {
+            updateClustering(clustering);
+            if (!complexDeletions.containsKey(column)) complexDeletions.put(column, new LinkedList<>());
+            complexDeletions.get(column).add(MergedPair.create(i, merged, original));
+            updates++;
+        }
+    }
+
+    public static class StatsCollector implements PartitionStatisticsCollector
+    {
+        List<Cell> cells = new LinkedList<>();
+        public void update(Cell cell)
+        {
+            cells.add(cell);
+        }
+
+        List<LivenessInfo> liveness = new LinkedList<>();
+        public void update(LivenessInfo info)
+        {
+            liveness.add(info);
+        }
+
+        List<DeletionTime> deletions = new LinkedList<>();
+        public void update(DeletionTime deletion)
+        {
+            deletions.add(deletion);
+        }
+
+        long columnCount = -1;
+        public void updateColumnSetPerRow(long columnSetInRow)
+        {
+            assert columnCount < 0;
+            this.columnCount = columnSetInRow;
+        }
+
+        boolean hasLegacyCounterShards = false;
+        public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
+        {
+            this.hasLegacyCounterShards |= hasLegacyCounterShards;
+        }
+    }
+
+    private static long secondToTs(int now)
+    {
+        return now * 1000000L;
+    }
+
+    private static Row.Builder createBuilder(Clustering c, int now, ByteBuffer vVal, ByteBuffer mKey, ByteBuffer mVal)
+    {
+        long ts = secondToTs(now);
+        Row.Builder builder = BTreeRow.unsortedBuilder(now);
+        builder.newRow(c);
+        builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(kcvm, ts, now));
+        if (vVal != null)
+        {
+            builder.addCell(BufferCell.live(kcvm, v, ts, vVal));
+        }
+        if (mKey != null && mVal != null)
+        {
+            builder.addComplexDeletion(m, new DeletionTime(ts - 1, now));
+            builder.addCell(BufferCell.live(kcvm, m, ts, mVal, CellPath.create(mKey)));
+        }
+
+        return builder;
+    }
+
+    @Test
+    public void copy()
+    {
+        int now = FBUtilities.nowInSeconds();
+        long ts = secondToTs(now);
+        Row.Builder originalBuilder = BTreeRow.unsortedBuilder(now);
+        originalBuilder.newRow(c1);
+        LivenessInfo liveness = LivenessInfo.create(kcvm, ts, now);
+        originalBuilder.addPrimaryKeyLivenessInfo(liveness);
+        DeletionTime complexDeletion = new DeletionTime(ts-1, now);
+        originalBuilder.addComplexDeletion(m, complexDeletion);
+        List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, secondToTs(now), BB1),
+                                                      BufferCell.live(kcvm, m, secondToTs(now), BB1, CellPath.create(BB1)),
+                                                      BufferCell.live(kcvm, m, secondToTs(now), BB2, CellPath.create(BB2)));
+        expectedCells.forEach(originalBuilder::addCell);
+        DeletionTime rowDeletion = new DeletionTime(ts, now);
+        originalBuilder.addRowDeletion(rowDeletion);
+
+        RowBuilder builder = new RowBuilder();
+        Rows.copy(originalBuilder.build(), builder);
+
+        Assert.assertEquals(c1, builder.clustering);
+        Assert.assertEquals(liveness, builder.livenessInfo);
+        Assert.assertEquals(rowDeletion, builder.deletionTime);
+        Assert.assertEquals(Lists.newArrayList(Pair.create(m, complexDeletion)), builder.complexDeletions);
+        Assert.assertEquals(Sets.newHashSet(expectedCells), Sets.newHashSet(builder.cells));
+    }
+
+    @Test
+    public void collectStats()
+    {
+        int now = FBUtilities.nowInSeconds();
+        long ts = secondToTs(now);
+        Row.Builder builder = BTreeRow.unsortedBuilder(now);
+        builder.newRow(c1);
+        LivenessInfo liveness = LivenessInfo.create(kcvm, ts, now);
+        builder.addPrimaryKeyLivenessInfo(liveness);
+        DeletionTime complexDeletion = new DeletionTime(ts-1, now);
+        builder.addComplexDeletion(m, complexDeletion);
+        List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, ts, BB1),
+                                                      BufferCell.live(kcvm, m, ts, BB1, CellPath.create(BB1)),
+                                                      BufferCell.live(kcvm, m, ts, BB2, CellPath.create(BB2)));
+        expectedCells.forEach(builder::addCell);
+        DeletionTime rowDeletion = new DeletionTime(ts, now);
+        builder.addRowDeletion(rowDeletion);
+
+        StatsCollector collector = new StatsCollector();
+        Rows.collectStats(builder.build(), collector);
+
+        Assert.assertEquals(Lists.newArrayList(liveness), collector.liveness);
+        Assert.assertEquals(Sets.newHashSet(rowDeletion, complexDeletion), Sets.newHashSet(collector.deletions));
+        Assert.assertEquals(Sets.newHashSet(expectedCells), Sets.newHashSet(collector.cells));
+        Assert.assertEquals(2, collector.columnCount);
+        Assert.assertFalse(collector.hasLegacyCounterShards);
+    }
+
+
+    public static void addExpectedCells(Set<MergedPair<Cell>> dst, Cell merged, Cell... inputs)
+    {
+        for (int i=0; i<inputs.length; i++)
+        {
+            dst.add(MergedPair.create(i, merged, inputs[i]));
+        }
+    }
+
+    @Test
+    public void diff()
+    {
+        int now1 = FBUtilities.nowInSeconds();
+        long ts1 = secondToTs(now1);
+        Row.Builder r1Builder = BTreeRow.unsortedBuilder(now1);
+        r1Builder.newRow(c1);
+        LivenessInfo r1Liveness = LivenessInfo.create(kcvm, ts1, now1);
+        r1Builder.addPrimaryKeyLivenessInfo(r1Liveness);
+        DeletionTime r1ComplexDeletion = new DeletionTime(ts1-1, now1);
+        r1Builder.addComplexDeletion(m, r1ComplexDeletion);
+
+        Cell r1v = BufferCell.live(kcvm, v, ts1, BB1);
+        Cell r1m1 = BufferCell.live(kcvm, m, ts1, BB1, CellPath.create(BB1));
+        Cell r1m2 = BufferCell.live(kcvm, m, ts1, BB2, CellPath.create(BB2));
+        List<Cell> r1ExpectedCells = Lists.newArrayList(r1v, r1m1, r1m2);
+
+        r1ExpectedCells.forEach(r1Builder::addCell);
+
+        int now2 = now1 + 1;
+        long ts2 = secondToTs(now2);
+        Row.Builder r2Builder = BTreeRow.unsortedBuilder(now2);
+        r2Builder.newRow(c1);
+        LivenessInfo r2Liveness = LivenessInfo.create(kcvm, ts2, now2);
+        r2Builder.addPrimaryKeyLivenessInfo(r2Liveness);
+        Cell r2v = BufferCell.live(kcvm, v, ts2, BB2);
+        Cell r2m2 = BufferCell.live(kcvm, m, ts2, BB1, CellPath.create(BB2));
+        Cell r2m3 = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB3));
+        Cell r2m4 = BufferCell.live(kcvm, m, ts2, BB3, CellPath.create(BB4));
+        List<Cell> r2ExpectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4);
+
+        r2ExpectedCells.forEach(r2Builder::addCell);
+        DeletionTime r2RowDeletion = new DeletionTime(ts1 - 2, now2);
+        r2Builder.addRowDeletion(r2RowDeletion);
+
+        Row r1 = r1Builder.build();
+        Row r2 = r2Builder.build();
+        Row merged = Rows.merge(r1, r2, now2 + 1);
+
+        Assert.assertEquals(r1ComplexDeletion, merged.getComplexColumnData(m).complexDeletion());
+
+        DiffListener listener = new DiffListener();
+        Rows.diff(listener, merged, r1, r2);
+
+        Assert.assertEquals(c1, listener.clustering);
+
+        // check cells
+        Set<MergedPair<Cell>> expectedCells = Sets.newHashSet();
+        addExpectedCells(expectedCells, r2v,  r1v,  r2v);     // v
+        addExpectedCells(expectedCells, r1m1, r1m1, null);   // m[1]
+        addExpectedCells(expectedCells, r2m2, r1m2, r2m2);   // m[2]
+        addExpectedCells(expectedCells, r2m3, null, r2m3);   // m[3]
+        addExpectedCells(expectedCells, r2m4, null, r2m4);   // m[4]
+
+        Assert.assertEquals(expectedCells.size(), listener.cells.size());
+        Assert.assertEquals(expectedCells, Sets.newHashSet(listener.cells));
+
+        // liveness
+        List<MergedPair<LivenessInfo>> expectedLiveness = Lists.newArrayList(MergedPair.create(0, r2Liveness, r1Liveness),
+                                                                             MergedPair.create(1, r2Liveness, r2Liveness));
+        Assert.assertEquals(expectedLiveness, listener.liveness);
+
+        // deletions
+        List<MergedPair<DeletionTime>> expectedDeletions = Lists.newArrayList(MergedPair.create(0, r2RowDeletion, null),
+                                                                              MergedPair.create(1, r2RowDeletion, r2RowDeletion));
+        Assert.assertEquals(expectedDeletions, listener.deletions);
+
+        // complex deletions
+        List<MergedPair<DeletionTime>> expectedCmplxDeletions = Lists.newArrayList(MergedPair.create(0, r1ComplexDeletion, r1ComplexDeletion),
+                                                                                   MergedPair.create(1, r1ComplexDeletion, DeletionTime.LIVE));
+        Assert.assertEquals(ImmutableMap.builder().put(m, expectedCmplxDeletions).build(), listener.complexDeletions);
+    }
+
+    /**
+     * merged row has no column data
+     */
+    @Test
+    public void diffEmptyMerged()
+    {
+        int now1 = FBUtilities.nowInSeconds();
+        long ts1 = secondToTs(now1);
+        Row.Builder r1Builder = BTreeRow.unsortedBuilder(now1);
+        r1Builder.newRow(c1);
+        LivenessInfo r1Liveness = LivenessInfo.create(kcvm, ts1, now1);
+        r1Builder.addPrimaryKeyLivenessInfo(r1Liveness);
+
+        // mergedData == null
+        int now2 = now1 + 1;
+        long ts2 = secondToTs(now2);
+        Row.Builder r2Builder = BTreeRow.unsortedBuilder(now2);
+        r2Builder.newRow(c1);
+        LivenessInfo r2Liveness = LivenessInfo.create(kcvm, ts2, now2);
+        r2Builder.addPrimaryKeyLivenessInfo(r2Liveness);
+        DeletionTime r2ComplexDeletion = new DeletionTime(ts2-1, now2);
+        r2Builder.addComplexDeletion(m, r2ComplexDeletion);
+        Cell r2v = BufferCell.live(kcvm, v, ts2, BB2);
+        Cell r2m2 = BufferCell.live(kcvm, m, ts2, BB1, CellPath.create(BB2));
+        Cell r2m3 = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB3));
+        Cell r2m4 = BufferCell.live(kcvm, m, ts2, BB3, CellPath.create(BB4));
+        List<Cell> r2ExpectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4);
+
+        r2ExpectedCells.forEach(r2Builder::addCell);
+        DeletionTime r2RowDeletion = new DeletionTime(ts1 - 1, now2);
+        r2Builder.addRowDeletion(r2RowDeletion);
+
+        Row r1 = r1Builder.build();
+        Row r2 = r2Builder.build();
+
+        DiffListener listener = new DiffListener();
+        Rows.diff(listener, r1, r2);
+
+        Assert.assertEquals(c1, listener.clustering);
+
+        // check cells
+        Set<MergedPair<Cell>> expectedCells = Sets.newHashSet(MergedPair.create(0, null, r2v),   // v
+                                                              MergedPair.create(0, null, r2m2),  // m[2]
+                                                              MergedPair.create(0, null, r2m3),  // m[3]
+                                                              MergedPair.create(0, null, r2m4)); // m[4]
+
+        Assert.assertEquals(expectedCells.size(), listener.cells.size());
+        Assert.assertEquals(expectedCells, Sets.newHashSet(listener.cells));
+
+        // complex deletions
+        List<MergedPair<DeletionTime>> expectedCmplxDeletions = Lists.newArrayList(MergedPair.create(0, null, r2ComplexDeletion));
+        Assert.assertEquals(ImmutableMap.builder().put(m, expectedCmplxDeletions).build(), listener.complexDeletions);
+    }
+
+    /**
+     * input row has no column data
+     */
+    @Test
+    public void diffEmptyInput()
+    {
+        int now1 = FBUtilities.nowInSeconds();
+        long ts1 = secondToTs(now1);
+        Row.Builder r1Builder = BTreeRow.unsortedBuilder(now1);
+        r1Builder.newRow(c1);
+        LivenessInfo r1Liveness = LivenessInfo.create(kcvm, ts1, now1);
+        r1Builder.addPrimaryKeyLivenessInfo(r1Liveness);
+
+        // mergedData == null
+        int now2 = now1 + 1;
+        long ts2 = secondToTs(now2);
+        Row.Builder r2Builder = BTreeRow.unsortedBuilder(now2);
+        r2Builder.newRow(c1);
+        LivenessInfo r2Liveness = LivenessInfo.create(kcvm, ts2, now2);
+        r2Builder.addPrimaryKeyLivenessInfo(r2Liveness);
+        DeletionTime r2ComplexDeletion = new DeletionTime(ts2-1, now2);
+        r2Builder.addComplexDeletion(m, r2ComplexDeletion);
+        Cell r2v = BufferCell.live(kcvm, v, ts2, BB2);
+        Cell r2m2 = BufferCell.live(kcvm, m, ts2, BB1, CellPath.create(BB2));
+        Cell r2m3 = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB3));
+        Cell r2m4 = BufferCell.live(kcvm, m, ts2, BB3, CellPath.create(BB4));
+        List<Cell> r2ExpectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4);
+
+        r2ExpectedCells.forEach(r2Builder::addCell);
+        DeletionTime r2RowDeletion = new DeletionTime(ts1 - 1, now2);
+        r2Builder.addRowDeletion(r2RowDeletion);
+
+        Row r1 = r1Builder.build();
+        Row r2 = r2Builder.build();
+
+        DiffListener listener = new DiffListener();
+        Rows.diff(listener, r2, r1);
+
+        Assert.assertEquals(c1, listener.clustering);
+
+        // check cells
+        Set<MergedPair<Cell>> expectedCells = Sets.newHashSet(MergedPair.create(0, r2v, null),   // v
+                                                              MergedPair.create(0, r2m2, null),  // m[2]
+                                                              MergedPair.create(0, r2m3, null),  // m[3]
+                                                              MergedPair.create(0, r2m4, null)); // m[4]
+
+        Assert.assertEquals(expectedCells.size(), listener.cells.size());
+        Assert.assertEquals(expectedCells, Sets.newHashSet(listener.cells));
+
+        // complex deletions
+        List<MergedPair<DeletionTime>> expectedCmplxDeletions = Lists.newArrayList(MergedPair.create(0, r2ComplexDeletion, null));
+        Assert.assertEquals(ImmutableMap.builder().put(m, expectedCmplxDeletions).build(), listener.complexDeletions);
+    }
+
+    @Test
+    public void merge()
+    {
+        int now1 = FBUtilities.nowInSeconds();
+        Row.Builder existingBuilder = createBuilder(c1, now1, BB1, BB1, BB1);
+
+        int now2 = now1 + 1;
+        long ts2 = secondToTs(now2);
+
+        Cell expectedVCell = BufferCell.live(kcvm, v, ts2, BB2);
+        Cell expectedMCell = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB1));
+        DeletionTime expectedComplexDeletionTime = new DeletionTime(ts2 - 1, now2);
+
+        Row.Builder updateBuilder = createBuilder(c1, now2, null, null, null);
+        updateBuilder.addCell(expectedVCell);
+        updateBuilder.addComplexDeletion(m, expectedComplexDeletionTime);
+        updateBuilder.addCell(expectedMCell);
+
+        RowBuilder builder = new RowBuilder();
+        long td = Rows.merge(existingBuilder.build(), updateBuilder.build(), builder, now2 + 1);
+
+        Assert.assertEquals(c1, builder.clustering);
+        Assert.assertEquals(LivenessInfo.create(kcvm, ts2, now2), builder.livenessInfo);
+        Assert.assertEquals(Lists.newArrayList(Pair.create(m, new DeletionTime(ts2-1, now2))), builder.complexDeletions);
+
+        Assert.assertEquals(2, builder.cells.size());
+        Assert.assertEquals(Lists.newArrayList(expectedVCell, expectedMCell), Lists.newArrayList(builder.cells));
+        Assert.assertEquals(ts2 - secondToTs(now1), td);
+    }
+
+    @Test
+    public void mergeComplexDeletionSupersededByRowDeletion()
+    {
+        int now1 = FBUtilities.nowInSeconds();
+        Row.Builder existingBuilder = createBuilder(c1, now1, null, null, null);
+
+        int now2 = now1 + 1;
+        Row.Builder updateBuilder = createBuilder(c1, now2, null, BB1, BB1);
+        int now3 = now2 + 1;
+        DeletionTime expectedDeletion = new DeletionTime(secondToTs(now3), now3);
+        updateBuilder.addRowDeletion(expectedDeletion);
+
+        RowBuilder builder = new RowBuilder();
+        Rows.merge(existingBuilder.build(), updateBuilder.build(), builder, now3 + 1);
+
+        Assert.assertEquals(expectedDeletion, builder.deletionTime);
+        Assert.assertEquals(Collections.emptyList(), builder.complexDeletions);
+        Assert.assertEquals(Collections.emptyList(), builder.cells);
+    }
+
+    /**
+     * If a row's deletion time deletes a row's liveness info, the new row should have its
+     * liveness info set to empty
+     */
+    @Test
+    public void mergeRowDeletionSupersedesLiveness()
+    {
+        int now1 = FBUtilities.nowInSeconds();
+        Row.Builder existingBuilder = createBuilder(c1, now1, null, null, null);
+
+        int now2 = now1 + 1;
+        Row.Builder updateBuilder = createBuilder(c1, now2, BB1, BB1, BB1);
+        int now3 = now2 + 1;
+        DeletionTime expectedDeletion = new DeletionTime(secondToTs(now3), now3);
+        updateBuilder.addRowDeletion(expectedDeletion);
+
+        RowBuilder builder = new RowBuilder();
+        Rows.merge(existingBuilder.build(), updateBuilder.build(), builder, now3 + 1);
+
+        Assert.assertEquals(expectedDeletion, builder.deletionTime);
+        Assert.assertEquals(LivenessInfo.EMPTY, builder.livenessInfo);
+        Assert.assertEquals(Collections.emptyList(), builder.complexDeletions);
+        Assert.assertEquals(Collections.emptyList(), builder.cells);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index 0804bfb..b60a039 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -20,15 +20,22 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
 import org.junit.*;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -36,6 +43,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.Util.assertClustering;
@@ -50,6 +58,7 @@ public class DataResolverTest
 {
     public static final String KEYSPACE1 = "DataResolverTest";
     public static final String CF_STANDARD = "Standard1";
+    public static final String CF_COLLECTION = "Collection1";
 
     // counter to generate the last byte of the respondent's address in a ReadResponse message
     private int addressSuffix = 10;
@@ -57,7 +66,10 @@ public class DataResolverTest
     private DecoratedKey dk;
     private Keyspace ks;
     private ColumnFamilyStore cfs;
+    private ColumnFamilyStore cfs2;
     private CFMetaData cfm;
+    private CFMetaData cfm2;
+    private ColumnDefinition m;
     private int nowInSec;
     private ReadCommand command;
     private MessageRecorder messageRecorder;
@@ -74,10 +86,15 @@ public class DataResolverTest
                                                   .addRegularColumn("one", AsciiType.instance)
                                                   .addRegularColumn("two", AsciiType.instance)
                                                   .build();
+
+        CFMetaData cfMetaData2 = CFMetaData.Builder.create(KEYSPACE1, CF_COLLECTION)
+                                                   .addPartitionKey("k", ByteType.instance)
+                                                   .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
+                                                   .build();
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
-                                    cfMetadata);
+                                    cfMetadata, cfMetaData2);
     }
 
     @Before
@@ -87,6 +104,10 @@ public class DataResolverTest
         ks = Keyspace.open(KEYSPACE1);
         cfs = ks.getColumnFamilyStore(CF_STANDARD);
         cfm = cfs.metadata;
+        cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
+        cfm2 = cfs2.metadata;
+        m = cfm2.getColumnDefinition(new ColumnIdentifier("m", false));
+
         nowInSec = FBUtilities.nowInSeconds();
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
     }
@@ -419,6 +440,200 @@ public class DataResolverTest
         assertRepairContainsColumn(msg, "1", "two", "B", 3);
     }
 
+    private static ByteBuffer bb(int b)
+    {
+        return ByteBufferUtil.bytes(b);
+    }
+
+    private Cell mapCell(int k, int v, long ts)
+    {
+        return BufferCell.live(cfm2, m, ts, bb(v), CellPath.create(bb(k)));
+    }
+
+    @Test
+    public void testResolveComplexDelete()
+    {
+        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+
+        long[] ts = {100, 200};
+
+        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+        builder.newRow(Clustering.EMPTY);
+        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+        builder.addCell(mapCell(0, 0, ts[0]));
+
+        InetAddress peer1 = peer();
+        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+        builder.newRow(Clustering.EMPTY);
+        DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
+        builder.addComplexDeletion(m, expectedCmplxDelete);
+        Cell expectedCell = mapCell(1, 1, ts[1]);
+        builder.addCell(expectedCell);
+
+        InetAddress peer2 = peer();
+        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        {
+            Row row = Iterators.getOnlyElement(rows);
+            assertColumns(row, "m");
+            Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+            Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+        }
+
+        MessageOut<Mutation> msg;
+        msg = getSentMessage(peer1);
+        Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2.cfId).iterator();
+        assertTrue(rowIter.hasNext());
+        Row row = rowIter.next();
+        assertFalse(rowIter.hasNext());
+
+        ComplexColumnData cd = row.getComplexColumnData(m);
+
+        assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+        assertEquals(expectedCmplxDelete, cd.complexDeletion());
+
+        Assert.assertNull(messageRecorder.sent.get(peer2));
+    }
+
+    @Test
+    public void testResolveDeletedCollection()
+    {
+
+        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+
+        long[] ts = {100, 200};
+
+        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+        builder.newRow(Clustering.EMPTY);
+        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+        builder.addCell(mapCell(0, 0, ts[0]));
+
+        InetAddress peer1 = peer();
+        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+        builder.newRow(Clustering.EMPTY);
+        DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
+        builder.addComplexDeletion(m, expectedCmplxDelete);
+
+        InetAddress peer2 = peer();
+        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+        try(PartitionIterator data = resolver.resolve())
+        {
+            assertFalse(data.hasNext());
+        }
+
+        MessageOut<Mutation> msg;
+        msg = getSentMessage(peer1);
+        Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2.cfId).iterator();
+        assertTrue(rowIter.hasNext());
+        Row row = rowIter.next();
+        assertFalse(rowIter.hasNext());
+
+        ComplexColumnData cd = row.getComplexColumnData(m);
+
+        assertEquals(Collections.emptySet(), Sets.newHashSet(cd));
+        assertEquals(expectedCmplxDelete, cd.complexDeletion());
+
+        Assert.assertNull(messageRecorder.sent.get(peer2));
+    }
+
+    @Test
+    public void testResolveNewCollection()
+    {
+        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+
+        long[] ts = {100, 200};
+
+        // map column
+        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+        builder.newRow(Clustering.EMPTY);
+        DeletionTime expectedCmplxDelete = new DeletionTime(ts[0] - 1, nowInSec);
+        builder.addComplexDeletion(m, expectedCmplxDelete);
+        Cell expectedCell = mapCell(0, 0, ts[0]);
+        builder.addCell(expectedCell);
+
+        // empty map column
+        InetAddress peer1 = peer();
+        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+        InetAddress peer2 = peer();
+        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
+
+        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        {
+            Row row = Iterators.getOnlyElement(rows);
+            assertColumns(row, "m");
+            ComplexColumnData cd = row.getComplexColumnData(m);
+            assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+        }
+
+        Assert.assertNull(messageRecorder.sent.get(peer1));
+
+        MessageOut<Mutation> msg;
+        msg = getSentMessage(peer2);
+        Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2.cfId).iterator();
+        assertTrue(rowIter.hasNext());
+        Row row = rowIter.next();
+        assertFalse(rowIter.hasNext());
+
+        ComplexColumnData cd = row.getComplexColumnData(m);
+
+        assertEquals(Sets.newHashSet(expectedCell), Sets.newHashSet(cd));
+        assertEquals(expectedCmplxDelete, cd.complexDeletion());
+    }
+
+    @Test
+    public void testResolveNewCollectionOverwritingDeleted()
+    {
+        ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+
+        long[] ts = {100, 200};
+
+        // cleared map column
+        Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+        builder.newRow(Clustering.EMPTY);
+        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+
+        InetAddress peer1 = peer();
+        resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+        // newer, overwritten map column
+        builder.newRow(Clustering.EMPTY);
+        DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
+        builder.addComplexDeletion(m, expectedCmplxDelete);
+        Cell expectedCell = mapCell(1, 1, ts[1]);
+        builder.addCell(expectedCell);
+
+        InetAddress peer2 = peer();
+        resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+        try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+        {
+            Row row = Iterators.getOnlyElement(rows);
+            assertColumns(row, "m");
+            ComplexColumnData cd = row.getComplexColumnData(m);
+            assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+        }
+
+        MessageOut<Mutation> msg;
+        msg = getSentMessage(peer1);
+        Row row = Iterators.getOnlyElement(msg.payload.getPartitionUpdate(cfm2.cfId).iterator());
+
+        ComplexColumnData cd = row.getComplexColumnData(m);
+
+        assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+        assertEquals(expectedCmplxDelete, cd.complexDeletion());
+
+        Assert.assertNull(messageRecorder.sent.get(peer2));
+    }
+
     private InetAddress peer()
     {
         try
@@ -488,10 +703,16 @@ public class DataResolverTest
         assertEquals(update.metadata().cfName, cfm.cfName);
     }
 
+
     public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator)
     {
+        return readResponseMessage(from, partitionIterator, command);
+
+    }
+    public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
+    {
         return MessageIn.create(from,
-                                ReadResponse.createRemoteDataResponse(partitionIterator, command.columnFilter()),
+                                ReadResponse.createRemoteDataResponse(partitionIterator, cmd.columnFilter()),
                                 Collections.EMPTY_MAP,
                                 MessagingService.Verb.REQUEST_RESPONSE,
                                 MessagingService.current_version);