You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/15 18:51:05 UTC
[2/3] cassandra git commit: Introduce unit tests for Rows, Cells,
and DataResolver Fix Rows.diff complex deletion resolution
Introduce unit tests for Rows, Cells, and DataResolver
Fix Rows.diff complex deletion resolution
patch by blake; reviewed by benedict for CASSANDRA-10266
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b263af93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b263af93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b263af93
Branch: refs/heads/trunk
Commit: b263af93a2899cf12ee5f35f0518460683fdac18
Parents: d68325b
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Sep 11 08:32:02 2015 -0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Sep 15 17:49:45 2015 +0100
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/rows/Rows.java | 32 +-
test/unit/org/apache/cassandra/db/CellTest.java | 48 +-
.../apache/cassandra/db/rows/RowBuilder.java | 85 +++
.../org/apache/cassandra/db/rows/RowsTest.java | 548 +++++++++++++++++++
.../cassandra/service/DataResolverTest.java | 225 +++++++-
5 files changed, 927 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index c3b4a92..ea2ca06 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -80,7 +80,7 @@ public abstract class Rows
{
++columnCount;
++cellCount;
- Cells.collectStats((Cell)cd, collector);
+ Cells.collectStats((Cell) cd, collector);
}
else
{
@@ -105,11 +105,13 @@ public abstract class Rows
/**
* Given the result ({@code merged}) of merging multiple {@code inputs}, signals the difference between
* each input and {@code merged} to {@code diffListener}.
+ * <p>
+ * Note that this method doesn't only emit cells etc where there's a difference. The listener is informed
+ * of every corresponding entity between the merged and input rows, including those that are equal.
*
+ * @param diffListener the listener to which to signal the differences between the inputs and the merged result.
* @param merged the result of merging {@code inputs}.
* @param inputs the inputs whose merge yielded {@code merged}.
- * @param diffListener the listener to which to signal the differences between the inputs and the merged
- * result.
*/
public static void diff(RowDiffListener diffListener, Row merged, Row...inputs)
{
@@ -179,6 +181,10 @@ public abstract class Rows
}
else
{
+
+ if (!mergedData.complexDeletion().isLive() || !inputData.complexDeletion().isLive())
+ diffListener.onComplexDeletion(i, clustering, column, mergedData.complexDeletion(), inputData.complexDeletion());
+
PeekingIterator<Cell> mergedCells = Iterators.peekingIterator(mergedData.iterator());
PeekingIterator<Cell> inputCells = Iterators.peekingIterator(inputData.iterator());
while (mergedCells.hasNext() && inputCells.hasNext())
@@ -221,8 +227,24 @@ public abstract class Rows
return builder.build();
}
- // Merge rows in memtable
- // Return the minimum timestamp delta between existing and update
+ /**
+ * Merges two rows into the given builder, mainly for merging memtable rows. In addition to reconciling the cells
+ * in each row, the liveness info, and deletion times for the row and complex columns are also merged.
+ * <p>
+ * Note that this method assumes that the provided rows can meaningfully be reconciled together. That is,
+ * that the rows share the same clustering value, and belong to the same partition.
+ *
+ * @param existing
+ * @param update
+ * @param builder the row build to which the result of the reconciliation is written.
+ * @param nowInSec the current time in seconds (which plays a role during reconciliation
+ * because deleted cells always have precedence on timestamp equality and deciding if a
+ * cell is a live or not depends on the current time due to expiring cells).
+ *
+ * @return the smallest timestamp delta between corresponding rows from existing and update. A
+ * timestamp delta being computed as the difference between the cells and DeletionTimes from {@code existing}
+ * and those in {@code existing}.
+ */
public static long merge(Row existing,
Row update,
Row.Builder builder,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/db/CellTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CellTest.java b/test/unit/org/apache/cassandra/db/CellTest.java
index e8cb1cb..5953255 100644
--- a/test/unit/org/apache/cassandra/db/CellTest.java
+++ b/test/unit/org/apache/cassandra/db/CellTest.java
@@ -19,6 +19,9 @@
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.collect.Lists;
import junit.framework.Assert;
import org.junit.BeforeClass;
@@ -26,6 +29,9 @@ import org.junit.Test;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.SchemaLoader;
@@ -37,16 +43,21 @@ public class CellTest
{
private static final String KEYSPACE1 = "CellTest";
private static final String CF_STANDARD1 = "Standard1";
+ private static final String CF_COLLECTION = "Collection1";
- private CFMetaData cfm = SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1);
+ private static final CFMetaData cfm = SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1);
+ private static final CFMetaData cfm2 = CFMetaData.Builder.create(KEYSPACE1, CF_COLLECTION)
+ .addPartitionKey("k", IntegerType.instance)
+ .addClusteringColumn("c", IntegerType.instance)
+ .addRegularColumn("v", IntegerType.instance)
+ .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
+ .build();
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+ SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), cfm, cfm2);
}
@Test
@@ -90,6 +101,35 @@ public class CellTest
Assert.assertEquals(-1, testExpiring("val", "b", 2, 1, null, "a", null, 2));
}
+ private static ByteBuffer bb(int i)
+ {
+ return ByteBufferUtil.bytes(i);
+ }
+
+ @Test
+ public void testComplexCellReconcile()
+ {
+ ColumnDefinition m = cfm2.getColumnDefinition(new ColumnIdentifier("m", false));
+ int now1 = FBUtilities.nowInSeconds();
+ long ts1 = now1*1000000;
+
+
+ Cell r1m1 = BufferCell.live(cfm2, m, ts1, bb(1), CellPath.create(bb(1)));
+ Cell r1m2 = BufferCell.live(cfm2, m, ts1, bb(2), CellPath.create(bb(2)));
+ List<Cell> cells1 = Lists.newArrayList(r1m1, r1m2);
+
+ int now2 = now1 + 1;
+ long ts2 = now2*1000000;
+ Cell r2m2 = BufferCell.live(cfm2, m, ts2, bb(1), CellPath.create(bb(2)));
+ Cell r2m3 = BufferCell.live(cfm2, m, ts2, bb(2), CellPath.create(bb(3)));
+ Cell r2m4 = BufferCell.live(cfm2, m, ts2, bb(3), CellPath.create(bb(4)));
+ List<Cell> cells2 = Lists.newArrayList(r2m2, r2m3, r2m4);
+
+ RowBuilder builder = new RowBuilder();
+ Cells.reconcileComplex(m, cells1.iterator(), cells2.iterator(), DeletionTime.LIVE, builder, now2 + 1);
+ Assert.assertEquals(Lists.newArrayList(r1m1, r2m2, r2m3, r2m4), builder.cells);
+ }
+
private int testExpiring(String n1, String v1, long t1, int et1, String n2, String v2, Long t2, Integer et2)
{
if (n2 == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/db/rows/RowBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowBuilder.java b/test/unit/org/apache/cassandra/db/rows/RowBuilder.java
new file mode 100644
index 0000000..caa5c40
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/RowBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Instrumented Builder implementation for testing the
+ * behavior of Cells and Rows static methods
+ */
+public class RowBuilder implements Row.Builder
+{
+ public List<Cell> cells = new LinkedList<>();
+ public Clustering clustering = null;
+ public LivenessInfo livenessInfo = null;
+ public DeletionTime deletionTime = null;
+ public List<Pair<ColumnDefinition, DeletionTime>> complexDeletions = new LinkedList<>();
+
+ public void addCell(Cell cell)
+ {
+ cells.add(cell);
+ }
+
+ public boolean isSorted()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void newRow(Clustering clustering)
+ {
+ assert this.clustering == null;
+ this.clustering = clustering;
+ }
+
+ public Clustering clustering()
+ {
+ return clustering;
+ }
+
+ public void addPrimaryKeyLivenessInfo(LivenessInfo info)
+ {
+ assert livenessInfo == null;
+ livenessInfo = info;
+ }
+
+ public void addRowDeletion(DeletionTime deletion)
+ {
+ assert deletionTime == null;
+ deletionTime = deletion;
+ }
+
+
+ public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
+ {
+ complexDeletions.add(Pair.create(column, complexDeletion));
+ }
+
+ public Row build()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/db/rows/RowsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java
new file mode 100644
index 0000000..306d687
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+public class RowsTest
+{
+ private static final String KEYSPACE = "rows_test";
+ private static final String KCVM_TABLE = "kcvm";
+ private static final CFMetaData kcvm;
+ private static final ColumnDefinition v;
+ private static final ColumnDefinition m;
+ private static final Clustering c1;
+
+ static
+ {
+ kcvm = CFMetaData.Builder.create(KEYSPACE, KCVM_TABLE)
+ .addPartitionKey("k", IntegerType.instance)
+ .addClusteringColumn("c", IntegerType.instance)
+ .addRegularColumn("v", IntegerType.instance)
+ .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
+ .build();
+
+ v = kcvm.getColumnDefinition(new ColumnIdentifier("v", false));
+ m = kcvm.getColumnDefinition(new ColumnIdentifier("m", false));
+ c1 = kcvm.comparator.make(BigInteger.valueOf(1));
+ }
+
+ private static final ByteBuffer BB1 = ByteBufferUtil.bytes(1);
+ private static final ByteBuffer BB2 = ByteBufferUtil.bytes(2);
+ private static final ByteBuffer BB3 = ByteBufferUtil.bytes(3);
+ private static final ByteBuffer BB4 = ByteBufferUtil.bytes(4);
+
+ private static class MergedPair<T>
+ {
+ public final int idx;
+ public final T merged;
+ public final T original;
+
+ private MergedPair(int idx, T merged, T original)
+ {
+ this.idx = idx;
+ this.merged = merged;
+ this.original = original;
+ }
+
+ static <T> MergedPair<T> create(int i, T m, T o)
+ {
+ return new MergedPair<>(i, m, o);
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ MergedPair<?> that = (MergedPair<?>) o;
+
+ if (idx != that.idx) return false;
+ if (merged != null ? !merged.equals(that.merged) : that.merged != null) return false;
+ return !(original != null ? !original.equals(that.original) : that.original != null);
+ }
+
+ public int hashCode()
+ {
+ int result = idx;
+ result = 31 * result + (merged != null ? merged.hashCode() : 0);
+ result = 31 * result + (original != null ? original.hashCode() : 0);
+ return result;
+ }
+
+ public String toString()
+ {
+ return "MergedPair{" +
+ "idx=" + idx +
+ ", merged=" + merged +
+ ", original=" + original +
+ '}';
+ }
+ }
+
+ private static class DiffListener implements RowDiffListener
+ {
+ int updates = 0;
+ Clustering clustering = null;
+
+ private void updateClustering(Clustering c)
+ {
+ assert clustering == null || clustering == c;
+ clustering = c;
+ }
+
+ List<MergedPair<Cell>> cells = new LinkedList<>();
+ public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+ {
+ updateClustering(clustering);
+ cells.add(MergedPair.create(i, merged, original));
+ updates++;
+ }
+
+ List<MergedPair<LivenessInfo>> liveness = new LinkedList<>();
+ public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+ {
+ updateClustering(clustering);
+ liveness.add(MergedPair.create(i, merged, original));
+ updates++;
+ }
+
+ List<MergedPair<DeletionTime>> deletions = new LinkedList<>();
+ public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+ {
+ updateClustering(clustering);
+ deletions.add(MergedPair.create(i, merged, original));
+ updates++;
+ }
+
+ Map<ColumnDefinition, List<MergedPair<DeletionTime>>> complexDeletions = new HashMap<>();
+ public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+ {
+ updateClustering(clustering);
+ if (!complexDeletions.containsKey(column)) complexDeletions.put(column, new LinkedList<>());
+ complexDeletions.get(column).add(MergedPair.create(i, merged, original));
+ updates++;
+ }
+ }
+
+ public static class StatsCollector implements PartitionStatisticsCollector
+ {
+ List<Cell> cells = new LinkedList<>();
+ public void update(Cell cell)
+ {
+ cells.add(cell);
+ }
+
+ List<LivenessInfo> liveness = new LinkedList<>();
+ public void update(LivenessInfo info)
+ {
+ liveness.add(info);
+ }
+
+ List<DeletionTime> deletions = new LinkedList<>();
+ public void update(DeletionTime deletion)
+ {
+ deletions.add(deletion);
+ }
+
+ long columnCount = -1;
+ public void updateColumnSetPerRow(long columnSetInRow)
+ {
+ assert columnCount < 0;
+ this.columnCount = columnSetInRow;
+ }
+
+ boolean hasLegacyCounterShards = false;
+ public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
+ {
+ this.hasLegacyCounterShards |= hasLegacyCounterShards;
+ }
+ }
+
+ private static long secondToTs(int now)
+ {
+ return now * 1000000;
+ }
+
+ private static Row.Builder createBuilder(Clustering c, int now, ByteBuffer vVal, ByteBuffer mKey, ByteBuffer mVal)
+ {
+ long ts = secondToTs(now);
+ Row.Builder builder = BTreeRow.unsortedBuilder(now);
+ builder.newRow(c);
+ builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(kcvm, ts, now));
+ if (vVal != null)
+ {
+ builder.addCell(BufferCell.live(kcvm, v, ts, vVal));
+ }
+ if (mKey != null && mVal != null)
+ {
+ builder.addComplexDeletion(m, new DeletionTime(ts - 1, now));
+ builder.addCell(BufferCell.live(kcvm, m, ts, mVal, CellPath.create(mKey)));
+ }
+
+ return builder;
+ }
+
+ @Test
+ public void copy()
+ {
+ int now = FBUtilities.nowInSeconds();
+ long ts = secondToTs(now);
+ Row.Builder originalBuilder = BTreeRow.unsortedBuilder(now);
+ originalBuilder.newRow(c1);
+ LivenessInfo liveness = LivenessInfo.create(kcvm, ts, now);
+ originalBuilder.addPrimaryKeyLivenessInfo(liveness);
+ DeletionTime complexDeletion = new DeletionTime(ts-1, now);
+ originalBuilder.addComplexDeletion(m, complexDeletion);
+ List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, secondToTs(now), BB1),
+ BufferCell.live(kcvm, m, secondToTs(now), BB1, CellPath.create(BB1)),
+ BufferCell.live(kcvm, m, secondToTs(now), BB2, CellPath.create(BB2)));
+ expectedCells.forEach(originalBuilder::addCell);
+ DeletionTime rowDeletion = new DeletionTime(ts, now);
+ originalBuilder.addRowDeletion(rowDeletion);
+
+ RowBuilder builder = new RowBuilder();
+ Rows.copy(originalBuilder.build(), builder);
+
+ Assert.assertEquals(c1, builder.clustering);
+ Assert.assertEquals(liveness, builder.livenessInfo);
+ Assert.assertEquals(rowDeletion, builder.deletionTime);
+ Assert.assertEquals(Lists.newArrayList(Pair.create(m, complexDeletion)), builder.complexDeletions);
+ Assert.assertEquals(Sets.newHashSet(expectedCells), Sets.newHashSet(builder.cells));
+ }
+
+ @Test
+ public void collectStats()
+ {
+ int now = FBUtilities.nowInSeconds();
+ long ts = secondToTs(now);
+ Row.Builder builder = BTreeRow.unsortedBuilder(now);
+ builder.newRow(c1);
+ LivenessInfo liveness = LivenessInfo.create(kcvm, ts, now);
+ builder.addPrimaryKeyLivenessInfo(liveness);
+ DeletionTime complexDeletion = new DeletionTime(ts-1, now);
+ builder.addComplexDeletion(m, complexDeletion);
+ List<Cell> expectedCells = Lists.newArrayList(BufferCell.live(kcvm, v, ts, BB1),
+ BufferCell.live(kcvm, m, ts, BB1, CellPath.create(BB1)),
+ BufferCell.live(kcvm, m, ts, BB2, CellPath.create(BB2)));
+ expectedCells.forEach(builder::addCell);
+ DeletionTime rowDeletion = new DeletionTime(ts, now);
+ builder.addRowDeletion(rowDeletion);
+
+ StatsCollector collector = new StatsCollector();
+ Rows.collectStats(builder.build(), collector);
+
+ Assert.assertEquals(Lists.newArrayList(liveness), collector.liveness);
+ Assert.assertEquals(Sets.newHashSet(rowDeletion, complexDeletion), Sets.newHashSet(collector.deletions));
+ Assert.assertEquals(Sets.newHashSet(expectedCells), Sets.newHashSet(collector.cells));
+ Assert.assertEquals(2, collector.columnCount);
+ Assert.assertFalse(collector.hasLegacyCounterShards);
+ }
+
+
+ public static void addExpectedCells(Set<MergedPair<Cell>> dst, Cell merged, Cell... inputs)
+ {
+ for (int i=0; i<inputs.length; i++)
+ {
+ dst.add(MergedPair.create(i, merged, inputs[i]));
+ }
+ }
+
+ @Test
+ public void diff()
+ {
+ int now1 = FBUtilities.nowInSeconds();
+ long ts1 = secondToTs(now1);
+ Row.Builder r1Builder = BTreeRow.unsortedBuilder(now1);
+ r1Builder.newRow(c1);
+ LivenessInfo r1Liveness = LivenessInfo.create(kcvm, ts1, now1);
+ r1Builder.addPrimaryKeyLivenessInfo(r1Liveness);
+ DeletionTime r1ComplexDeletion = new DeletionTime(ts1-1, now1);
+ r1Builder.addComplexDeletion(m, r1ComplexDeletion);
+
+ Cell r1v = BufferCell.live(kcvm, v, ts1, BB1);
+ Cell r1m1 = BufferCell.live(kcvm, m, ts1, BB1, CellPath.create(BB1));
+ Cell r1m2 = BufferCell.live(kcvm, m, ts1, BB2, CellPath.create(BB2));
+ List<Cell> r1ExpectedCells = Lists.newArrayList(r1v, r1m1, r1m2);
+
+ r1ExpectedCells.forEach(r1Builder::addCell);
+
+ int now2 = now1 + 1;
+ long ts2 = secondToTs(now2);
+ Row.Builder r2Builder = BTreeRow.unsortedBuilder(now2);
+ r2Builder.newRow(c1);
+ LivenessInfo r2Liveness = LivenessInfo.create(kcvm, ts2, now2);
+ r2Builder.addPrimaryKeyLivenessInfo(r2Liveness);
+ Cell r2v = BufferCell.live(kcvm, v, ts2, BB2);
+ Cell r2m2 = BufferCell.live(kcvm, m, ts2, BB1, CellPath.create(BB2));
+ Cell r2m3 = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB3));
+ Cell r2m4 = BufferCell.live(kcvm, m, ts2, BB3, CellPath.create(BB4));
+ List<Cell> r2ExpectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4);
+
+ r2ExpectedCells.forEach(r2Builder::addCell);
+ DeletionTime r2RowDeletion = new DeletionTime(ts1 - 2, now2);
+ r2Builder.addRowDeletion(r2RowDeletion);
+
+ Row r1 = r1Builder.build();
+ Row r2 = r2Builder.build();
+ Row merged = Rows.merge(r1, r2, now2 + 1);
+
+ Assert.assertEquals(r1ComplexDeletion, merged.getComplexColumnData(m).complexDeletion());
+
+ DiffListener listener = new DiffListener();
+ Rows.diff(listener, merged, r1, r2);
+
+ Assert.assertEquals(c1, listener.clustering);
+
+ // check cells
+ Set<MergedPair<Cell>> expectedCells = Sets.newHashSet();
+ addExpectedCells(expectedCells, r2v, r1v, r2v); // v
+ addExpectedCells(expectedCells, r1m1, r1m1, null); // m[1]
+ addExpectedCells(expectedCells, r2m2, r1m2, r2m2); // m[2]
+ addExpectedCells(expectedCells, r2m3, null, r2m3); // m[3]
+ addExpectedCells(expectedCells, r2m4, null, r2m4); // m[4]
+
+ Assert.assertEquals(expectedCells.size(), listener.cells.size());
+ Assert.assertEquals(expectedCells, Sets.newHashSet(listener.cells));
+
+ // liveness
+ List<MergedPair<LivenessInfo>> expectedLiveness = Lists.newArrayList(MergedPair.create(0, r2Liveness, r1Liveness),
+ MergedPair.create(1, r2Liveness, r2Liveness));
+ Assert.assertEquals(expectedLiveness, listener.liveness);
+
+ // deletions
+ List<MergedPair<DeletionTime>> expectedDeletions = Lists.newArrayList(MergedPair.create(0, r2RowDeletion, null),
+ MergedPair.create(1, r2RowDeletion, r2RowDeletion));
+ Assert.assertEquals(expectedDeletions, listener.deletions);
+
+ // complex deletions
+ List<MergedPair<DeletionTime>> expectedCmplxDeletions = Lists.newArrayList(MergedPair.create(0, r1ComplexDeletion, r1ComplexDeletion),
+ MergedPair.create(1, r1ComplexDeletion, DeletionTime.LIVE));
+ Assert.assertEquals(ImmutableMap.builder().put(m, expectedCmplxDeletions).build(), listener.complexDeletions);
+ }
+
+ /**
+ * merged row has no column data
+ */
+ @Test
+ public void diffEmptyMerged()
+ {
+ int now1 = FBUtilities.nowInSeconds();
+ long ts1 = secondToTs(now1);
+ Row.Builder r1Builder = BTreeRow.unsortedBuilder(now1);
+ r1Builder.newRow(c1);
+ LivenessInfo r1Liveness = LivenessInfo.create(kcvm, ts1, now1);
+ r1Builder.addPrimaryKeyLivenessInfo(r1Liveness);
+
+ // mergedData == null
+ int now2 = now1 + 1;
+ long ts2 = secondToTs(now2);
+ Row.Builder r2Builder = BTreeRow.unsortedBuilder(now2);
+ r2Builder.newRow(c1);
+ LivenessInfo r2Liveness = LivenessInfo.create(kcvm, ts2, now2);
+ r2Builder.addPrimaryKeyLivenessInfo(r2Liveness);
+ DeletionTime r2ComplexDeletion = new DeletionTime(ts2-1, now2);
+ r2Builder.addComplexDeletion(m, r2ComplexDeletion);
+ Cell r2v = BufferCell.live(kcvm, v, ts2, BB2);
+ Cell r2m2 = BufferCell.live(kcvm, m, ts2, BB1, CellPath.create(BB2));
+ Cell r2m3 = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB3));
+ Cell r2m4 = BufferCell.live(kcvm, m, ts2, BB3, CellPath.create(BB4));
+ List<Cell> r2ExpectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4);
+
+ r2ExpectedCells.forEach(r2Builder::addCell);
+ DeletionTime r2RowDeletion = new DeletionTime(ts1 - 1, now2);
+ r2Builder.addRowDeletion(r2RowDeletion);
+
+ Row r1 = r1Builder.build();
+ Row r2 = r2Builder.build();
+
+ DiffListener listener = new DiffListener();
+ Rows.diff(listener, r1, r2);
+
+ Assert.assertEquals(c1, listener.clustering);
+
+ // check cells
+ Set<MergedPair<Cell>> expectedCells = Sets.newHashSet(MergedPair.create(0, null, r2v), // v
+ MergedPair.create(0, null, r2m2), // m[2]
+ MergedPair.create(0, null, r2m3), // m[3]
+ MergedPair.create(0, null, r2m4)); // m[4]
+
+ Assert.assertEquals(expectedCells.size(), listener.cells.size());
+ Assert.assertEquals(expectedCells, Sets.newHashSet(listener.cells));
+
+ // complex deletions
+ List<MergedPair<DeletionTime>> expectedCmplxDeletions = Lists.newArrayList(MergedPair.create(0, null, r2ComplexDeletion));
+ Assert.assertEquals(ImmutableMap.builder().put(m, expectedCmplxDeletions).build(), listener.complexDeletions);
+ }
+
+ /**
+ * input row has no column data
+ */
+ @Test
+ public void diffEmptyInput()
+ {
+ int now1 = FBUtilities.nowInSeconds();
+ long ts1 = secondToTs(now1);
+ Row.Builder r1Builder = BTreeRow.unsortedBuilder(now1);
+ r1Builder.newRow(c1);
+ LivenessInfo r1Liveness = LivenessInfo.create(kcvm, ts1, now1);
+ r1Builder.addPrimaryKeyLivenessInfo(r1Liveness);
+
+ // mergedData == null
+ int now2 = now1 + 1;
+ long ts2 = secondToTs(now2);
+ Row.Builder r2Builder = BTreeRow.unsortedBuilder(now2);
+ r2Builder.newRow(c1);
+ LivenessInfo r2Liveness = LivenessInfo.create(kcvm, ts2, now2);
+ r2Builder.addPrimaryKeyLivenessInfo(r2Liveness);
+ DeletionTime r2ComplexDeletion = new DeletionTime(ts2-1, now2);
+ r2Builder.addComplexDeletion(m, r2ComplexDeletion);
+ Cell r2v = BufferCell.live(kcvm, v, ts2, BB2);
+ Cell r2m2 = BufferCell.live(kcvm, m, ts2, BB1, CellPath.create(BB2));
+ Cell r2m3 = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB3));
+ Cell r2m4 = BufferCell.live(kcvm, m, ts2, BB3, CellPath.create(BB4));
+ List<Cell> r2ExpectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4);
+
+ r2ExpectedCells.forEach(r2Builder::addCell);
+ DeletionTime r2RowDeletion = new DeletionTime(ts1 - 1, now2);
+ r2Builder.addRowDeletion(r2RowDeletion);
+
+ Row r1 = r1Builder.build();
+ Row r2 = r2Builder.build();
+
+ DiffListener listener = new DiffListener();
+ Rows.diff(listener, r2, r1);
+
+ Assert.assertEquals(c1, listener.clustering);
+
+ // check cells
+ Set<MergedPair<Cell>> expectedCells = Sets.newHashSet(MergedPair.create(0, r2v, null), // v
+ MergedPair.create(0, r2m2, null), // m[2]
+ MergedPair.create(0, r2m3, null), // m[3]
+ MergedPair.create(0, r2m4, null)); // m[4]
+
+ Assert.assertEquals(expectedCells.size(), listener.cells.size());
+ Assert.assertEquals(expectedCells, Sets.newHashSet(listener.cells));
+
+ // complex deletions
+ List<MergedPair<DeletionTime>> expectedCmplxDeletions = Lists.newArrayList(MergedPair.create(0, r2ComplexDeletion, null));
+ Assert.assertEquals(ImmutableMap.builder().put(m, expectedCmplxDeletions).build(), listener.complexDeletions);
+ }
+
+ @Test
+ public void merge()
+ {
+ int now1 = FBUtilities.nowInSeconds();
+ Row.Builder existingBuilder = createBuilder(c1, now1, BB1, BB1, BB1);
+
+ int now2 = now1 + 1;
+ long ts2 = secondToTs(now2);
+
+ Cell expectedVCell = BufferCell.live(kcvm, v, ts2, BB2);
+ Cell expectedMCell = BufferCell.live(kcvm, m, ts2, BB2, CellPath.create(BB1));
+ DeletionTime expectedComplexDeletionTime = new DeletionTime(ts2 - 1, now2);
+
+ Row.Builder updateBuilder = createBuilder(c1, now2, null, null, null);
+ updateBuilder.addCell(expectedVCell);
+ updateBuilder.addComplexDeletion(m, expectedComplexDeletionTime);
+ updateBuilder.addCell(expectedMCell);
+
+ RowBuilder builder = new RowBuilder();
+ long td = Rows.merge(existingBuilder.build(), updateBuilder.build(), builder, now2 + 1);
+
+ Assert.assertEquals(c1, builder.clustering);
+ Assert.assertEquals(LivenessInfo.create(kcvm, ts2, now2), builder.livenessInfo);
+ Assert.assertEquals(Lists.newArrayList(Pair.create(m, new DeletionTime(ts2-1, now2))), builder.complexDeletions);
+
+ Assert.assertEquals(2, builder.cells.size());
+ Assert.assertEquals(Lists.newArrayList(expectedVCell, expectedMCell), Lists.newArrayList(builder.cells));
+ Assert.assertEquals(ts2 - secondToTs(now1), td);
+ }
+
+ @Test
+ public void mergeComplexDeletionSupersededByRowDeletion()
+ {
+ int now1 = FBUtilities.nowInSeconds();
+ Row.Builder existingBuilder = createBuilder(c1, now1, null, null, null);
+
+ int now2 = now1 + 1;
+ Row.Builder updateBuilder = createBuilder(c1, now2, null, BB1, BB1);
+ int now3 = now2 + 1;
+ DeletionTime expectedDeletion = new DeletionTime(secondToTs(now3), now3);
+ updateBuilder.addRowDeletion(expectedDeletion);
+
+ RowBuilder builder = new RowBuilder();
+ Rows.merge(existingBuilder.build(), updateBuilder.build(), builder, now3 + 1);
+
+ Assert.assertEquals(expectedDeletion, builder.deletionTime);
+ Assert.assertEquals(Collections.emptyList(), builder.complexDeletions);
+ Assert.assertEquals(Collections.emptyList(), builder.cells);
+ }
+
+ /**
+ * If a row's deletion time deletes a row's liveness info, the new row should have it's
+ * liveness info set to empty
+ */
+ @Test
+ public void mergeRowDeletionSupercedesLiveness()
+ {
+ int now1 = FBUtilities.nowInSeconds();
+ Row.Builder existingBuilder = createBuilder(c1, now1, null, null, null);
+
+ int now2 = now1 + 1;
+ Row.Builder updateBuilder = createBuilder(c1, now2, BB1, BB1, BB1);
+ int now3 = now2 + 1;
+ DeletionTime expectedDeletion = new DeletionTime(secondToTs(now3), now3);
+ updateBuilder.addRowDeletion(expectedDeletion);
+
+ RowBuilder builder = new RowBuilder();
+ Rows.merge(existingBuilder.build(), updateBuilder.build(), builder, now3 + 1);
+
+ Assert.assertEquals(expectedDeletion, builder.deletionTime);
+ Assert.assertEquals(LivenessInfo.EMPTY, builder.livenessInfo);
+ Assert.assertEquals(Collections.emptyList(), builder.complexDeletions);
+ Assert.assertEquals(Collections.emptyList(), builder.cells);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b263af93/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index 0804bfb..b60a039 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -20,15 +20,22 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
import org.junit.*;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
@@ -36,6 +43,7 @@ import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.net.*;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.Util.assertClustering;
@@ -50,6 +58,7 @@ public class DataResolverTest
{
public static final String KEYSPACE1 = "DataResolverTest";
public static final String CF_STANDARD = "Standard1";
+ public static final String CF_COLLECTION = "Collection1";
// counter to generate the last byte of the respondent's address in a ReadResponse message
private int addressSuffix = 10;
@@ -57,7 +66,10 @@ public class DataResolverTest
private DecoratedKey dk;
private Keyspace ks;
private ColumnFamilyStore cfs;
+ private ColumnFamilyStore cfs2;
private CFMetaData cfm;
+ private CFMetaData cfm2;
+ private ColumnDefinition m;
private int nowInSec;
private ReadCommand command;
private MessageRecorder messageRecorder;
@@ -74,10 +86,15 @@ public class DataResolverTest
.addRegularColumn("one", AsciiType.instance)
.addRegularColumn("two", AsciiType.instance)
.build();
+
+ CFMetaData cfMetaData2 = CFMetaData.Builder.create(KEYSPACE1, CF_COLLECTION)
+ .addPartitionKey("k", ByteType.instance)
+ .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
+ .build();
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple(1),
- cfMetadata);
+ cfMetadata, cfMetaData2);
}
@Before
@@ -87,6 +104,10 @@ public class DataResolverTest
ks = Keyspace.open(KEYSPACE1);
cfs = ks.getColumnFamilyStore(CF_STANDARD);
cfm = cfs.metadata;
+ cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
+ cfm2 = cfs2.metadata;
+ m = cfm2.getColumnDefinition(new ColumnIdentifier("m", false));
+
nowInSec = FBUtilities.nowInSeconds();
command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
}
@@ -419,6 +440,200 @@ public class DataResolverTest
assertRepairContainsColumn(msg, "1", "two", "B", 3);
}
+ private static ByteBuffer bb(int b)
+ {
+ return ByteBufferUtil.bytes(b);
+ }
+
+ private Cell mapCell(int k, int v, long ts)
+ {
+ return BufferCell.live(cfm2, m, ts, bb(v), CellPath.create(bb(k)));
+ }
+
+ @Test
+ public void testResolveComplexDelete()
+ {
+ ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+ DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+
+ long[] ts = {100, 200};
+
+ Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+ builder.newRow(Clustering.EMPTY);
+ builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+ builder.addCell(mapCell(0, 0, ts[0]));
+
+ InetAddress peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ builder.newRow(Clustering.EMPTY);
+ DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
+ builder.addComplexDeletion(m, expectedCmplxDelete);
+ Cell expectedCell = mapCell(1, 1, ts[1]);
+ builder.addCell(expectedCell);
+
+ InetAddress peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
+ Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
+ }
+
+ MessageOut<Mutation> msg;
+ msg = getSentMessage(peer1);
+ Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2.cfId).iterator();
+ assertTrue(rowIter.hasNext());
+ Row row = rowIter.next();
+ assertFalse(rowIter.hasNext());
+
+ ComplexColumnData cd = row.getComplexColumnData(m);
+
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ assertEquals(expectedCmplxDelete, cd.complexDeletion());
+
+ Assert.assertNull(messageRecorder.sent.get(peer2));
+ }
+
+ @Test
+ public void testResolveDeletedCollection()
+ {
+
+ ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+ DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+
+ long[] ts = {100, 200};
+
+ Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+ builder.newRow(Clustering.EMPTY);
+ builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+ builder.addCell(mapCell(0, 0, ts[0]));
+
+ InetAddress peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ builder.newRow(Clustering.EMPTY);
+ DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
+ builder.addComplexDeletion(m, expectedCmplxDelete);
+
+ InetAddress peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ try(PartitionIterator data = resolver.resolve())
+ {
+ assertFalse(data.hasNext());
+ }
+
+ MessageOut<Mutation> msg;
+ msg = getSentMessage(peer1);
+ Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2.cfId).iterator();
+ assertTrue(rowIter.hasNext());
+ Row row = rowIter.next();
+ assertFalse(rowIter.hasNext());
+
+ ComplexColumnData cd = row.getComplexColumnData(m);
+
+ assertEquals(Collections.emptySet(), Sets.newHashSet(cd));
+ assertEquals(expectedCmplxDelete, cd.complexDeletion());
+
+ Assert.assertNull(messageRecorder.sent.get(peer2));
+ }
+
+ @Test
+ public void testResolveNewCollection()
+ {
+ ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+ DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+
+ long[] ts = {100, 200};
+
+ // map column
+ Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+ builder.newRow(Clustering.EMPTY);
+ DeletionTime expectedCmplxDelete = new DeletionTime(ts[0] - 1, nowInSec);
+ builder.addComplexDeletion(m, expectedCmplxDelete);
+ Cell expectedCell = mapCell(0, 0, ts[0]);
+ builder.addCell(expectedCell);
+
+ // empty map column
+ InetAddress peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ InetAddress peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
+
+ try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+
+ Assert.assertNull(messageRecorder.sent.get(peer1));
+
+ MessageOut<Mutation> msg;
+ msg = getSentMessage(peer2);
+ Iterator<Row> rowIter = msg.payload.getPartitionUpdate(cfm2.cfId).iterator();
+ assertTrue(rowIter.hasNext());
+ Row row = rowIter.next();
+ assertFalse(rowIter.hasNext());
+
+ ComplexColumnData cd = row.getComplexColumnData(m);
+
+ assertEquals(Sets.newHashSet(expectedCell), Sets.newHashSet(cd));
+ assertEquals(expectedCmplxDelete, cd.complexDeletion());
+ }
+
+ @Test
+ public void testResolveNewCollectionOverwritingDeleted()
+ {
+ ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
+ DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+
+ long[] ts = {100, 200};
+
+ // cleared map column
+ Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+ builder.newRow(Clustering.EMPTY);
+ builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+
+ InetAddress peer1 = peer();
+ resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ // newer, overwritten map column
+ builder.newRow(Clustering.EMPTY);
+ DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
+ builder.addComplexDeletion(m, expectedCmplxDelete);
+ Cell expectedCell = mapCell(1, 1, ts[1]);
+ builder.addCell(expectedCell);
+
+ InetAddress peer2 = peer();
+ resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+
+ try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data))
+ {
+ Row row = Iterators.getOnlyElement(rows);
+ assertColumns(row, "m");
+ ComplexColumnData cd = row.getComplexColumnData(m);
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ }
+
+ MessageOut<Mutation> msg;
+ msg = getSentMessage(peer1);
+ Row row = Iterators.getOnlyElement(msg.payload.getPartitionUpdate(cfm2.cfId).iterator());
+
+ ComplexColumnData cd = row.getComplexColumnData(m);
+
+ assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
+ assertEquals(expectedCmplxDelete, cd.complexDeletion());
+
+ Assert.assertNull(messageRecorder.sent.get(peer2));
+ }
+
private InetAddress peer()
{
try
@@ -488,10 +703,16 @@ public class DataResolverTest
assertEquals(update.metadata().cfName, cfm.cfName);
}
+
public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator)
{
+ return readResponseMessage(from, partitionIterator, command);
+
+ }
+ public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
+ {
return MessageIn.create(from,
- ReadResponse.createRemoteDataResponse(partitionIterator, command.columnFilter()),
+ ReadResponse.createRemoteDataResponse(partitionIterator, cmd.columnFilter()),
Collections.EMPTY_MAP,
MessagingService.Verb.REQUEST_RESPONSE,
MessagingService.current_version);