You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by ifesdjeen <gi...@git.apache.org> on 2018/10/02 08:50:37 UTC

[GitHub] cassandra pull request #276: Repair job tests

GitHub user ifesdjeen opened a pull request:

    https://github.com/apache/cassandra/pull/276

    Repair job tests

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ifesdjeen/cassandra repair-job-tests

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/cassandra/pull/276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #276
    
----
commit 31df761c82855522bf56f371da7b2f475fe413f2
Author: Alex Petrov <ol...@...>
Date:   2018-10-01T13:30:58Z

    Add tests for repair job

commit 79c7574d68c22faec67673fa425b710c9cb594e9
Author: Alex Petrov <ol...@...>
Date:   2018-10-01T14:25:28Z

    Enable dtests

commit 992baf72ad140aa10a9433e51457b7be5f383639
Author: Alex Petrov <ol...@...>
Date:   2018-10-01T14:31:45Z

    Remove debug statement

commit dcbf317273b09cf8a3fc810351d046addb4379ff
Author: Alex Petrov <ol...@...>
Date:   2018-10-01T15:22:44Z

    Fix tests

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r223008342
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertNull(tasks.get(pair(addr4, addr5)));
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
    +                                                     pair(addr1, addr3),
    +                                                     pair(addr2, addr1),
    +                                                     pair(addr2, addr3),
    +                                                     pair(addr3, addr1),
    +                                                     pair(addr3, addr2) })
    +        {
    +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasks()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
    +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "two"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr4, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr3, addr2)).rangesToSync);
    --- End diff --
    
    right; added this assertion


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222603722
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertNull(tasks.get(pair(addr4, addr5)));
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
    +                                                     pair(addr1, addr3),
    +                                                     pair(addr2, addr1),
    +                                                     pair(addr2, addr3),
    +                                                     pair(addr3, addr1),
    +                                                     pair(addr3, addr2) })
    +        {
    +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasks()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
    +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "two"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr4, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    --- End diff --
    
    maybe assert that all tasks are `AsymmetricRemoteSyncTask`s


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222952423
  
    --- Diff: src/java/org/apache/cassandra/repair/LocalSyncTask.java ---
    @@ -52,15 +52,9 @@
         private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
     
         private final UUID pendingRepair;
    -    private final boolean requestRanges;
    -    private final boolean transferRanges;
     
    -    public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair,
    -                         boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
    -    {
    -        this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees),
    -             pendingRepair, requestRanges, transferRanges, previewKind);
    -    }
    +    protected final boolean requestRanges;
    --- End diff --
    
    +1, made a change 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222608651
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertNull(tasks.get(pair(addr4, addr5)));
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
    +                                                     pair(addr1, addr3),
    +                                                     pair(addr2, addr1),
    +                                                     pair(addr2, addr3),
    +                                                     pair(addr3, addr1),
    +                                                     pair(addr3, addr2) })
    +        {
    +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasks()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
    +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "two"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr4, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
    --- End diff --
    
    this assert depends on the iteration order in https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java#L55 - when we have an option to stream between two hosts, we always pick the one with the fewest outgoing streams, so if by chance we have added a few outgoing streams to `addr2` before this, we might sync `range2` from `addr3` instead
    
    we should probably assert that `addr1` gets `range2` from either `addr2` or `addr3` (but not both) instead


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222952614
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    --- End diff --
    
    +1


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222609036
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertNull(tasks.get(pair(addr4, addr5)));
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
    +                                                     pair(addr1, addr3),
    +                                                     pair(addr2, addr1),
    +                                                     pair(addr2, addr3),
    +                                                     pair(addr3, addr1),
    +                                                     pair(addr3, addr2) })
    +        {
    +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasks()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
    +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "two"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr4, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr3, addr2)).rangesToSync);
    --- End diff --
    
    as above - `addr3` could stream `range1` from either `addr1` or `addr2`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222953530
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertNull(tasks.get(pair(addr4, addr5)));
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
    +                                                     pair(addr1, addr3),
    +                                                     pair(addr2, addr1),
    +                                                     pair(addr2, addr3),
    +                                                     pair(addr3, addr1),
    +                                                     pair(addr3, addr2) })
    +        {
    +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasks()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
    +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "two"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr4, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    --- End diff --
    
    +1, good point


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra issue #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on the issue:

    https://github.com/apache/cassandra/pull/276
  
    I've fixed your suggestions @krummas, should I commit?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222954234
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertNull(tasks.get(pair(addr4, addr5)));
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
    +                                                     pair(addr1, addr3),
    +                                                     pair(addr2, addr1),
    +                                                     pair(addr2, addr3),
    +                                                     pair(addr3, addr1),
    +                                                     pair(addr3, addr2) })
    +        {
    +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasks()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
    +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "two"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr4, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
    --- End diff --
    
    This invariant does not hold in the current code: `range2` _is_ streamed from both `addr1` and `addr2`, assert below checks for that. This doesn't look right, but I'd have to dig deeper to understand why.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222552355
  
    --- Diff: src/java/org/apache/cassandra/repair/LocalSyncTask.java ---
    @@ -52,15 +52,9 @@
         private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
     
         private final UUID pendingRepair;
    -    private final boolean requestRanges;
    -    private final boolean transferRanges;
     
    -    public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair,
    -                         boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
    -    {
    -        this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees),
    -             pendingRepair, requestRanges, transferRanges, previewKind);
    -    }
    +    protected final boolean requestRanges;
    --- End diff --
    
    these could be VisibleForTesting (and perhaps package private)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222600025
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    --- End diff --
    
    this should probably be tested with `pullRepair = false` - otherwise the assert below passes with only `addr4` transient


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222952584
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    --- End diff --
    
    Thank you for spotting 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222953487
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    --- End diff --
    
    +1, addressed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen closed the pull request at:

    https://github.com/apache/cassandra/pull/276


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222566547
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    --- End diff --
    
    s/Stanard/Standard/ (in the whole file)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222610027
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertNull(tasks.get(pair(addr4, addr5)));
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
    +                                                     pair(addr1, addr3),
    +                                                     pair(addr2, addr1),
    +                                                     pair(addr2, addr3),
    +                                                     pair(addr3, addr1),
    +                                                     pair(addr3, addr2) })
    +        {
    +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasks()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
    +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "two"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr4, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr3, addr2)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr3, addr1)).rangesToSync);
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            ep -> ep.equals(addr3),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        assertTrue(task.isLocal());
    +        assertElementEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +        assertTrue(((LocalSyncTask)task).requestRanges);
    +        assertFalse(((LocalSyncTask)task).transferRanges);
    +
    +        task = tasks.get(pair(addr2, addr1));
    +        assertFalse(task.isLocal());
    +        assertElementEquals(Arrays.asList(range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    --- End diff --
    
    and same as above


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222609899
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertNull(tasks.get(pair(addr4, addr5)));
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
    +                                                     pair(addr1, addr3),
    +                                                     pair(addr2, addr1),
    +                                                     pair(addr2, addr3),
    +                                                     pair(addr3, addr1),
    +                                                     pair(addr3, addr2) })
    +        {
    +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasks()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
    +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "two"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr4, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr3, addr2)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr3, addr1)).rangesToSync);
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            ep -> ep.equals(addr3),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        assertTrue(task.isLocal());
    +        assertElementEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +        assertTrue(((LocalSyncTask)task).requestRanges);
    +        assertFalse(((LocalSyncTask)task).transferRanges);
    +
    +        task = tasks.get(pair(addr2, addr1));
    --- End diff --
    
    as above, `addr2` could stream `range3` from either `addr1` or `addr3`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by ifesdjeen <gi...@git.apache.org>.
Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r223013279
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    +                                       pair.coordinator,
    +                                       pair.peer),
    +                         (!pair.coordinator.equals(addr1) && !pair.peer.equals(addr1)) &&
    +                         (isTransient.test(pair.coordinator) || isTransient.test(pair.peer)),
    +                         task instanceof AsymmetricRemoteSyncTask);
    +
    +            // All ranges to be synchronised
    +            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransient()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, false);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +    }
    +
    +    @Test
    +    public void testLocalSyncWithTransientPullRepair()
    +    {
    +        try
    +        {
    +            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +            {
    +                FBUtilities.reset();
    +                DatabaseDescriptor.setBroadcastAddress(local.address);
    +                testLocalSyncWithTransient(local, true);
    +            }
    +        }
    +        finally
    +        {
    +            FBUtilities.reset();
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +
    +    }
    +
    +    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertEquals(9, tasks.size());
    +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
    +        {
    +            if (local.equals(addr))
    +                continue;
    +
    +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
    +            assertTrue(task.requestRanges);
    +            assertEquals(!pullRepair, task.transferRanges);
    +        }
    +
    +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +
    +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
    +        assertTrue(task.requestRanges);
    +        assertFalse(task.transferRanges);
    +    }
    +
    +    @Test
    +    public void testLocalAndRemoteTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
    +                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
    +                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr4, // local
    +                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        assertNull(tasks.get(pair(addr4, addr5)));
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr1, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
    +                                                     pair(addr1, addr3),
    +                                                     pair(addr2, addr1),
    +                                                     pair(addr2, addr3),
    +                                                     pair(addr3, addr1),
    +                                                     pair(addr3, addr2) })
    +        {
    +            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
    +        }
    +    }
    +
    +    @Test
    +    public void testOptimizedCreateStandardSyncTasks()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
    +                                                         treeResponse(addr2, range1, "one",   range2, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "two"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
    +                                                                                            treeResponses,
    +                                                                                            addr4, // local
    +                                                                                            noTransient(),
    +                                                                                            addr -> "DC1",
    +                                                                                            false,
    +                                                                                            PreviewKind.ALL));
    +
    +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
    +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr1, addr2)).rangesToSync);
    --- End diff --
    
    added a special assertion 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #276: Repair job tests

Posted by krummas <gi...@git.apache.org>.
Github user krummas commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/276#discussion_r222596795
  
    --- Diff: test/unit/org/apache/cassandra/repair/RepairJobTest.java ---
    @@ -0,0 +1,569 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.repair;
    +
    +import java.net.UnknownHostException;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.function.Predicate;
    +
    +import com.google.common.collect.Sets;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.dht.ByteOrderedPartitioner;
    +import org.apache.cassandra.dht.IPartitioner;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.streaming.PreviewKind;
    +import org.apache.cassandra.utils.ByteBufferUtil;
    +import org.apache.cassandra.utils.FBUtilities;
    +import org.apache.cassandra.utils.MerkleTree;
    +import org.apache.cassandra.utils.MerkleTrees;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +public class RepairJobTest
    +{
    +    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
    +
    +    static InetAddressAndPort addr1;
    +    static InetAddressAndPort addr2;
    +    static InetAddressAndPort addr3;
    +    static InetAddressAndPort addr4;
    +    static InetAddressAndPort addr5;
    +
    +    static Range<Token> range1 = range(0, 1);
    +    static Range<Token> range2 = range(2, 3);
    +    static Range<Token> range3 = range(4, 5);
    +    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
    +
    +    @AfterClass
    +    public static void reset()
    +    {
    +        FBUtilities.reset();
    +    }
    +
    +    static
    +    {
    +        try
    +        {
    +            addr1 = InetAddressAndPort.getByName("127.0.0.1");
    +            addr2 = InetAddressAndPort.getByName("127.0.0.2");
    +            addr3 = InetAddressAndPort.getByName("127.0.0.3");
    +            addr4 = InetAddressAndPort.getByName("127.0.0.4");
    +            addr5 = InetAddressAndPort.getByName("127.0.0.5");
    +            DatabaseDescriptor.setBroadcastAddress(addr1.address);
    +        }
    +        catch (UnknownHostException e)
    +        {
    +            e.printStackTrace();
    +        }
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasks()
    +    {
    +        testCreateStandardSyncTasks(false);
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksPullRepair()
    +    {
    +        testCreateStandardSyncTasks(true);
    +    }
    +
    +    public static void testCreateStandardSyncTasks(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
    +                                                         treeResponse(addr3, range1, "same",      range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    noTransient(), // transient
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(2, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +
    +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
    +    }
    +
    +    @Test
    +    public void testStanardSyncTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncTransient(true);
    +        testStanardSyncTransient(false);
    +    }
    +
    +    public void testStanardSyncTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr2),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
    +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testStanardSyncLocalTransient()
    +    {
    +        // Do not stream towards transient nodes
    +        testStanardSyncLocalTransient(true);
    +        testStanardSyncLocalTransient(false);
    +    }
    +
    +    public void testStanardSyncLocalTransient(boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    transientPredicate(addr1),
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        if (pullRepair)
    +        {
    +            Assert.assertTrue(tasks.isEmpty());
    +            return;
    +        }
    +
    +        Assert.assertEquals(1, tasks.size());
    +
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
    +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
    +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testEmptyDifference()
    +    {
    +        // one of the nodes is a local coordinator
    +        testEmptyDifference(addr1, noTransient(), true);
    +        testEmptyDifference(addr1, noTransient(), false);
    +        testEmptyDifference(addr2, noTransient(), true);
    +        testEmptyDifference(addr2, noTransient(), false);
    +        testEmptyDifference(addr1, transientPredicate(addr1), true);
    +        testEmptyDifference(addr2, transientPredicate(addr1), true);
    +        testEmptyDifference(addr1, transientPredicate(addr1), false);
    +        testEmptyDifference(addr2, transientPredicate(addr1), false);
    +        testEmptyDifference(addr1, transientPredicate(addr2), true);
    +        testEmptyDifference(addr2, transientPredicate(addr2), true);
    +        testEmptyDifference(addr1, transientPredicate(addr2), false);
    +        testEmptyDifference(addr2, transientPredicate(addr2), false);
    +
    +        // nonlocal coordinator
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, noTransient(), true);
    +        testEmptyDifference(addr3, noTransient(), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), true);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr1), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), true);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +        testEmptyDifference(addr3, transientPredicate(addr2), false);
    +    }
    +
    +    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
    +                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    local, // local
    +                                                                                    isTransient,
    +                                                                                    false,
    +                                                                                    pullRepair,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertTrue(tasks.isEmpty());
    +    }
    +
    +    @Test
    +    public void testCreateStandardSyncTasksAllDifferent()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
    +
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    ep -> ep.equals(addr3), // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        Assert.assertEquals(3, tasks.size());
    +        SyncTask task = tasks.get(pair(addr1, addr2));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr2, addr3));
    +        Assert.assertFalse(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +
    +        task = tasks.get(pair(addr1, addr3));
    +        Assert.assertTrue(task.isLocal());
    +        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
    +    }
    +
    +    @Test
    +    public void testCreate5NodeStandardSyncTasksWithTransient()
    +    {
    +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
    +                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
    +                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
    +                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
    +                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
    +
    +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
    +        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
    +                                                                                    treeResponses,
    +                                                                                    addr1, // local
    +                                                                                    isTransient, // transient
    +                                                                                    false,
    +                                                                                    true,
    +                                                                                    PreviewKind.ALL));
    +
    +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
    +                                                   pair(addr1, addr3),
    +                                                   pair(addr1, addr4),
    +                                                   pair(addr1, addr5),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr4),
    +                                                   pair(addr2, addr5),
    +                                                   pair(addr3, addr4),
    +                                                   pair(addr3, addr5)};
    +
    +        for (SyncNodePair pair : pairs)
    +        {
    +            SyncTask task = tasks.get(pair);
    +            // Local only if addr1 is a coordinator
    +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
    +
    +            // Symmetric only if there are no transient participants
    +            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",
    --- End diff --
    
    maybe make this something like:
    ```
    boolean isRemote = !pair.coordinator.equals(addr1) && !pair.peer.equals(addr1);
    boolean involvesTransient = isTransient.test(pair.coordinator) || isTransient.test(pair.peer);
    assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",pair.coordinator, pair.peer),
                             isRemote && involvesTransient,
                             task instanceof AsymmetricRemoteSyncTask);
    ```
    to make it a bit more readable?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org