You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by ifesdjeen <gi...@git.apache.org> on 2018/10/02 08:50:37 UTC
[GitHub] cassandra pull request #276: Repair job tests
GitHub user ifesdjeen opened a pull request:
https://github.com/apache/cassandra/pull/276
Repair job tests
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ifesdjeen/cassandra repair-job-tests
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/cassandra/pull/276.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #276
----
commit 31df761c82855522bf56f371da7b2f475fe413f2
Author: Alex Petrov <ol...@...>
Date: 2018-10-01T13:30:58Z
Add tests for repair job
commit 79c7574d68c22faec67673fa425b710c9cb594e9
Author: Alex Petrov <ol...@...>
Date: 2018-10-01T14:25:28Z
Enable dtests
commit 992baf72ad140aa10a9433e51457b7be5f383639
Author: Alex Petrov <ol...@...>
Date: 2018-10-01T14:31:45Z
Remove debug statement
commit dcbf317273b09cf8a3fc810351d046addb4379ff
Author: Alex Petrov <ol...@...>
Date: 2018-10-01T15:22:44Z
Fix tests
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r223008342
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ assertNull(tasks.get(pair(addr4, addr5)));
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr2, addr1),
+ pair(addr2, addr3),
+ pair(addr3, addr1),
+ pair(addr3, addr2) })
+ {
+ assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+ }
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasks()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+ treeResponse(addr2, range1, "one", range2, "two"),
+ treeResponse(addr3, range1, "three", range2, "two"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr3, addr2)).rangesToSync);
--- End diff --
right; added this assertion
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222603722
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ assertNull(tasks.get(pair(addr4, addr5)));
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr2, addr1),
+ pair(addr2, addr3),
+ pair(addr3, addr1),
+ pair(addr3, addr2) })
+ {
+ assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+ }
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasks()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+ treeResponse(addr2, range1, "one", range2, "two"),
+ treeResponse(addr3, range1, "three", range2, "two"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
--- End diff --
maybe assert that all tasks are `AsymmetricRemoteSyncTask`s
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222952423
--- Diff: src/java/org/apache/cassandra/repair/LocalSyncTask.java ---
@@ -52,15 +52,9 @@
private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
private final UUID pendingRepair;
- private final boolean requestRanges;
- private final boolean transferRanges;
- public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair,
- boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
- {
- this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees),
- pendingRepair, requestRanges, transferRanges, previewKind);
- }
+ protected final boolean requestRanges;
--- End diff --
+1, made a change
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222608651
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ assertNull(tasks.get(pair(addr4, addr5)));
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr2, addr1),
+ pair(addr2, addr3),
+ pair(addr3, addr1),
+ pair(addr3, addr2) })
+ {
+ assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+ }
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasks()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+ treeResponse(addr2, range1, "one", range2, "two"),
+ treeResponse(addr3, range1, "three", range2, "two"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
--- End diff --
this assert depends on the iteration order in https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java#L55 - when we have an option to stream between two hosts, we always pick the one with the fewest outgoing streams, so if by chance we have added a few outgoing streams to `addr2` before this, we might sync `range2` from `addr3` instead
we should probably assert that `addr1` gets `range2` from either `addr2` or `addr3` (but not both) instead
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222952614
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
--- End diff --
+1
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222609036
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ assertNull(tasks.get(pair(addr4, addr5)));
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr2, addr1),
+ pair(addr2, addr3),
+ pair(addr3, addr1),
+ pair(addr3, addr2) })
+ {
+ assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+ }
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasks()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+ treeResponse(addr2, range1, "one", range2, "two"),
+ treeResponse(addr3, range1, "three", range2, "two"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr3, addr2)).rangesToSync);
--- End diff --
as above - `addr3` could stream `range1` from either `addr1` or `addr2`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222953530
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ assertNull(tasks.get(pair(addr4, addr5)));
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr2, addr1),
+ pair(addr2, addr3),
+ pair(addr3, addr1),
+ pair(addr3, addr2) })
+ {
+ assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+ }
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasks()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+ treeResponse(addr2, range1, "one", range2, "two"),
+ treeResponse(addr3, range1, "three", range2, "two"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
--- End diff --
+1, good point
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra issue #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on the issue:
https://github.com/apache/cassandra/pull/276
I've fixed your suggestions @krummas, should I commit?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222954234
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ assertNull(tasks.get(pair(addr4, addr5)));
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr2, addr1),
+ pair(addr2, addr3),
+ pair(addr3, addr1),
+ pair(addr3, addr2) })
+ {
+ assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+ }
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasks()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+ treeResponse(addr2, range1, "one", range2, "two"),
+ treeResponse(addr3, range1, "three", range2, "two"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
--- End diff --
This invariant does not hold in the current code: `range2` _is_ streamed from both `addr1` and `addr2`, assert below checks for that. This doesn't look right, but I'd have to dig deeper to understand why.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222552355
--- Diff: src/java/org/apache/cassandra/repair/LocalSyncTask.java ---
@@ -52,15 +52,9 @@
private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
private final UUID pendingRepair;
- private final boolean requestRanges;
- private final boolean transferRanges;
- public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair,
- boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
- {
- this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees),
- pendingRepair, requestRanges, transferRanges, previewKind);
- }
+ protected final boolean requestRanges;
--- End diff --
these could be VisibleForTesting (and perhaps package private)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222600025
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
--- End diff --
this should probably be tested with `pullRepair = false` - otherwise the assert below passes with only `addr4` transient
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222952584
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
--- End diff --
Thank you for spotting
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222953487
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
--- End diff --
+1, addressed
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen closed the pull request at:
https://github.com/apache/cassandra/pull/276
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222566547
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
--- End diff --
s/Stanard/Standard/ (in the whole file)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222610027
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ assertNull(tasks.get(pair(addr4, addr5)));
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr2, addr1),
+ pair(addr2, addr3),
+ pair(addr3, addr1),
+ pair(addr3, addr2) })
+ {
+ assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+ }
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasks()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+ treeResponse(addr2, range1, "one", range2, "two"),
+ treeResponse(addr3, range1, "three", range2, "two"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr3, addr2)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr3, addr1)).rangesToSync);
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ assertTrue(task.isLocal());
+ assertElementEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ assertTrue(((LocalSyncTask)task).requestRanges);
+ assertFalse(((LocalSyncTask)task).transferRanges);
+
+ task = tasks.get(pair(addr2, addr1));
+ assertFalse(task.isLocal());
+ assertElementEquals(Arrays.asList(range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
--- End diff --
and same as above
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222609899
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ assertNull(tasks.get(pair(addr4, addr5)));
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr2, addr1),
+ pair(addr2, addr3),
+ pair(addr3, addr1),
+ pair(addr3, addr2) })
+ {
+ assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+ }
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasks()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+ treeResponse(addr2, range1, "one", range2, "two"),
+ treeResponse(addr3, range1, "three", range2, "two"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr3, addr2)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr3, addr1)).rangesToSync);
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ assertTrue(task.isLocal());
+ assertElementEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ assertTrue(((LocalSyncTask)task).requestRanges);
+ assertFalse(((LocalSyncTask)task).transferRanges);
+
+ task = tasks.get(pair(addr2, addr1));
--- End diff --
as above, `addr2` could stream `range3` from either `addr1` or `addr3`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r223013279
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
+ pair.coordinator,
+ pair.peer),
+ (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
+ (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
+ task instanceof AsymmetricRemoteSyncTask);
+
+ // All ranges to be synchronised
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransient()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, false);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ }
+
+ @Test
+ public void testLocalSyncWithTransientPullRepair()
+ {
+ try
+ {
+ for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(local.address);
+ testLocalSyncWithTransient(local, true);
+ }
+ }
+ finally
+ {
+ FBUtilities.reset();
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+
+ }
+
+ public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient, // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ assertEquals(9, tasks.size());
+ for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+ {
+ if (local.equals(addr))
+ continue;
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+ assertTrue(task.requestRanges);
+ assertEquals(!pullRepair, task.transferRanges);
+ }
+
+ LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+
+ task = (LocalSyncTask) tasks.get(pair(local, addr5));
+ assertTrue(task.requestRanges);
+ assertFalse(task.transferRanges);
+ }
+
+ @Test
+ public void testLocalAndRemoteTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ assertNull(tasks.get(pair(addr4, addr5)));
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr2, addr1),
+ pair(addr2, addr3),
+ pair(addr3, addr1),
+ pair(addr3, addr2) })
+ {
+ assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+ }
+ }
+
+ @Test
+ public void testOptimizedCreateStandardSyncTasks()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+ treeResponse(addr2, range1, "one", range2, "two"),
+ treeResponse(addr3, range1, "three", range2, "two"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+ treeResponses,
+ addr4, // local
+ noTransient(),
+ addr -> "DC1",
+ false,
+ PreviewKind.ALL));
+
+ assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
+ assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
--- End diff --
added a special assertion
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #276: Repair job tests
Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/276#discussion_r222596795
--- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+ private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+ static InetAddressAndPort addr1;
+ static InetAddressAndPort addr2;
+ static InetAddressAndPort addr3;
+ static InetAddressAndPort addr4;
+ static InetAddressAndPort addr5;
+
+ static Range<Token> range1 = range(0, 1);
+ static Range<Token> range2 = range(2, 3);
+ static Range<Token> range3 = range(4, 5);
+ static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+ @AfterClass
+ public static void reset()
+ {
+ FBUtilities.reset();
+ }
+
+ static
+ {
+ try
+ {
+ addr1 = InetAddressAndPort.getByName("127.0.0.1");
+ addr2 = InetAddressAndPort.getByName("127.0.0.2");
+ addr3 = InetAddressAndPort.getByName("127.0.0.3");
+ addr4 = InetAddressAndPort.getByName("127.0.0.4");
+ addr5 = InetAddressAndPort.getByName("127.0.0.5");
+ DatabaseDescriptor.setBroadcastAddress(addr1.address);
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testCreateStandardSyncTasks()
+ {
+ testCreateStandardSyncTasks(false);
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksPullRepair()
+ {
+ testCreateStandardSyncTasks(true);
+ }
+
+ public static void testCreateStandardSyncTasks(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+ treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ noTransient(), // transient
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(2, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+ Assert.assertNull(tasks.get(pair(addr1, addr3)));
+ }
+
+ @Test
+ public void testStanardSyncTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncTransient(true);
+ testStanardSyncTransient(false);
+ }
+
+ public void testStanardSyncTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr2),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+ Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testStanardSyncLocalTransient()
+ {
+ // Do not stream towards transient nodes
+ testStanardSyncLocalTransient(true);
+ testStanardSyncLocalTransient(false);
+ }
+
+ public void testStanardSyncLocalTransient(boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ transientPredicate(addr1),
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ if (pullRepair)
+ {
+ Assert.assertTrue(tasks.isEmpty());
+ return;
+ }
+
+ Assert.assertEquals(1, tasks.size());
+
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+ Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+ Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testEmptyDifference()
+ {
+ // one of the nodes is a local coordinator
+ testEmptyDifference(addr1, noTransient(), true);
+ testEmptyDifference(addr1, noTransient(), false);
+ testEmptyDifference(addr2, noTransient(), true);
+ testEmptyDifference(addr2, noTransient(), false);
+ testEmptyDifference(addr1, transientPredicate(addr1), true);
+ testEmptyDifference(addr2, transientPredicate(addr1), true);
+ testEmptyDifference(addr1, transientPredicate(addr1), false);
+ testEmptyDifference(addr2, transientPredicate(addr1), false);
+ testEmptyDifference(addr1, transientPredicate(addr2), true);
+ testEmptyDifference(addr2, transientPredicate(addr2), true);
+ testEmptyDifference(addr1, transientPredicate(addr2), false);
+ testEmptyDifference(addr2, transientPredicate(addr2), false);
+
+ // nonlocal coordinator
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, noTransient(), true);
+ testEmptyDifference(addr3, noTransient(), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), true);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr1), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), true);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ testEmptyDifference(addr3, transientPredicate(addr2), false);
+ }
+
+ public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+ treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ local, // local
+ isTransient,
+ false,
+ pullRepair,
+ PreviewKind.ALL));
+
+ Assert.assertTrue(tasks.isEmpty());
+ }
+
+ @Test
+ public void testCreateStandardSyncTasksAllDifferent()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ ep -> ep.equals(addr3), // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ Assert.assertEquals(3, tasks.size());
+ SyncTask task = tasks.get(pair(addr1, addr2));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr2, addr3));
+ Assert.assertFalse(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+ task = tasks.get(pair(addr1, addr3));
+ Assert.assertTrue(task.isLocal());
+ Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+ }
+
+ @Test
+ public void testCreate5NodeStandardSyncTasksWithTransient()
+ {
+ List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+ treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+ treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+ treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+ treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+ Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+ Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+ treeResponses,
+ addr1, // local
+ isTransient, // transient
+ false,
+ true,
+ PreviewKind.ALL));
+
+ SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+ pair(addr1, addr3),
+ pair(addr1, addr4),
+ pair(addr1, addr5),
+ pair(addr2, addr4),
+ pair(addr2, addr4),
+ pair(addr2, addr5),
+ pair(addr3, addr4),
+ pair(addr3, addr5)};
+
+ for (SyncNodePair pair : pairs)
+ {
+ SyncTask task = tasks.get(pair);
+ // Local only if addr1 is a coordinator
+ assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
+
+ // Symmetric only if there are no transient participants
+ assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
--- End diff --
maybe make this something like:
```
boolean isRemote = !pair.coordinator.equals(addr1) && !pair.peer.equals(addr1);
boolean involvesTransient = isTransient.test(pair.coordinator) || isTransient.test(pair.peer);
assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",pair.coordinator, pair.peer),
isRemote && involvesTransient,
task instanceof AsymmetricRemoteSyncTask);
```
to make it a bit more readable?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org