You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:56 UTC
[04/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
index 9c90d57..4afeb5a 100644
--- a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.locator;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -39,9 +38,11 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.Pair;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class OldNetworkTopologyStrategyTest
{
+
private List<Token> keyTokens;
private TokenMetadata tmd;
private Map<String, ArrayList<InetAddressAndPort>> expectedResults;
@@ -53,7 +54,7 @@ public class OldNetworkTopologyStrategyTest
}
@Before
- public void init()
+ public void init() throws Exception
{
keyTokens = new ArrayList<Token>();
tmd = new TokenMetadata();
@@ -160,11 +161,11 @@ public class OldNetworkTopologyStrategyTest
{
for (Token keyToken : keyTokens)
{
- List<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(keyToken);
- for (int j = 0; j < endpoints.size(); j++)
+ int j = 0;
+ for (InetAddressAndPort endpoint : strategy.getNaturalReplicasForToken(keyToken).endpoints())
{
ArrayList<InetAddressAndPort> hostsExpected = expectedResults.get(keyToken.toString());
- assertEquals(endpoints.get(j), hostsExpected.get(j));
+ assertEquals(endpoint, hostsExpected.get(j++));
}
}
}
@@ -188,7 +189,6 @@ public class OldNetworkTopologyStrategyTest
assertEquals(ranges.left.iterator().next().left, tokensAfterMove[movingNodeIdx]);
assertEquals(ranges.left.iterator().next().right, tokens[movingNodeIdx]);
assertEquals("No data should be fetched", ranges.right.size(), 0);
-
}
@Test
@@ -205,7 +205,6 @@ public class OldNetworkTopologyStrategyTest
assertEquals("No data should be streamed", ranges.left.size(), 0);
assertEquals(ranges.right.iterator().next().left, tokens[movingNodeIdx]);
assertEquals(ranges.right.iterator().next().right, tokensAfterMove[movingNodeIdx]);
-
}
@SuppressWarnings("unchecked")
@@ -366,16 +365,21 @@ public class OldNetworkTopologyStrategyTest
TokenMetadata tokenMetadataAfterMove = initTokenMetadata(tokensAfterMove);
AbstractReplicationStrategy strategy = new OldNetworkTopologyStrategy("Keyspace1", tokenMetadataCurrent, endpointSnitch, optsWithRF(2));
- Collection<Range<Token>> currentRanges = strategy.getAddressRanges().get(movingNode);
- Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode);
-
- Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = StorageService.instance.calculateStreamAndFetchRanges(currentRanges, updatedRanges);
+ RangesAtEndpoint currentRanges = strategy.getAddressReplicas().get(movingNode);
+ RangesAtEndpoint updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode);
- return ranges;
+ return asRanges(StorageService.calculateStreamAndFetchRanges(currentRanges, updatedRanges));
}
private static Map<String, String> optsWithRF(int rf)
{
return Collections.singletonMap("replication_factor", Integer.toString(rf));
}
+
+ public static Pair<Set<Range<Token>>, Set<Range<Token>>> asRanges(Pair<RangesAtEndpoint, RangesAtEndpoint> replicas)
+ {
+ Set<Range<Token>> leftRanges = replicas.left.ranges();
+ Set<Range<Token>> rightRanges = replicas.right.ranges();
+ return Pair.create(leftRanges, rightRanges);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
index 56fd181..8e0bc00 100644
--- a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
+++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.dht.Token;
import org.junit.Test;
import java.net.UnknownHostException;
-import java.util.Collection;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -38,17 +37,29 @@ public class PendingRangeMapsTest {
return new Range<Token>(new BigIntegerToken(left), new BigIntegerToken(right));
}
+ private static void addPendingRange(PendingRangeMaps pendingRangeMaps, Range<Token> range, String endpoint)
+ {
+ try
+ {
+ pendingRangeMaps.addPendingRange(range, Replica.fullReplica(InetAddressAndPort.getByName(endpoint), range));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
@Test
public void testPendingEndpoints() throws UnknownHostException
{
PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
- pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddressAndPort.getByName("127.0.0.1"));
- pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddressAndPort.getByName("127.0.0.2"));
- pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddressAndPort.getByName("127.0.0.3"));
- pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddressAndPort.getByName("127.0.0.4"));
- pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddressAndPort.getByName("127.0.0.5"));
- pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddressAndPort.getByName("127.0.0.6"));
+ addPendingRange(pendingRangeMaps, genRange("5", "15"), "127.0.0.1");
+ addPendingRange(pendingRangeMaps, genRange("15", "25"), "127.0.0.2");
+ addPendingRange(pendingRangeMaps, genRange("25", "35"), "127.0.0.3");
+ addPendingRange(pendingRangeMaps, genRange("35", "45"), "127.0.0.4");
+ addPendingRange(pendingRangeMaps, genRange("45", "55"), "127.0.0.5");
+ addPendingRange(pendingRangeMaps, genRange("45", "65"), "127.0.0.6");
assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
@@ -61,8 +72,8 @@ public class PendingRangeMapsTest {
assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
- Collection<InetAddressAndPort> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15"));
- assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.1")));
+ EndpointsForToken replicas = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15"));
+ assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.1")));
}
@Test
@@ -70,13 +81,13 @@ public class PendingRangeMapsTest {
{
PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
- pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddressAndPort.getByName("127.0.0.1"));
- pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddressAndPort.getByName("127.0.0.2"));
- pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddressAndPort.getByName("127.0.0.3"));
- pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddressAndPort.getByName("127.0.0.4"));
- pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddressAndPort.getByName("127.0.0.5"));
- pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddressAndPort.getByName("127.0.0.6"));
- pendingRangeMaps.addPendingRange(genRange("65", "7"), InetAddressAndPort.getByName("127.0.0.7"));
+ addPendingRange(pendingRangeMaps, genRange("5", "15"), "127.0.0.1");
+ addPendingRange(pendingRangeMaps, genRange("15", "25"), "127.0.0.2");
+ addPendingRange(pendingRangeMaps, genRange("25", "35"), "127.0.0.3");
+ addPendingRange(pendingRangeMaps, genRange("35", "45"), "127.0.0.4");
+ addPendingRange(pendingRangeMaps, genRange("45", "55"), "127.0.0.5");
+ addPendingRange(pendingRangeMaps, genRange("45", "65"), "127.0.0.6");
+ addPendingRange(pendingRangeMaps, genRange("65", "7"), "127.0.0.7");
assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
@@ -90,8 +101,8 @@ public class PendingRangeMapsTest {
assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
- Collection<InetAddressAndPort> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6"));
- assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.1")));
- assertTrue(endpoints.contains(InetAddressAndPort.getByName("127.0.0.7")));
+ EndpointsForToken replicas = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6"));
+ assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.1")));
+ assertTrue(replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.7")));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
new file mode 100644
index 0000000..66eff23
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+
+public class ReplicaCollectionTest
+{
+
+ static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, NULL_EP;
+ static final Range<Token> R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE;
+
+ static
+ {
+ try
+ {
+ EP1 = InetAddressAndPort.getByName("127.0.0.1");
+ EP2 = InetAddressAndPort.getByName("127.0.0.2");
+ EP3 = InetAddressAndPort.getByName("127.0.0.3");
+ EP4 = InetAddressAndPort.getByName("127.0.0.4");
+ EP5 = InetAddressAndPort.getByName("127.0.0.5");
+ BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort();
+ NULL_EP = InetAddressAndPort.getByName("127.255.255.255");
+ R1 = range(0, 1);
+ R2 = range(1, 2);
+ R3 = range(2, 3);
+ R4 = range(3, 4);
+ R5 = range(4, 5);
+ BROADCAST_RANGE = range(10, 11);
+ NULL_RANGE = range(10000, 10001);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static Token tk(long t)
+ {
+ return new Murmur3Partitioner.LongToken(t);
+ }
+
+ static Range<Token> range(long left, long right)
+ {
+ return new Range<>(tk(left), tk(right));
+ }
+
+ static class TestCase<C extends AbstractReplicaCollection<C>>
+ {
+ final C test;
+ final List<Replica> canonicalList;
+ final Multimap<InetAddressAndPort, Replica> canonicalByEndpoint;
+ final Multimap<Range<Token>, Replica> canonicalByRange;
+
+ TestCase(C test, List<Replica> canonicalList)
+ {
+ this.test = test;
+ this.canonicalList = canonicalList;
+ this.canonicalByEndpoint = HashMultimap.create();
+ this.canonicalByRange = HashMultimap.create();
+ for (Replica replica : canonicalList)
+ canonicalByEndpoint.put(replica.endpoint(), replica);
+ for (Replica replica : canonicalList)
+ canonicalByRange.put(replica.range(), replica);
+ }
+
+ void testSize()
+ {
+ Assert.assertEquals(canonicalList.size(), test.size());
+ }
+
+ void testEquals()
+ {
+ Assert.assertTrue(Iterables.elementsEqual(canonicalList, test));
+ }
+
+ void testEndpoints()
+ {
+ // TODO: we should do more exhaustive tests of the collection
+ Assert.assertEquals(ImmutableSet.copyOf(canonicalByEndpoint.keySet()), ImmutableSet.copyOf(test.endpoints()));
+ try
+ {
+ test.endpoints().add(EP5);
+ Assert.fail();
+ } catch (UnsupportedOperationException e) {}
+ try
+ {
+ test.endpoints().remove(EP5);
+ Assert.fail();
+ } catch (UnsupportedOperationException e) {}
+
+ Assert.assertTrue(test.endpoints().containsAll(canonicalByEndpoint.keySet()));
+ for (InetAddressAndPort ep : canonicalByEndpoint.keySet())
+ Assert.assertTrue(test.endpoints().contains(ep));
+ for (InetAddressAndPort ep : ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP))
+ if (!canonicalByEndpoint.containsKey(ep))
+ Assert.assertFalse(test.endpoints().contains(ep));
+ }
+
+ public void testOrderOfIteration()
+ {
+ Assert.assertEquals(canonicalList, ImmutableList.copyOf(test));
+ Assert.assertEquals(canonicalList, test.stream().collect(Collectors.toList()));
+ Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints());
+ }
+
+ void testSelect(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ {
+ TestCase<C> allMatchZeroCapacity = new TestCase<>(test.select().add(Predicates.alwaysTrue(), 0).get(), Collections.emptyList());
+ allMatchZeroCapacity.testAll(subListDepth, filterDepth, sortDepth, selectDepth - 1);
+
+ TestCase<C> noMatchFullCapacity = new TestCase<>(test.select().add(Predicates.alwaysFalse(), canonicalList.size()).get(), Collections.emptyList());
+ noMatchFullCapacity.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1);
+
+ if (canonicalList.size() <= 2)
+ return;
+
+ List<Replica> newOrderList = ImmutableList.of(canonicalList.get(2), canonicalList.get(1), canonicalList.get(0));
+ TestCase<C> newOrder = new TestCase<>(
+ test.select()
+ .add(r -> r == newOrderList.get(0), 3)
+ .add(r -> r == newOrderList.get(1), 3)
+ .add(r -> r == newOrderList.get(2), 3)
+ .get(), newOrderList
+ );
+ newOrder.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1);
+ }
+
+ private void assertSubList(C subCollection, int from, int to)
+ {
+ Assert.assertTrue(subCollection.isSnapshot);
+ if (from == to)
+ {
+ Assert.assertTrue(subCollection.isEmpty());
+ }
+ else
+ {
+ List<Replica> subList = this.test.list.subList(from, to);
+ if (test.isSnapshot)
+ Assert.assertSame(subList.getClass(), subCollection.list.getClass());
+ Assert.assertEquals(subList, subCollection.list);
+ }
+ }
+
+ void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ {
+ if (test.isSnapshot)
+ Assert.assertSame(test, test.subList(0, test.size()));
+
+ if (test.isEmpty())
+ return;
+
+ TestCase<C> skipFront = new TestCase<>(test.subList(1, test.size()), canonicalList.subList(1, canonicalList.size()));
+ assertSubList(skipFront.test, 1, canonicalList.size());
+ skipFront.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+ TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() - 1), canonicalList.subList(0, canonicalList.size() - 1));
+ assertSubList(skipBack.test, 0, canonicalList.size() - 1);
+ skipBack.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+ }
+
+ void testFilter(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ {
+ if (test.isSnapshot)
+ Assert.assertSame(test, test.filter(Predicates.alwaysTrue()));
+
+ if (test.isEmpty())
+ return;
+ // remove start
+ // we recurse on the same subset in testSubList, so just corroborate we have the correct list here
+ assertSubList(test.filter(r -> r != canonicalList.get(0)), 1, canonicalList.size());
+
+ if (test.size() <= 1)
+ return;
+ // remove end
+ // we recurse on the same subset in testSubList, so just corroborate we have the correct list here
+ assertSubList(test.filter(r -> r != canonicalList.get(canonicalList.size() - 1)), 0, canonicalList.size() - 1);
+
+ if (test.size() <= 2)
+ return;
+ Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2);
+ TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(Iterables.filter(canonicalList, removeMiddle::test)));
+ filtered.testAll(subListDepth, filterDepth - 1, sortDepth, selectDepth);
+ }
+
+ void testContains()
+ {
+ for (Replica replica : canonicalList)
+ Assert.assertTrue(test.contains(replica));
+ Assert.assertFalse(test.contains(fullReplica(NULL_EP, NULL_RANGE)));
+ }
+
+ void testGet()
+ {
+ for (int i = 0 ; i < canonicalList.size() ; ++i)
+ Assert.assertEquals(canonicalList.get(i), test.get(i));
+ }
+
+ void testSort(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ {
+ final Comparator<Replica> comparator = (o1, o2) ->
+ {
+ boolean f1 = o1 == canonicalList.get(0);
+ boolean f2 = o2 == canonicalList.get(0);
+ return f1 == f2 ? 0 : f1 ? 1 : -1;
+ };
+ TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList));
+ sorted.testAll(subListDepth, filterDepth, sortDepth - 1, selectDepth);
+ }
+
+ private void testAll(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+ {
+ testEndpoints();
+ testOrderOfIteration();
+ testContains();
+ testGet();
+ testEquals();
+ testSize();
+ if (subListDepth > 0)
+ testSubList(subListDepth, filterDepth, sortDepth, selectDepth);
+ if (filterDepth > 0)
+ testFilter(subListDepth, filterDepth, sortDepth, selectDepth);
+ if (sortDepth > 0)
+ testSort(subListDepth, filterDepth, sortDepth, selectDepth);
+ if (selectDepth > 0)
+ testSelect(subListDepth, filterDepth, sortDepth, selectDepth);
+ }
+
+ public void testAll()
+ {
+ testAll(2, 2, 2, 2);
+ }
+ }
+
+ static class RangesAtEndpointTestCase extends TestCase<RangesAtEndpoint>
+ {
+ RangesAtEndpointTestCase(RangesAtEndpoint test, List<Replica> canonicalList)
+ {
+ super(test, canonicalList);
+ }
+
+ void testRanges()
+ {
+ Assert.assertEquals(ImmutableSet.copyOf(canonicalByRange.keySet()), ImmutableSet.copyOf(test.ranges()));
+ try
+ {
+ test.ranges().add(R5);
+ Assert.fail();
+ } catch (UnsupportedOperationException e) {}
+ try
+ {
+ test.ranges().remove(R5);
+ Assert.fail();
+ } catch (UnsupportedOperationException e) {}
+
+ Assert.assertTrue(test.ranges().containsAll(canonicalByRange.keySet()));
+ for (Range<Token> range : canonicalByRange.keySet())
+ Assert.assertTrue(test.ranges().contains(range));
+ for (Range<Token> range : ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE))
+ if (!canonicalByRange.containsKey(range))
+ Assert.assertFalse(test.ranges().contains(range));
+ }
+
+ @Override
+ public void testOrderOfIteration()
+ {
+ super.testOrderOfIteration();
+ Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::range)), test.ranges());
+ }
+
+ @Override
+ public void testAll()
+ {
+ super.testAll();
+ testRanges();
+ }
+ }
+
+ private static final ImmutableList<Replica> RANGES_AT_ENDPOINT = ImmutableList.of(
+ fullReplica(EP1, R1),
+ fullReplica(EP1, R2),
+ transientReplica(EP1, R3),
+ fullReplica(EP1, R4),
+ transientReplica(EP1, R5)
+ );
+
+ @Test
+ public void testRangesAtEndpoint()
+ {
+ ImmutableList<Replica> canonical = RANGES_AT_ENDPOINT;
+ new RangesAtEndpointTestCase(
+ RangesAtEndpoint.copyOf(canonical), canonical
+ ).testAll();
+ }
+
+ @Test
+ public void testMutableRangesAtEndpoint()
+ {
+ ImmutableList<Replica> canonical1 = RANGES_AT_ENDPOINT.subList(0, RANGES_AT_ENDPOINT.size());
+ RangesAtEndpoint.Mutable test = new RangesAtEndpoint.Mutable(RANGES_AT_ENDPOINT.get(0).endpoint(), canonical1.size());
+ test.addAll(canonical1, Conflict.NONE);
+ try
+ { // incorrect range
+ test.addAll(canonical1, Conflict.NONE);
+ Assert.fail();
+ } catch (IllegalArgumentException e) { }
+ test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact duplicates
+ try
+ { // invalid endpoint; always error
+ test.add(fullReplica(EP2, BROADCAST_RANGE), Conflict.ALL);
+ Assert.fail();
+ } catch (IllegalArgumentException e) { }
+ try
+ { // conflict on isFull/isTransient
+ test.add(fullReplica(EP1, R3), Conflict.DUPLICATE);
+ Assert.fail();
+ } catch (IllegalArgumentException e) { }
+ test.add(fullReplica(EP1, R3), Conflict.ALL);
+
+ new RangesAtEndpointTestCase(test, canonical1).testAll();
+
+ RangesAtEndpoint view = test.asImmutableView();
+ RangesAtEndpoint snapshot = view.subList(0, view.size());
+
+ ImmutableList<Replica> canonical2 = RANGES_AT_ENDPOINT;
+ test.addAll(canonical2.reverse(), Conflict.DUPLICATE);
+ new TestCase<>(snapshot, canonical1).testAll();
+ new TestCase<>(view, canonical2).testAll();
+ new TestCase<>(test, canonical2).testAll();
+ }
+
+ private static final ImmutableList<Replica> ENDPOINTS_FOR_X = ImmutableList.of(
+ fullReplica(EP1, R1),
+ fullReplica(EP2, R1),
+ transientReplica(EP3, R1),
+ fullReplica(EP4, R1),
+ transientReplica(EP5, R1)
+ );
+
+ @Test
+ public void testEndpointsForRange()
+ {
+ ImmutableList<Replica> canonical = ENDPOINTS_FOR_X;
+ new TestCase<>(
+ EndpointsForRange.copyOf(canonical), canonical
+ ).testAll();
+ }
+
+ @Test
+ public void testMutableEndpointsForRange()
+ {
+ ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, ENDPOINTS_FOR_X.size() - 1);
+ EndpointsForRange.Mutable test = new EndpointsForRange.Mutable(R1, canonical1.size());
+ test.addAll(canonical1, Conflict.NONE);
+ try
+ { // incorrect range
+ test.addAll(canonical1, Conflict.NONE);
+ Assert.fail();
+ } catch (IllegalArgumentException e) { }
+ test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact duplicates
+ try
+ { // incorrect range
+ test.add(fullReplica(BROADCAST_EP, R2), Conflict.ALL);
+ Assert.fail();
+ } catch (IllegalArgumentException e) { }
+ try
+ { // conflict on isFull/isTransient
+ test.add(transientReplica(EP1, R1), Conflict.DUPLICATE);
+ Assert.fail();
+ } catch (IllegalArgumentException e) { }
+ test.add(transientReplica(EP1, R1), Conflict.ALL);
+
+ new TestCase<>(test, canonical1).testAll();
+
+ EndpointsForRange view = test.asImmutableView();
+ EndpointsForRange snapshot = view.subList(0, view.size());
+
+ ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X;
+ test.addAll(canonical2.reverse(), Conflict.DUPLICATE);
+ new TestCase<>(snapshot, canonical1).testAll();
+ new TestCase<>(view, canonical2).testAll();
+ new TestCase<>(test, canonical2).testAll();
+ }
+
+ @Test
+ public void testEndpointsForToken()
+ {
+ ImmutableList<Replica> canonical = ENDPOINTS_FOR_X;
+ new TestCase<>(
+ EndpointsForToken.copyOf(tk(1), canonical), canonical
+ ).testAll();
+ }
+
+ @Test
+ public void testMutableEndpointsForToken()
+ {
+ ImmutableList<Replica> canonical1 = ENDPOINTS_FOR_X.subList(0, ENDPOINTS_FOR_X.size() - 1);
+ EndpointsForToken.Mutable test = new EndpointsForToken.Mutable(tk(1), canonical1.size());
+ test.addAll(canonical1, Conflict.NONE);
+ try
+ { // incorrect range
+ test.addAll(canonical1, Conflict.NONE);
+ Assert.fail();
+ } catch (IllegalArgumentException e) { }
+ test.addAll(canonical1, Conflict.DUPLICATE); // we ignore exact duplicates
+ try
+ { // incorrect range
+ test.add(fullReplica(BROADCAST_EP, R2), Conflict.ALL);
+ Assert.fail();
+ } catch (IllegalArgumentException e) { }
+ try
+ { // conflict on isFull/isTransient
+ test.add(transientReplica(EP1, R1), Conflict.DUPLICATE);
+ Assert.fail();
+ } catch (IllegalArgumentException e) { }
+ test.add(transientReplica(EP1, R1), Conflict.ALL);
+
+ new TestCase<>(test, canonical1).testAll();
+
+ EndpointsForToken view = test.asImmutableView();
+ EndpointsForToken snapshot = view.subList(0, view.size());
+
+ ImmutableList<Replica> canonical2 = ENDPOINTS_FOR_X;
+ test.addAll(canonical2.reverse(), Conflict.DUPLICATE);
+ new TestCase<>(snapshot, canonical1).testAll();
+ new TestCase<>(view, canonical2).testAll();
+ new TestCase<>(test, canonical2).testAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
new file mode 100644
index 0000000..66f538f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+
+public class ReplicaUtils
+{
+ public static final Range<Token> FULL_RANGE = new Range<>(Murmur3Partitioner.MINIMUM, Murmur3Partitioner.MINIMUM);
+ public static final AbstractBounds<PartitionPosition> FULL_BOUNDS = new Range<>(Murmur3Partitioner.MINIMUM.minKeyBound(), Murmur3Partitioner.MINIMUM.maxKeyBound());
+
+ public static Replica full(InetAddressAndPort endpoint)
+ {
+ return fullReplica(endpoint, FULL_RANGE);
+ }
+
+ public static Replica trans(InetAddressAndPort endpoint)
+ {
+ return transientReplica(endpoint, FULL_RANGE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
new file mode 100644
index 0000000..a0427db
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
+
+public class ReplicationFactorTest
+{
+
+ @BeforeClass
+ public static void setupClass()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+ Gossiper.instance.start(1);
+ }
+
+ private static void assertRfParseFailure(String s)
+ {
+ try
+ {
+ ReplicationFactor.fromString(s);
+ Assert.fail("Expected IllegalArgumentException");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // expected
+ }
+ }
+
+ private static void assertRfParse(String s, int expectedReplicas, int expectedTrans)
+ {
+ ReplicationFactor rf = ReplicationFactor.fromString(s);
+ Assert.assertEquals(expectedReplicas, rf.allReplicas);
+ Assert.assertEquals(expectedTrans, rf.transientReplicas());
+ Assert.assertEquals(expectedReplicas - expectedTrans, rf.fullReplicas);
+ }
+
+ @Test
+ public void parseTest()
+ {
+ assertRfParse("3", 3, 0);
+ assertRfParse("3/1", 3, 1);
+
+ assertRfParse("5", 5, 0);
+ assertRfParse("5/2", 5, 2);
+
+ assertRfParseFailure("-1");
+ assertRfParseFailure("3/3");
+ assertRfParseFailure("3/4");
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
index a8caa72..e6a9365 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
@@ -17,9 +17,7 @@
*/
package org.apache.cassandra.locator;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
@@ -75,7 +73,7 @@ public class ReplicationStrategyEndpointCacheTest
public void runEndpointsWereCachedTest(Class stratClass, Map<String, String> configOptions) throws Exception
{
setup(stratClass, configOptions);
- assert strategy.getNaturalEndpoints(searchToken).equals(strategy.getNaturalEndpoints(searchToken));
+ assert strategy.getNaturalReplicasForToken(searchToken).equals(strategy.getNaturalReplicasForToken(searchToken));
}
@Test
@@ -89,34 +87,34 @@ public class ReplicationStrategyEndpointCacheTest
public void runCacheRespectsTokenChangesTest(Class stratClass, Map<String, String> configOptions) throws Exception
{
setup(stratClass, configOptions);
- ArrayList<InetAddressAndPort> initial;
- ArrayList<InetAddressAndPort> endpoints;
+ EndpointsForToken initial;
+ EndpointsForToken replicas;
- endpoints = strategy.getNaturalEndpoints(searchToken);
- assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
+ replicas = strategy.getNaturalReplicasForToken(searchToken);
+ assert replicas.size() == 5 : StringUtils.join(replicas, ",");
// test token addition, in DC2 before existing token
- initial = strategy.getNaturalEndpoints(searchToken);
+ initial = strategy.getNaturalReplicasForToken(searchToken);
tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddressAndPort.getByName("127.0.0.5"));
- endpoints = strategy.getNaturalEndpoints(searchToken);
- assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
- assert !endpoints.equals(initial);
+ replicas = strategy.getNaturalReplicasForToken(searchToken);
+ assert replicas.size() == 5 : StringUtils.join(replicas, ",");
+ assert !replicas.equals(initial);
// test token removal, newly created token
- initial = strategy.getNaturalEndpoints(searchToken);
+ initial = strategy.getNaturalReplicasForToken(searchToken);
tmd.removeEndpoint(InetAddressAndPort.getByName("127.0.0.5"));
- endpoints = strategy.getNaturalEndpoints(searchToken);
- assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
- assert !endpoints.contains(InetAddressAndPort.getByName("127.0.0.5"));
- assert !endpoints.equals(initial);
+ replicas = strategy.getNaturalReplicasForToken(searchToken);
+ assert replicas.size() == 5 : StringUtils.join(replicas, ",");
+ assert !replicas.endpoints().contains(InetAddressAndPort.getByName("127.0.0.5"));
+ assert !replicas.equals(initial);
// test token change
- initial = strategy.getNaturalEndpoints(searchToken);
+ initial = strategy.getNaturalReplicasForToken(searchToken);
//move .8 after search token but before other DC3
tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddressAndPort.getByName("127.0.0.8"));
- endpoints = strategy.getNaturalEndpoints(searchToken);
- assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
- assert !endpoints.equals(initial);
+ replicas = strategy.getNaturalReplicasForToken(searchToken);
+ assert replicas.size() == 5 : StringUtils.join(replicas, ",");
+ assert !replicas.equals(initial);
}
protected static class FakeSimpleStrategy extends SimpleStrategy
@@ -128,11 +126,11 @@ public class ReplicationStrategyEndpointCacheTest
super(keyspaceName, tokenMetadata, snitch, configOptions);
}
- public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
{
- assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+ assert !called : "calculateNaturalReplicas was already called, result should have been cached";
called = true;
- return super.calculateNaturalEndpoints(token, metadata);
+ return super.calculateNaturalReplicas(token, metadata);
}
}
@@ -145,11 +143,11 @@ public class ReplicationStrategyEndpointCacheTest
super(keyspaceName, tokenMetadata, snitch, configOptions);
}
- public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
{
- assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+ assert !called : "calculateNaturalReplicas was already called, result should have been cached";
called = true;
- return super.calculateNaturalEndpoints(token, metadata);
+ return super.calculateNaturalReplicas(token, metadata);
}
}
@@ -162,11 +160,11 @@ public class ReplicationStrategyEndpointCacheTest
super(keyspaceName, tokenMetadata, snitch, configOptions);
}
- public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
{
- assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
+ assert !called : "calculateNaturalReplicas was already called, result should have been cached";
called = true;
- return super.calculateNaturalEndpoints(token, metadata);
+ return super.calculateNaturalReplicas(token, metadata);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index fe77b0e..1e0c152 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -19,14 +19,22 @@ package org.apache.cassandra.locator;
import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.IPartitioner;
@@ -53,6 +61,7 @@ public class SimpleStrategyTest
{
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1));
+ DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
}
@Test
@@ -107,12 +116,12 @@ public class SimpleStrategyTest
for (int i = 0; i < keyTokens.length; i++)
{
- List<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(keyTokens[i]);
- assertEquals(strategy.getReplicationFactor(), endpoints.size());
+ EndpointsForToken replicas = strategy.getNaturalReplicasForToken(keyTokens[i]);
+ assertEquals(strategy.getReplicationFactor().allReplicas, replicas.size());
List<InetAddressAndPort> correctEndpoints = new ArrayList<>();
- for (int j = 0; j < endpoints.size(); j++)
+ for (int j = 0; j < replicas.size(); j++)
correctEndpoints.add(hosts.get((i + j + 1) % hosts.size()));
- assertEquals(new HashSet<>(correctEndpoints), new HashSet<>(endpoints));
+ assertEquals(new HashSet<>(correctEndpoints), replicas.endpoints());
}
}
}
@@ -154,30 +163,80 @@ public class SimpleStrategyTest
PendingRangeCalculatorService.calculatePendingRanges(strategy, keyspaceName);
- int replicationFactor = strategy.getReplicationFactor();
+ int replicationFactor = strategy.getReplicationFactor().allReplicas;
for (int i = 0; i < keyTokens.length; i++)
{
- Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalEndpoints(keyTokens[i]));
- assertTrue(endpoints.size() >= replicationFactor);
+ EndpointsForToken replicas = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalReplicasForToken(keyTokens[i]));
+ assertTrue(replicas.size() >= replicationFactor);
for (int j = 0; j < replicationFactor; j++)
{
//Check that the old nodes are definitely included
- assertTrue(endpoints.contains(hosts.get((i + j + 1) % hosts.size())));
+ assertTrue(replicas.endpoints().contains(hosts.get((i + j + 1) % hosts.size())));
}
// bootstrapEndpoint should be in the endpoints for i in MAX-RF to MAX, but not in any earlier ep.
if (i < RING_SIZE - replicationFactor)
- assertFalse(endpoints.contains(bootstrapEndpoint));
+ assertFalse(replicas.endpoints().contains(bootstrapEndpoint));
else
- assertTrue(endpoints.contains(bootstrapEndpoint));
+ assertTrue(replicas.endpoints().contains(bootstrapEndpoint));
}
}
StorageServiceAccessor.setTokenMetadata(oldTmd);
}
+ private static Token tk(long t)
+ {
+ return new Murmur3Partitioner.LongToken(t);
+ }
+
+ private static Range<Token> range(long l, long r)
+ {
+ return new Range<>(tk(l), tk(r));
+ }
+
+ @Test
+ public void transientReplica() throws Exception
+ {
+ IEndpointSnitch snitch = new SimpleSnitch();
+ DatabaseDescriptor.setEndpointSnitch(snitch);
+
+ List<InetAddressAndPort> endpoints = Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"),
+ InetAddressAndPort.getByName("127.0.0.2"),
+ InetAddressAndPort.getByName("127.0.0.3"),
+ InetAddressAndPort.getByName("127.0.0.4"));
+
+ Multimap<InetAddressAndPort, Token> tokens = HashMultimap.create();
+ tokens.put(endpoints.get(0), tk(100));
+ tokens.put(endpoints.get(1), tk(200));
+ tokens.put(endpoints.get(2), tk(300));
+ tokens.put(endpoints.get(3), tk(400));
+ TokenMetadata metadata = new TokenMetadata();
+ metadata.updateNormalTokens(tokens);
+
+ Map<String, String> configOptions = new HashMap<String, String>();
+ configOptions.put("replication_factor", "3/1");
+
+ SimpleStrategy strategy = new SimpleStrategy("ks", metadata, snitch, configOptions);
+
+ Range<Token> range1 = range(400, 100);
+ Assert.assertEquals(EndpointsForToken.of(range1.right,
+ Replica.fullReplica(endpoints.get(0), range1),
+ Replica.fullReplica(endpoints.get(1), range1),
+ Replica.transientReplica(endpoints.get(2), range1)),
+ strategy.getNaturalReplicasForToken(tk(99)));
+
+
+ Range<Token> range2 = range(100, 200);
+ Assert.assertEquals(EndpointsForToken.of(range2.right,
+ Replica.fullReplica(endpoints.get(1), range2),
+ Replica.fullReplica(endpoints.get(2), range2),
+ Replica.transientReplica(endpoints.get(3), range2)),
+ strategy.getNaturalReplicasForToken(tk(101)));
+ }
+
private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd)
{
KeyspaceMetadata ksmd = Schema.instance.getKeyspaceMetadata(keyspaceName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index b589d2d..ae8c011 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -118,7 +118,7 @@ public class TokenMetadataTest
}
@Override
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+ public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
{
return 0;
}
@@ -165,7 +165,7 @@ public class TokenMetadataTest
}
@Override
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+ public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
{
return 0;
}
@@ -216,7 +216,7 @@ public class TokenMetadataTest
}
@Override
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+ public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
{
return 0;
}
@@ -263,7 +263,7 @@ public class TokenMetadataTest
}
@Override
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+ public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
{
return 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index d97cdb8..e226d32 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -25,19 +25,20 @@ import org.junit.Test;
import org.junit.Assert;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.MessagingService.Verb;
import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+
public class WriteCallbackInfoTest
{
@BeforeClass
@@ -65,7 +66,7 @@ public class WriteCallbackInfoTest
? new Commit(UUID.randomUUID(), new PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1).build())
: new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
- WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddressAndPort.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
+ WriteCallbackInfo wcbi = new WriteCallbackInfo(full(InetAddressAndPort.getByName("192.168.1.1")), null, new MessageOut(verb, payload, null), null, cl, allowHints);
Assert.assertEquals(expectHint, wcbi.shouldHint());
if (expectHint)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
index 6a8dc83..379031c 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractEndpointSnitch;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessagingServiceTest;
@@ -172,7 +173,7 @@ public class OutboundMessagingConnectionTest
return nodeToDc.get(endpoint);
}
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+ public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
{
return 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
deleted file mode 100644
index 802a673..0000000
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import com.google.common.collect.Lists;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.MerkleTrees;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class LocalSyncTaskTest extends AbstractRepairTest
-{
- private static final IPartitioner partitioner = Murmur3Partitioner.instance;
- public static final String KEYSPACE1 = "DifferencerTest";
- public static final String CF_STANDARD = "Standard1";
- public static ColumnFamilyStore cfs;
-
- @BeforeClass
- public static void defineSchema()
- {
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
-
- TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id;
- cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
- }
-
- /**
- * When there is no difference between two, LocalSyncTask should return stats with 0 difference.
- */
- @Test
- public void testNoDifference() throws Throwable
- {
- final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1");
- final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1");
-
- Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
-
- MerkleTrees tree1 = createInitialTree(desc);
-
- MerkleTrees tree2 = createInitialTree(desc);
-
- // difference the trees
- // note: we reuse the same endpoint which is bogus in theory but fine here
- TreeResponse r1 = new TreeResponse(ep1, tree1);
- TreeResponse r2 = new TreeResponse(ep2, tree2);
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
- task.run();
-
- assertEquals(0, task.get().numberOfDifferences);
- }
-
- @Test
- public void testDifference() throws Throwable
- {
- Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
- UUID parentRepairSession = UUID.randomUUID();
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
- ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(),
- Arrays.asList(cfs), Arrays.asList(range), false,
- ActiveRepairService.UNREPAIRED_SSTABLE, false,
- PreviewKind.NONE);
-
- RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
-
- MerkleTrees tree1 = createInitialTree(desc);
-
- MerkleTrees tree2 = createInitialTree(desc);
-
- // change a range in one of the trees
- Token token = partitioner.midpoint(range.left, range.right);
- tree1.invalidate(token);
- MerkleTree.TreeRange changed = tree1.get(token);
- changed.hash("non-empty hash!".getBytes());
-
- Set<Range<Token>> interesting = new HashSet<>();
- interesting.add(changed);
-
- // difference the trees
- // note: we reuse the same endpoint which is bogus in theory but fine here
- TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1);
- TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
- task.run();
-
- // ensure that the changed range was recorded
- assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences);
- }
-
- @Test
- public void fullRepairStreamPlan() throws Exception
- {
- UUID sessionID = registerSession(cfs, true, true);
- ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
- RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
-
- TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
- TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
- StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
-
- assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
- assertTrue(plan.getFlushBeforeTransfer());
- }
-
- @Test
- public void incrementalRepairStreamPlan() throws Exception
- {
- UUID sessionID = registerSession(cfs, true, true);
- ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
- RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
-
- TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
- TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, PreviewKind.NONE);
- StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
-
- assertEquals(desc.parentSessionId, plan.getPendingRepair());
- assertFalse(plan.getFlushBeforeTransfer());
- }
-
- private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner)
- {
- MerkleTrees tree = new MerkleTrees(partitioner);
- tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
- tree.init();
- for (MerkleTree.TreeRange r : tree.invalids())
- {
- r.ensureHashInitialised();
- }
- return tree;
- }
-
- private MerkleTrees createInitialTree(RepairJobDesc desc)
- {
- return createInitialTree(desc, partitioner);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
index 2044106..418d7de 100644
--- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -29,7 +28,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.repair.RepairRunnable.CommonRange;
import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges;
@@ -41,7 +39,7 @@ public class RepairRunnableTest extends AbstractRepairTest
@Test
public void filterCommonIncrementalRangesNotForced() throws Exception
{
- CommonRange cr = new CommonRange(PARTICIPANTS, ALL_RANGES);
+ CommonRange cr = new CommonRange(PARTICIPANTS, Collections.emptySet(), ALL_RANGES);
List<CommonRange> expected = Lists.newArrayList(cr);
List<CommonRange> actual = filterCommonRanges(expected, Collections.emptySet(), false);
@@ -52,13 +50,13 @@ public class RepairRunnableTest extends AbstractRepairTest
@Test
public void forceFilterCommonIncrementalRanges() throws Exception
{
- CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2));
- CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3));
+ CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2));
+ CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3));
Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded
List<CommonRange> initial = Lists.newArrayList(cr1, cr2);
- List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)),
- new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3)));
+ List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2)),
+ new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3)));
List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, true);
Assert.assertEquals(expected, actual);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 54f0511..e77d657 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -62,9 +63,10 @@ public class RepairSessionTest
IPartitioner p = Murmur3Partitioner.instance;
Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
Set<InetAddressAndPort> endpoints = Sets.newHashSet(remote);
- RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange),
+ RepairSession session = new RepairSession(parentSessionId, sessionId,
+ new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)),
"Keyspace1", RepairParallelism.SEQUENTIAL,
- endpoints, false, false, false,
+ false, false, false,
PreviewKind.NONE, false, "Standard1");
// perform convict
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
new file mode 100644
index 0000000..92ae172
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SymmetricLocalSyncTaskTest extends AbstractRepairTest
+{
+ private static final IPartitioner partitioner = Murmur3Partitioner.instance;
+ public static final String KEYSPACE1 = "DifferencerTest";
+ public static final String CF_STANDARD = "Standard1";
+ public static ColumnFamilyStore cfs;
+
+ @BeforeClass
+ public static void defineSchema()
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
+
+ TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id;
+ cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
+ }
+
+ /**
+ * When there is no difference between two, SymmetricLocalSyncTask should return stats with 0 difference.
+ */
+ @Test
+ public void testNoDifference() throws Throwable
+ {
+ final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1");
+ final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1");
+
+ Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+ RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+ MerkleTrees tree1 = createInitialTree(desc);
+
+ MerkleTrees tree2 = createInitialTree(desc);
+
+ // difference the trees
+ // note: we reuse the same endpoint which is bogus in theory but fine here
+ TreeResponse r1 = new TreeResponse(ep1, tree1);
+ TreeResponse r2 = new TreeResponse(ep2, tree2);
+ SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
+ task.run();
+
+ assertEquals(0, task.get().numberOfDifferences);
+ }
+
+ @Test
+ public void testDifference() throws Throwable
+ {
+ Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+ UUID parentRepairSession = UUID.randomUUID();
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+
+ ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(),
+ Arrays.asList(cfs), Arrays.asList(range), false,
+ ActiveRepairService.UNREPAIRED_SSTABLE, false,
+ PreviewKind.NONE);
+
+ RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+ MerkleTrees tree1 = createInitialTree(desc);
+
+ MerkleTrees tree2 = createInitialTree(desc);
+
+ // change a range in one of the trees
+ Token token = partitioner.midpoint(range.left, range.right);
+ tree1.invalidate(token);
+ MerkleTree.TreeRange changed = tree1.get(token);
+ changed.hash("non-empty hash!".getBytes());
+
+ Set<Range<Token>> interesting = new HashSet<>();
+ interesting.add(changed);
+
+ // difference the trees
+ // note: we reuse the same endpoint which is bogus in theory but fine here
+ TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1);
+ TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
+ SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
+ DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1;
+ DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2);
+ try
+ {
+ task.run();
+ }
+ finally
+ {
+ DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30);
+ DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 3;
+ }
+
+ // ensure that the changed range was recorded
+ assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences);
+ }
+
+ @Test
+ public void fullRepairStreamPlan() throws Exception
+ {
+ UUID sessionID = registerSession(cfs, true, true);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+ RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+ TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+ TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+ SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
+ StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
+
+ assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
+ assertTrue(plan.getFlushBeforeTransfer());
+ }
+
+ private static void assertNumInOut(StreamPlan plan, int expectedIncoming, int expectedOutgoing)
+ {
+ StreamCoordinator coordinator = plan.getCoordinator();
+ StreamSession session = Iterables.getOnlyElement(coordinator.getAllStreamSessions());
+ assertEquals(expectedIncoming, session.getNumRequests());
+ assertEquals(expectedOutgoing, session.getNumTransfers());
+ }
+
+ @Test
+ public void incrementalRepairStreamPlan() throws Exception
+ {
+ UUID sessionID = registerSession(cfs, true, true);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+ RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+ TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+ TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+ SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, desc.parentSessionId, false, PreviewKind.NONE);
+ StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
+
+ assertEquals(desc.parentSessionId, plan.getPendingRepair());
+ assertFalse(plan.getFlushBeforeTransfer());
+ assertNumInOut(plan, 1, 1);
+ }
+
+ /**
+ * Don't reciprocate streams if the other endpoint is a transient replica
+ */
+ @Test
+ public void transientStreamPlan()
+ {
+ UUID sessionID = registerSession(cfs, true, true);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+ RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+ TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+ TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+ SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, true, desc.parentSessionId, false, PreviewKind.NONE);
+ StreamPlan plan = task.createStreamPlan(PARTICIPANT2, Lists.newArrayList(RANGE1));
+ assertNumInOut(plan, 1, 0);
+ }
+
+ private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner)
+ {
+ MerkleTrees tree = new MerkleTrees(partitioner);
+ tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
+ tree.init();
+ for (MerkleTree.TreeRange r : tree.invalids())
+ {
+ r.ensureHashInitialised();
+ }
+ return tree;
+ }
+
+ private MerkleTrees createInitialTree(RepairJobDesc desc)
+ {
+ return createInitialTree(desc, partitioner);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
new file mode 100644
index 0000000..06f968f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest
+{
+ private static final RepairJobDesc DESC = new RepairJobDesc(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID(), "ks", "tbl", ALL_RANGES);
+ private static final List<Range<Token>> RANGE_LIST = ImmutableList.of(RANGE1);
+
+ private static class InstrumentedSymmetricRemoteSyncTask extends SymmetricRemoteSyncTask
+ {
+ public InstrumentedSymmetricRemoteSyncTask(InetAddressAndPort e1, InetAddressAndPort e2)
+ {
+ super(DESC, new TreeResponse(e1, null), new TreeResponse(e2, null), PreviewKind.NONE);
+ }
+
+ RepairMessage sentMessage = null;
+ InetAddressAndPort sentTo = null;
+
+ @Override
+ void sendRequest(RepairMessage request, InetAddressAndPort to)
+ {
+ Assert.assertNull(sentMessage);
+ Assert.assertNotNull(request);
+ Assert.assertNotNull(to);
+ sentMessage = request;
+ sentTo = to;
+ }
+ }
+
+ @Test
+ public void normalSync()
+ {
+ InstrumentedSymmetricRemoteSyncTask syncTask = new InstrumentedSymmetricRemoteSyncTask(PARTICIPANT1, PARTICIPANT2);
+ syncTask.startSync(RANGE_LIST);
+
+ Assert.assertNotNull(syncTask.sentMessage);
+ Assert.assertSame(SyncRequest.class, syncTask.sentMessage.getClass());
+ Assert.assertEquals(PARTICIPANT1, syncTask.sentTo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
index 3ea888d..a7e8272 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
@@ -44,12 +44,13 @@ public class LocalSessionAccessor
ARS.consistent.local.putSessionUnsafe(session);
}
- public static void finalizeUnsafe(UUID sessionID)
+ public static long finalizeUnsafe(UUID sessionID)
{
LocalSession session = ARS.consistent.local.getSession(sessionID);
assert session != null;
session.setState(ConsistentSession.State.FINALIZED);
ARS.consistent.local.save(session);
+ return session.repairedAt;
}
public static void failUnsafe(UUID sessionID)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index d368510..e387c41 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -40,10 +40,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.repair.AbstractRepairTest;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.KeyspaceRepairManager;
@@ -136,7 +135,11 @@ public class LocalSessionTest extends AbstractRepairTest
boolean prepareSessionCalled = false;
@Override
- ListenableFuture prepareSession(KeyspaceRepairManager repairManager, UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor)
+ ListenableFuture prepareSession(KeyspaceRepairManager repairManager,
+ UUID sessionID,
+ Collection<ColumnFamilyStore> tables,
+ RangesAtEndpoint ranges,
+ ExecutorService executor)
{
prepareSessionCalled = true;
if (prepareSessionFuture != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/schema/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java
index 98bf9ca..7a6b011 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -127,7 +127,7 @@ public class MockSchema
}
SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList());
StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator)
- .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, header)
+ .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, false, header)
.get(MetadataType.STATS);
SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org