You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:54 UTC
[02/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/StorageServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceTest.java b/test/unit/org/apache/cassandra/service/StorageServiceTest.java
new file mode 100644
index 0000000..9d5c324
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/StorageServiceTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaMultimap;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+
+import static org.junit.Assert.assertEquals;
+
+public class StorageServiceTest
+{
+ static InetAddressAndPort aAddress;
+ static InetAddressAndPort bAddress;
+ static InetAddressAndPort cAddress;
+ static InetAddressAndPort dAddress;
+ static InetAddressAndPort eAddress;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception
+ {
+ aAddress = InetAddressAndPort.getByName("127.0.0.1");
+ bAddress = InetAddressAndPort.getByName("127.0.0.2");
+ cAddress = InetAddressAndPort.getByName("127.0.0.3");
+ dAddress = InetAddressAndPort.getByName("127.0.0.4");
+ eAddress = InetAddressAndPort.getByName("127.0.0.5");
+ }
+
+ private static final Token threeToken = new RandomPartitioner.BigIntegerToken("3");
+ private static final Token sixToken = new RandomPartitioner.BigIntegerToken("6");
+ private static final Token nineToken = new RandomPartitioner.BigIntegerToken("9");
+ private static final Token elevenToken = new RandomPartitioner.BigIntegerToken("11");
+ private static final Token oneToken = new RandomPartitioner.BigIntegerToken("1");
+
+ Range<Token> aRange = new Range<>(oneToken, threeToken);
+ Range<Token> bRange = new Range<>(threeToken, sixToken);
+ Range<Token> cRange = new Range<>(sixToken, nineToken);
+ Range<Token> dRange = new Range<>(nineToken, elevenToken);
+ Range<Token> eRange = new Range<>(elevenToken, oneToken);
+
+ @Before
+ public void setUp()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+ IEndpointSnitch snitch = new AbstractEndpointSnitch()
+ {
+ public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2)
+ {
+ return 0;
+ }
+
+ public String getRack(InetAddressAndPort endpoint)
+ {
+ return "R1";
+ }
+
+ public String getDatacenter(InetAddressAndPort endpoint)
+ {
+ return "DC1";
+ }
+ };
+
+ DatabaseDescriptor.setEndpointSnitch(snitch);
+ }
+
+ private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd)
+ {
+ return new SimpleStrategy("MoveTransientTest",
+ tmd,
+ DatabaseDescriptor.getEndpointSnitch(),
+ com.google.common.collect.ImmutableMap.of("replication_factor", "3/1"));
+ }
+
+ public static <K, C extends ReplicaCollection<? extends C>> void assertMultimapEqualsIgnoreOrder(ReplicaMultimap<K, C> a, ReplicaMultimap<K, C> b)
+ {
+ if (!a.keySet().equals(b.keySet()))
+ assertEquals(a, b);
+ for (K key : a.keySet())
+ {
+ C ac = a.get(key);
+ C bc = b.get(key);
+ if (ac.size() != bc.size())
+ assertEquals(a, b);
+ for (Replica r : ac)
+ {
+ if (!bc.contains(r))
+ assertEquals(a, b);
+ }
+ }
+ }
+
+ @Test
+ public void testGetChangedReplicasForLeaving() throws Exception
+ {
+ TokenMetadata tmd = new TokenMetadata();
+ tmd.updateNormalToken(threeToken, aAddress);
+ tmd.updateNormalToken(sixToken, bAddress);
+ tmd.updateNormalToken(nineToken, cAddress);
+ tmd.updateNormalToken(elevenToken, dAddress);
+ tmd.updateNormalToken(oneToken, eAddress);
+
+ tmd.addLeavingEndpoint(aAddress);
+
+ AbstractReplicationStrategy strat = simpleStrategy(tmd);
+
+ EndpointsByReplica result = StorageService.getChangedReplicasForLeaving("StorageServiceTest", aAddress, tmd, strat);
+ System.out.println(result);
+ EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable();
+ expectedResult.put(new Replica(aAddress, aRange, true), new Replica(cAddress, new Range<>(oneToken, sixToken), true));
+ expectedResult.put(new Replica(aAddress, aRange, true), new Replica(dAddress, new Range<>(oneToken, sixToken), false));
+ expectedResult.put(new Replica(aAddress, eRange, true), new Replica(bAddress, eRange, true));
+ expectedResult.put(new Replica(aAddress, eRange, true), new Replica(cAddress, eRange, false));
+ expectedResult.put(new Replica(aAddress, dRange, false), new Replica(bAddress, dRange, false));
+ assertMultimapEqualsIgnoreOrder(result, expectedResult.asImmutableView());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
index f8567e8..cf1e06a 100644
--- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
@@ -20,12 +20,14 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
-import java.util.Collection;
-import java.util.List;
+import java.net.UnknownHostException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Predicates;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -38,9 +40,13 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -49,12 +55,26 @@ public class WriteResponseHandlerTest
{
static Keyspace ks;
static ColumnFamilyStore cfs;
- static List<InetAddressAndPort> targets;
+ static EndpointsForToken targets;
+ static EndpointsForToken pending;
+
+ private static Replica full(String name)
+ {
+ try
+ {
+ return ReplicaUtils.full(InetAddressAndPort.getByName(name));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
@BeforeClass
public static void setUpClass() throws Throwable
{
SchemaLoader.loadSchema();
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
// Register peers with expected DC for NetworkTopologyStrategy.
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
metadata.clearUnsafe();
@@ -77,17 +97,12 @@ public class WriteResponseHandlerTest
return "datacenter2";
}
- public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress)
- {
- return null;
- }
-
- public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses)
+ public <C extends ReplicaCollection<? extends C>> C sortedByProximity(InetAddressAndPort address, C replicas)
{
-
+ return replicas;
}
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+ public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
{
return 0;
}
@@ -97,7 +112,7 @@ public class WriteResponseHandlerTest
}
- public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
+ public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2)
{
return false;
}
@@ -106,8 +121,10 @@ public class WriteResponseHandlerTest
SchemaLoader.createKeyspace("Foo", KeyspaceParams.nts("datacenter1", 3, "datacenter2", 3), SchemaLoader.standardCFMD("Foo", "Bar"));
ks = Keyspace.open("Foo");
cfs = ks.getColumnFamilyStore("Bar");
- targets = ImmutableList.of(InetAddressAndPort.getByName("127.1.0.255"), InetAddressAndPort.getByName("127.1.0.254"), InetAddressAndPort.getByName("127.1.0.253"),
- InetAddressAndPort.getByName("127.2.0.255"), InetAddressAndPort.getByName("127.2.0.254"), InetAddressAndPort.getByName("127.2.0.253"));
+ targets = EndpointsForToken.of(DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0)),
+ full("127.1.0.255"), full("127.1.0.254"), full("127.1.0.253"),
+ full("127.2.0.255"), full("127.2.0.254"), full("127.2.0.253"));
+ pending = EndpointsForToken.empty(DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0)));
}
@Before
@@ -197,7 +214,6 @@ public class WriteResponseHandlerTest
@Test
public void failedIdealCLIncrementsStat() throws Throwable
{
-
AbstractWriteResponseHandler awr = createWriteResponseHandler(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.EACH_QUORUM);
//Succeed in local DC
@@ -220,16 +236,12 @@ public class WriteResponseHandlerTest
private static AbstractWriteResponseHandler createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal, long queryStartTime)
{
- return ks.getReplicationStrategy().getWriteResponseHandler(targets, ImmutableList.of(), cl, new Runnable() {
- public void run()
- {
-
- }
- }, WriteType.SIMPLE, queryStartTime, ideal);
+ return ks.getReplicationStrategy().getWriteResponseHandler(ReplicaLayout.forWriteWithDownNodes(ks, cl, targets.token(), targets, pending),
+ null, WriteType.SIMPLE, queryStartTime, ideal);
}
private static MessageIn createDummyMessage(int target)
{
- return MessageIn.create(targets.get(target), null, null, null, 0, 0L);
+ return MessageIn.create(targets.get(target).endpoint(), null, null, null, 0, 0L);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
new file mode 100644
index 0000000..c19e65e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE;
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.apache.cassandra.locator.ReplicaUtils.trans;
+
+public class WriteResponseHandlerTransientTest
+{
+ static Keyspace ks;
+ static ColumnFamilyStore cfs;
+
+ static final InetAddressAndPort EP1;
+ static final InetAddressAndPort EP2;
+ static final InetAddressAndPort EP3;
+ static final InetAddressAndPort EP4;
+ static final InetAddressAndPort EP5;
+ static final InetAddressAndPort EP6;
+
+ static final String DC1 = "datacenter1";
+ static final String DC2 = "datacenter2";
+ static Token dummy;
+ static
+ {
+ try
+ {
+ EP1 = InetAddressAndPort.getByName("127.1.0.1");
+ EP2 = InetAddressAndPort.getByName("127.1.0.2");
+ EP3 = InetAddressAndPort.getByName("127.1.0.3");
+ EP4 = InetAddressAndPort.getByName("127.2.0.4");
+ EP5 = InetAddressAndPort.getByName("127.2.0.5");
+ EP6 = InetAddressAndPort.getByName("127.2.0.6");
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ @BeforeClass
+ public static void setupClass() throws Throwable
+ {
+ SchemaLoader.loadSchema();
+ DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+ // Register peers with expected DC for NetworkTopologyStrategy.
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+ metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.1.0.1"));
+ metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.2.0.1"));
+
+ DatabaseDescriptor.setEndpointSnitch(new IEndpointSnitch()
+ {
+ public String getRack(InetAddressAndPort endpoint)
+ {
+ return null;
+ }
+
+ public String getDatacenter(InetAddressAndPort endpoint)
+ {
+ byte[] address = endpoint.address.getAddress();
+ if (address[1] == 1)
+ return DC1;
+ else
+ return DC2;
+ }
+
+ public <C extends ReplicaCollection<? extends C>> C sortedByProximity(InetAddressAndPort address, C unsortedAddress)
+ {
+ return unsortedAddress;
+ }
+
+ public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
+ {
+ return 0;
+ }
+
+ public void gossiperStarting()
+ {
+
+ }
+
+ public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2)
+ {
+ return false;
+ }
+ });
+
+ DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.1.0.1"));
+ SchemaLoader.createKeyspace("ks", KeyspaceParams.nts(DC1, "3/1", DC2, "3/1"), SchemaLoader.standardCFMD("ks", "tbl"));
+ ks = Keyspace.open("ks");
+ cfs = ks.getColumnFamilyStore("tbl");
+ dummy = DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0));
+ }
+
+ @Ignore("Throws unavailable for quorum as written")
+ @Test
+ public void checkPendingReplicasAreNotFiltered()
+ {
+ EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3));
+ EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), full(EP4), full(EP5), trans(EP6));
+ ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), 2, natural, pending, Predicates.alwaysTrue());
+
+ Assert.assertEquals(EndpointsForRange.of(full(EP4), full(EP5), trans(EP6)), replicaLayout.pending());
+ }
+
+ private static ReplicaLayout.ForToken expected(EndpointsForToken all, EndpointsForToken selected)
+ {
+ return new ReplicaLayout.ForToken(ks, ConsistencyLevel.QUORUM, dummy.getToken(), all, EndpointsForToken.empty(dummy.getToken()), selected);
+ }
+
+ private static ReplicaLayout.ForToken getSpeculationContext(EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate)
+ {
+ return ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), blockFor, replicas, EndpointsForToken.empty(dummy.getToken()), livePredicate);
+ }
+
+ private static void assertSpeculationReplicas(ReplicaLayout.ForToken expected, EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate)
+ {
+ ReplicaLayout.ForToken actual = getSpeculationContext(replicas, blockFor, livePredicate);
+ Assert.assertEquals(expected.natural(), actual.natural());
+ Assert.assertEquals(expected.selected(), actual.selected());
+ }
+
+ private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... endpoints)
+ {
+ Set<InetAddressAndPort> deadSet = Sets.newHashSet(endpoints);
+ return ep -> !deadSet.contains(ep);
+ }
+
+ private static EndpointsForToken replicas(Replica... rr)
+ {
+ return EndpointsForToken.of(dummy.getToken(), rr);
+ }
+
+ @Ignore("Throws unavailable for quorum as written")
+ @Test
+ public void checkSpeculationContext()
+ {
+ EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3));
+ // in the happy path, the transient replica should be classified as a backup
+ assertSpeculationReplicas(expected(all,
+ replicas(full(EP1), full(EP2))),
+ replicas(full(EP1), full(EP2), trans(EP3)),
+ 2, dead());
+
+ // if one of the full replicas is dead, every remaining live replica (including the transient one) should be in the initial contacts
+ assertSpeculationReplicas(expected(all,
+ replicas(full(EP1), trans(EP3))),
+ replicas(full(EP1), full(EP2), trans(EP3)),
+ 2, dead(EP2));
+
+ // block for only 1 full replica; the transient replica is kept as a backup
+ assertSpeculationReplicas(expected(all,
+ replicas(full(EP1))),
+ replicas(full(EP1), full(EP2), trans(EP3)),
+ 1, dead(EP2));
+ }
+
+ @Test (expected = UnavailableException.class)
+ public void noFullReplicas()
+ {
+ getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP1));
+ }
+
+ @Test (expected = UnavailableException.class)
+ public void notEnoughTransientReplicas()
+ {
+ getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP2, EP3));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
new file mode 100644
index 0000000..c6f2232
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringBoundary;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Base class for testing various components which deal with read responses
+ */
+@Ignore
+public abstract class AbstractReadResponseTest
+{
+ public static final String KEYSPACE1 = "DataResolverTest";
+ public static final String CF_STANDARD = "Standard1";
+ public static final String CF_COLLECTION = "Collection1";
+
+ public static Keyspace ks;
+ public static ColumnFamilyStore cfs;
+ public static ColumnFamilyStore cfs2;
+ public static TableMetadata cfm;
+ public static TableMetadata cfm2;
+ public static ColumnMetadata m;
+
+ public static DecoratedKey dk;
+ static int nowInSec;
+
+ static final InetAddressAndPort EP1;
+ static final InetAddressAndPort EP2;
+ static final InetAddressAndPort EP3;
+
+ static
+ {
+ try
+ {
+ EP1 = InetAddressAndPort.getByName("127.0.0.1");
+ EP2 = InetAddressAndPort.getByName("127.0.0.2");
+ EP3 = InetAddressAndPort.getByName("127.0.0.3");
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @BeforeClass
+ public static void setupClass() throws Throwable
+ {
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+ TableMetadata.Builder builder1 =
+ TableMetadata.builder(KEYSPACE1, CF_STANDARD)
+ .addPartitionKeyColumn("key", BytesType.instance)
+ .addClusteringColumn("col1", AsciiType.instance)
+ .addRegularColumn("c1", AsciiType.instance)
+ .addRegularColumn("c2", AsciiType.instance)
+ .addRegularColumn("one", AsciiType.instance)
+ .addRegularColumn("two", AsciiType.instance);
+
+ TableMetadata.Builder builder2 =
+ TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
+ .addPartitionKeyColumn("k", ByteType.instance)
+ .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
+
+ ks = Keyspace.open(KEYSPACE1);
+ cfs = ks.getColumnFamilyStore(CF_STANDARD);
+ cfm = cfs.metadata();
+ cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
+ cfm2 = cfs2.metadata();
+ m = cfm2.getColumn(new ColumnIdentifier("m", false));
+ }
+
+ @Before
+ public void setUp() throws Exception
+ {
+ dk = Util.dk("key1");
+ nowInSec = FBUtilities.nowInSeconds();
+ }
+
+ static void assertPartitionsEqual(RowIterator l, RowIterator r)
+ {
+ try (RowIterator left = l; RowIterator right = r)
+ {
+ Assert.assertTrue(Util.sameContent(left, right));
+ }
+ }
+
+ static void assertPartitionsEqual(UnfilteredRowIterator left, UnfilteredRowIterator right)
+ {
+ Assert.assertTrue(Util.sameContent(left, right));
+ }
+
+ static void assertPartitionsEqual(UnfilteredPartitionIterator left, UnfilteredPartitionIterator right)
+ {
+ while (left.hasNext())
+ {
+ Assert.assertTrue(right.hasNext());
+ assertPartitionsEqual(left.next(), right.next());
+ }
+ Assert.assertFalse(right.hasNext());
+ }
+
+ static void assertPartitionsEqual(PartitionIterator l, PartitionIterator r)
+ {
+ try (PartitionIterator left = l; PartitionIterator right = r)
+ {
+ while (left.hasNext())
+ {
+ Assert.assertTrue(right.hasNext());
+ assertPartitionsEqual(left.next(), right.next());
+ }
+ Assert.assertFalse(right.hasNext());
+ }
+ }
+
+ static void consume(PartitionIterator i)
+ {
+ try (PartitionIterator iterator = i)
+ {
+ while (iterator.hasNext())
+ {
+ try (RowIterator rows = iterator.next())
+ {
+ while (rows.hasNext())
+ rows.next();
+ }
+ }
+ }
+ }
+
+ static PartitionIterator filter(UnfilteredPartitionIterator iter)
+ {
+ return UnfilteredPartitionIterators.filter(iter, nowInSec);
+ }
+
+ static DecoratedKey dk(String k)
+ {
+ return cfs.decorateKey(ByteBufferUtil.bytes(k));
+ }
+
+ static DecoratedKey dk(int k)
+ {
+ return dk(Integer.toString(k));
+ }
+
+ static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data, boolean digest)
+ {
+ ReadResponse response = digest ? ReadResponse.createDigestResponse(data, command) : ReadResponse.createDataResponse(data, command);
+ return MessageIn.create(from, response, Collections.emptyMap(), MessagingService.Verb.READ, MessagingService.current_version);
+ }
+
+ static MessageIn<ReadResponse> response(ReadCommand command, InetAddressAndPort from, UnfilteredPartitionIterator data)
+ {
+ return response(command, from, data, false);
+ }
+
+ public RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
+ {
+ return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
+ }
+
+ public RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
+ {
+ ClusteringBound startBound = rtBound(start, true, inclusiveStart);
+ ClusteringBound endBound = rtBound(end, false, inclusiveEnd);
+ return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
+ }
+
+ public ClusteringBound rtBound(Object value, boolean isStart, boolean inclusive)
+ {
+ ClusteringBound.Kind kind = isStart
+ ? (inclusive ? ClusteringPrefix.Kind.INCL_START_BOUND : ClusteringPrefix.Kind.EXCL_START_BOUND)
+ : (inclusive ? ClusteringPrefix.Kind.INCL_END_BOUND : ClusteringPrefix.Kind.EXCL_END_BOUND);
+
+ return ClusteringBound.create(kind, cfm.comparator.make(value).getRawValues());
+ }
+
+ public ClusteringBoundary rtBoundary(Object value, boolean inclusiveOnEnd)
+ {
+ ClusteringBound.Kind kind = inclusiveOnEnd
+ ? ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY
+ : ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY;
+ return ClusteringBoundary.create(kind, cfm.comparator.make(value).getRawValues());
+ }
+
+ public RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime)
+ {
+ return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime));
+ }
+
+ public RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2)
+ {
+ return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd),
+ new DeletionTime(markedForDeleteAt1, localDeletionTime1),
+ new DeletionTime(markedForDeleteAt2, localDeletionTime2));
+ }
+
+ public UnfilteredPartitionIterator fullPartitionDelete(TableMetadata table, DecoratedKey dk, long timestamp, int nowInSec)
+ {
+ return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(table, dk, timestamp, nowInSec).unfilteredIterator());
+ }
+
+ public UnfilteredPartitionIterator iter(PartitionUpdate update)
+ {
+ return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator());
+ }
+
+ public UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds)
+ {
+ SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator);
+ Collections.addAll(s, unfiltereds);
+ final Iterator<Unfiltered> iterator = s.iterator();
+
+ UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm,
+ key,
+ DeletionTime.LIVE,
+ cfm.regularAndStaticColumns(),
+ Rows.EMPTY_STATIC_ROW,
+ false,
+ EncodingStats.NO_STATS)
+ {
+ protected Unfiltered computeNext()
+ {
+ return iterator.hasNext() ? iterator.next() : endOfData();
+ }
+ };
+ return new SingletonUnfilteredPartitionIterator(rowIter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index ac8ed0b..abec25d 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -19,114 +19,101 @@ package org.apache.cassandra.service.reads;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collections;
+import java.util.Iterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
-import org.junit.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.MutableDeletionInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.ByteType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.service.reads.repair.TestableReadRepair;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.Util.assertClustering;
import static org.apache.cassandra.Util.assertColumn;
import static org.apache.cassandra.Util.assertColumns;
+import static org.apache.cassandra.db.ClusteringBound.Kind;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.apache.cassandra.db.ClusteringBound.Kind;
-public class DataResolverTest
+public class DataResolverTest extends AbstractReadResponseTest
{
public static final String KEYSPACE1 = "DataResolverTest";
public static final String CF_STANDARD = "Standard1";
- public static final String CF_COLLECTION = "Collection1";
-
- // counter to generate the last byte of the respondent's address in a ReadResponse message
- private int addressSuffix = 10;
-
- private DecoratedKey dk;
- private Keyspace ks;
- private ColumnFamilyStore cfs;
- private ColumnFamilyStore cfs2;
- private TableMetadata cfm;
- private TableMetadata cfm2;
- private ColumnMetadata m;
- private int nowInSec;
+
private ReadCommand command;
private TestableReadRepair readRepair;
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- DatabaseDescriptor.daemonInitialization();
-
- TableMetadata.Builder builder1 =
- TableMetadata.builder(KEYSPACE1, CF_STANDARD)
- .addPartitionKeyColumn("key", BytesType.instance)
- .addClusteringColumn("col1", AsciiType.instance)
- .addRegularColumn("c1", AsciiType.instance)
- .addRegularColumn("c2", AsciiType.instance)
- .addRegularColumn("one", AsciiType.instance)
- .addRegularColumn("two", AsciiType.instance);
-
- TableMetadata.Builder builder2 =
- TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
- .addPartitionKeyColumn("k", ByteType.instance)
- .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
-
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
- }
-
@Before
public void setup()
{
- dk = Util.dk("key1");
- ks = Keyspace.open(KEYSPACE1);
- cfs = ks.getColumnFamilyStore(CF_STANDARD);
- cfm = cfs.metadata();
- cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
- cfm2 = cfs2.metadata();
- m = cfm2.getColumn(new ColumnIdentifier("m", false));
-
- nowInSec = FBUtilities.nowInSeconds();
command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
}
- @Test
- public void testResolveNewerSingleRow() throws UnknownHostException
+ private static EndpointsForRange makeReplicas(int num) // test helper: builds num replicas at loopback addresses 127.0.0.1 upward
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
- .add("c1", "v1")
- .buildUpdate())));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
- .add("c1", "v2")
- .buildUpdate())));
+ EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num); // builder over ReplicaUtils.FULL_RANGE; num is presumably a capacity hint - TODO confirm
+ for (int i = 0; i < num; i++)
+ {
+ try
+ {
+ replicas.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) }))); // i-th replica gets last address byte i + 1 and is created via ReplicaUtils.full
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e); // rethrown unchecked, keeping the cause, so callers need no throws clause
+ }
+ }
+ return replicas.build(); // replicas are read back by index in the tests (replicas.get(0), replicas.get(1), ...)
+ }
+
+ @Test
+ public void testResolveNewerSingleRow()
+ {
+ EndpointsForRange replicas = makeReplicas(2);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+ .add("c1", "v1")
+ .buildUpdate()), false));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+ .add("c1", "v2")
+ .buildUpdate()), false));
try(PartitionIterator data = resolver.resolve())
{
@@ -149,16 +136,17 @@ public class DataResolverTest
@Test
public void testResolveDisjointSingleRow()
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
- .add("c1", "v1")
- .buildUpdate())));
+ EndpointsForRange replicas = makeReplicas(2);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+ .add("c1", "v1")
+ .buildUpdate())));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
- .add("c2", "v2")
- .buildUpdate())));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+ .add("c2", "v2")
+ .buildUpdate())));
try(PartitionIterator data = resolver.resolve())
{
@@ -185,15 +173,16 @@ public class DataResolverTest
@Test
public void testResolveDisjointMultipleRows() throws UnknownHostException
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
- .add("c1", "v1")
- .buildUpdate())));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2")
- .add("c2", "v2")
- .buildUpdate())));
+ EndpointsForRange replicas = makeReplicas(2);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+ .add("c1", "v1")
+ .buildUpdate())));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2")
+ .add("c2", "v2")
+ .buildUpdate())));
try (PartitionIterator data = resolver.resolve())
{
@@ -231,37 +220,38 @@ public class DataResolverTest
@Test
public void testResolveDisjointMultipleRowsWithRangeTombstones()
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime(), readRepair);
+ EndpointsForRange replicas = makeReplicas(4);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
- .addRangeTombstone(tombstone2)
- .buildUpdate();
+ .addRangeTombstone(tombstone2)
+ .buildUpdate();
- InetAddressAndPort peer1 = peer();
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
- .addRangeTombstone(tombstone2)
- .buildUpdate());
- resolver.preprocess(readResponseMessage(peer1, iter1));
+ .addRangeTombstone(tombstone2)
+ .buildUpdate());
+ resolver.preprocess(response(command, peer1, iter1));
// not covered by any range tombstone
- InetAddressAndPort peer2 = peer();
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0")
- .add("c1", "v0")
- .buildUpdate());
- resolver.preprocess(readResponseMessage(peer2, iter2));
+ .add("c1", "v0")
+ .buildUpdate());
+ resolver.preprocess(response(command, peer2, iter2));
// covered by a range tombstone
- InetAddressAndPort peer3 = peer();
+ InetAddressAndPort peer3 = replicas.get(2).endpoint();
UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10")
- .add("c2", "v1")
- .buildUpdate());
- resolver.preprocess(readResponseMessage(peer3, iter3));
+ .add("c2", "v1")
+ .buildUpdate());
+ resolver.preprocess(response(command, peer3, iter3));
// range covered by rt, but newer
- InetAddressAndPort peer4 = peer();
+ InetAddressAndPort peer4 = replicas.get(3).endpoint();
UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3")
- .add("one", "A")
- .buildUpdate());
- resolver.preprocess(readResponseMessage(peer4, iter4));
+ .add("one", "A")
+ .buildUpdate());
+ resolver.preprocess(response(command, peer4, iter4));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = data.next())
@@ -311,13 +301,14 @@ public class DataResolverTest
@Test
public void testResolveWithOneEmpty()
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
- .add("c2", "v2")
- .buildUpdate())));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm)));
+ EndpointsForRange replicas = makeReplicas(2);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+ .add("c2", "v2")
+ .buildUpdate())));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(command, peer2, EmptyIterators.unfilteredPartition(cfm)));
try(PartitionIterator data = resolver.resolve())
{
@@ -340,10 +331,11 @@ public class DataResolverTest
@Test
public void testResolveWithBothEmpty()
{
+ EndpointsForRange replicas = makeReplicas(2);
TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
- resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
- resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm)));
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+ resolver.preprocess(response(command, replicas.get(0).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
+ resolver.preprocess(response(command, replicas.get(1).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
try(PartitionIterator data = resolver.resolve())
{
@@ -356,14 +348,15 @@ public class DataResolverTest
@Test
public void testResolveDeleted()
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ EndpointsForRange replicas = makeReplicas(2);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
// one response with columns timestamped before a delete in another response
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
- .add("one", "A")
- .buildUpdate())));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, fullPartitionDelete(cfm, dk, 1, nowInSec)));
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
+ .add("one", "A")
+ .buildUpdate())));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(command, peer2, fullPartitionDelete(cfm, dk, 1, nowInSec)));
try (PartitionIterator data = resolver.resolve())
{
@@ -381,23 +374,24 @@ public class DataResolverTest
@Test
public void testResolveMultipleDeleted()
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime(), readRepair);
+ EndpointsForRange replicas = makeReplicas(4);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
// deletes and columns with interleaved timestamp, with out of order return sequence
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(command, peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
// these columns created after the previous deletion
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
- .add("one", "A")
- .add("two", "A")
- .buildUpdate())));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
+ .add("one", "A")
+ .add("two", "A")
+ .buildUpdate())));
//this column created after the next delete
- InetAddressAndPort peer3 = peer();
- resolver.preprocess(readResponseMessage(peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1")
- .add("two", "B")
- .buildUpdate())));
- InetAddressAndPort peer4 = peer();
- resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
+ InetAddressAndPort peer3 = replicas.get(2).endpoint();
+ resolver.preprocess(response(command, peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1")
+ .add("two", "B")
+ .buildUpdate())));
+ InetAddressAndPort peer4 = replicas.get(3).endpoint();
+ resolver.preprocess(response(command, peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
try(PartitionIterator data = resolver.resolve())
{
@@ -465,9 +459,10 @@ public class DataResolverTest
*/
private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
- InetAddressAndPort peer1 = peer();
- InetAddressAndPort peer2 = peer();
+ EndpointsForRange replicas = makeReplicas(2);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
// 1st "stream"
RangeTombstone one_two = tombstone("1", true , "2", false, timestamp1, nowInSec);
@@ -485,8 +480,8 @@ public class DataResolverTest
.addRangeTombstone(four_five)
.buildUpdate());
- resolver.preprocess(readResponseMessage(peer1, iter1));
- resolver.preprocess(readResponseMessage(peer2, iter2));
+ resolver.preprocess(response(command, peer1, iter1));
+ resolver.preprocess(response(command, peer2, iter2));
// No results, we've only reconciled tombstones.
try (PartitionIterator data = resolver.resolve())
@@ -538,9 +533,10 @@ public class DataResolverTest
*/
private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
- InetAddressAndPort peer1 = peer();
- InetAddressAndPort peer2 = peer();
+ EndpointsForRange replicas = makeReplicas(2);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
// 1st "stream"
RangeTombstone one_nine = tombstone("0", true , "9", true, timestamp1, nowInSec);
@@ -554,8 +550,8 @@ public class DataResolverTest
RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec);
UnfilteredPartitionIterator iter2 = iter(dk, open_one, boundary_five, close_nine);
- resolver.preprocess(readResponseMessage(peer1, iter1));
- resolver.preprocess(readResponseMessage(peer2, iter2));
+ resolver.preprocess(response(command, peer1, iter1));
+ resolver.preprocess(response(command, peer2, iter2));
boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3;
@@ -589,9 +585,10 @@ public class DataResolverTest
@Test
public void testRepairRangeTombstoneWithPartitionDeletion()
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
- InetAddressAndPort peer1 = peer();
- InetAddressAndPort peer2 = peer();
+ EndpointsForRange replicas = makeReplicas(2);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
// 1st "stream": just a partition deletion
UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec));
@@ -602,8 +599,8 @@ public class DataResolverTest
.addRangeTombstone(rt)
.buildUpdate());
- resolver.preprocess(readResponseMessage(peer1, iter1));
- resolver.preprocess(readResponseMessage(peer2, iter2));
+ resolver.preprocess(response(command, peer1, iter1));
+ resolver.preprocess(response(command, peer2, iter2));
// No results, we've only reconciled tombstones.
try (PartitionIterator data = resolver.resolve())
@@ -627,15 +624,16 @@ public class DataResolverTest
@Test
public void testRepairRangeTombstoneWithPartitionDeletion2()
{
- DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
- InetAddressAndPort peer1 = peer();
- InetAddressAndPort peer2 = peer();
+ EndpointsForRange replicas = makeReplicas(2);
+ DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
// 1st "stream": a partition deletion and a range tombstone
RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec);
PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
- .addRangeTombstone(rt1)
- .buildUpdate();
+ .addRangeTombstone(rt1)
+ .buildUpdate();
((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec));
UnfilteredPartitionIterator iter1 = iter(upd1);
@@ -647,8 +645,8 @@ public class DataResolverTest
.addRangeTombstone(rt3)
.buildUpdate());
- resolver.preprocess(readResponseMessage(peer1, iter1));
- resolver.preprocess(readResponseMessage(peer2, iter2));
+ resolver.preprocess(response(command, peer1, iter1));
+ resolver.preprocess(response(command, peer2, iter2));
// No results, we've only reconciled tombstones.
try (PartitionIterator data = resolver.resolve())
@@ -678,8 +676,8 @@ public class DataResolverTest
Slice slice = rt.deletedSlice();
ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues());
return condition
- ? new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime())
- : rt;
+ ? new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime())
+ : rt;
}
// Forces the end to be exclusive if the condition holds
@@ -691,8 +689,8 @@ public class DataResolverTest
Slice slice = rt.deletedSlice();
ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues());
return condition
- ? new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime())
- : rt;
+ ? new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime())
+ : rt;
}
private static ByteBuffer bb(int b)
@@ -708,9 +706,10 @@ public class DataResolverTest
@Test
public void testResolveComplexDelete()
{
+ EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
- DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
@@ -719,8 +718,8 @@ public class DataResolverTest
builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
builder.addCell(mapCell(0, 0, ts[0]));
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
builder.newRow(Clustering.EMPTY);
DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
@@ -728,8 +727,8 @@ public class DataResolverTest
Cell expectedCell = mapCell(1, 1, ts[1]);
builder.addCell(expectedCell);
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
try(PartitionIterator data = resolver.resolve())
{
@@ -742,7 +741,6 @@ public class DataResolverTest
}
}
-
Mutation mutation = readRepair.getForEndpoint(peer1);
Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
assertTrue(rowIter.hasNext());
@@ -760,9 +758,10 @@ public class DataResolverTest
@Test
public void testResolveDeletedCollection()
{
+ EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
- DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
@@ -771,15 +770,15 @@ public class DataResolverTest
builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
builder.addCell(mapCell(0, 0, ts[0]));
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
builder.newRow(Clustering.EMPTY);
DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
builder.addComplexDeletion(m, expectedCmplxDelete);
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
try(PartitionIterator data = resolver.resolve())
{
@@ -803,9 +802,10 @@ public class DataResolverTest
@Test
public void testResolveNewCollection()
{
+ EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
- DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
@@ -818,11 +818,11 @@ public class DataResolverTest
builder.addCell(expectedCell);
// empty map column
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
try(PartitionIterator data = resolver.resolve())
{
@@ -852,9 +852,10 @@ public class DataResolverTest
@Test
public void testResolveNewCollectionOverwritingDeleted()
{
+ EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
- DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime(), readRepair);
+ DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
@@ -863,8 +864,8 @@ public class DataResolverTest
builder.newRow(Clustering.EMPTY);
builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
- InetAddressAndPort peer1 = peer();
- resolver.preprocess(readResponseMessage(peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+ InetAddressAndPort peer1 = replicas.get(0).endpoint();
+ resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
// newer, overwritten map column
builder.newRow(Clustering.EMPTY);
@@ -873,8 +874,8 @@ public class DataResolverTest
Cell expectedCell = mapCell(1, 1, ts[1]);
builder.addCell(expectedCell);
- InetAddressAndPort peer2 = peer();
- resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd));
+ InetAddressAndPort peer2 = replicas.get(1).endpoint();
+ resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
try(PartitionIterator data = resolver.resolve())
{
@@ -897,18 +898,6 @@ public class DataResolverTest
Assert.assertNull(readRepair.sent.get(peer2));
}
- private InetAddressAndPort peer()
- {
- try
- {
- return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- }
-
private void assertRepairContainsDeletions(Mutation mutation,
DeletionTime deletionTime,
RangeTombstone...rangeTombstones)
@@ -961,91 +950,8 @@ public class DataResolverTest
assertEquals(update.metadata().name, cfm.name);
}
-
- public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator)
- {
- return readResponseMessage(from, partitionIterator, command);
-
- }
- public MessageIn<ReadResponse> readResponseMessage(InetAddressAndPort from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
+ private ReplicaLayout.ForRange plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel)
{
- return MessageIn.create(from,
- ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
- Collections.EMPTY_MAP,
- MessagingService.Verb.REQUEST_RESPONSE,
- MessagingService.current_version);
- }
-
- private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)
- {
- return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime);
- }
-
- private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime)
- {
- ClusteringBound startBound = rtBound(start, true, inclusiveStart);
- ClusteringBound endBound = rtBound(end, false, inclusiveEnd);
- return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime));
- }
-
- private ClusteringBound rtBound(Object value, boolean isStart, boolean inclusive)
- {
- ClusteringBound.Kind kind = isStart
- ? (inclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND)
- : (inclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND);
-
- return ClusteringBound.create(kind, cfm.comparator.make(value).getRawValues());
- }
-
- private ClusteringBoundary rtBoundary(Object value, boolean inclusiveOnEnd)
- {
- ClusteringBound.Kind kind = inclusiveOnEnd
- ? Kind.INCL_END_EXCL_START_BOUNDARY
- : Kind.EXCL_END_INCL_START_BOUNDARY;
- return ClusteringBoundary.create(kind, cfm.comparator.make(value).getRawValues());
- }
-
- private RangeTombstoneBoundMarker marker(Object value, boolean isStart, boolean inclusive, long markedForDeleteAt, int localDeletionTime)
- {
- return new RangeTombstoneBoundMarker(rtBound(value, isStart, inclusive), new DeletionTime(markedForDeleteAt, localDeletionTime));
- }
-
- private RangeTombstoneBoundaryMarker boundary(Object value, boolean inclusiveOnEnd, long markedForDeleteAt1, int localDeletionTime1, long markedForDeleteAt2, int localDeletionTime2)
- {
- return new RangeTombstoneBoundaryMarker(rtBoundary(value, inclusiveOnEnd),
- new DeletionTime(markedForDeleteAt1, localDeletionTime1),
- new DeletionTime(markedForDeleteAt2, localDeletionTime2));
- }
-
- private UnfilteredPartitionIterator fullPartitionDelete(TableMetadata table, DecoratedKey dk, long timestamp, int nowInSec)
- {
- return new SingletonUnfilteredPartitionIterator(PartitionUpdate.fullPartitionDelete(table, dk, timestamp, nowInSec).unfilteredIterator());
- }
-
- private UnfilteredPartitionIterator iter(PartitionUpdate update)
- {
- return new SingletonUnfilteredPartitionIterator(update.unfilteredIterator());
- }
-
- private UnfilteredPartitionIterator iter(DecoratedKey key, Unfiltered... unfiltereds)
- {
- SortedSet<Unfiltered> s = new TreeSet<>(cfm.comparator);
- Collections.addAll(s, unfiltereds);
- final Iterator<Unfiltered> iterator = s.iterator();
-
- UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(cfm,
- key,
- DeletionTime.LIVE,
- cfm.regularAndStaticColumns(),
- Rows.EMPTY_STATIC_ROW,
- false,
- EncodingStats.NO_STATS)
- {
- protected Unfiltered computeNext()
- {
- return iterator.hasNext() ? iterator.next() : endOfData();
- }
- };
- return new SingletonUnfilteredPartitionIterator(rowIter);
+ return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas);
}
-}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org