You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/11 20:58:50 UTC

[1/2] cassandra git commit: Allow transient node to serve as a repair coordinator

Repository: cassandra
Updated Branches:
  refs/heads/trunk 2886cac38 -> 0841353e9


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
new file mode 100644
index 0000000..6e691f5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LocalSyncTaskTest extends AbstractRepairTest
+{
+    private static final IPartitioner partitioner = Murmur3Partitioner.instance;
+    private static final InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
+    public static final String KEYSPACE1 = "DifferencerTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
+
+        TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id;
+        cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
+    }
+
+    /**
+     * When there is no difference between the two trees, LocalSyncTask should return stats with 0 differences.
+     */
+    @Test
+    public void testNoDifference() throws Throwable
+    {
+        final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.2");
+
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+        MerkleTrees tree1 = createInitialTree(desc);
+
+        MerkleTrees tree2 = createInitialTree(desc);
+
+        // difference the trees
+        // note: r1 comes from the local node and r2 from a second endpoint (127.0.0.2)
+        TreeResponse r1 = new TreeResponse(local, tree1);
+        TreeResponse r2 = new TreeResponse(ep2, tree2);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE);
+        task.run();
+
+        assertEquals(0, task.get().numberOfDifferences);
+    }
+
+    @Test
+    public void testDifference() throws Throwable
+    {
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        UUID parentRepairSession = UUID.randomUUID();
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+
+        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(),
+                                                                 Arrays.asList(cfs), Arrays.asList(range), false,
+                                                                 ActiveRepairService.UNREPAIRED_SSTABLE, false,
+                                                                 PreviewKind.NONE);
+
+        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+        MerkleTrees tree1 = createInitialTree(desc);
+        MerkleTrees tree2 = createInitialTree(desc);
+
+        // change a range in one of the trees
+        Token token = partitioner.midpoint(range.left, range.right);
+        tree1.invalidate(token);
+        MerkleTree.TreeRange changed = tree1.get(token);
+        changed.hash("non-empty hash!".getBytes());
+
+        Set<Range<Token>> interesting = new HashSet<>();
+        interesting.add(changed);
+
+        // difference the trees
+        // note: r1 comes from the local node and r2 from a second endpoint (127.0.0.2)
+        TreeResponse r1 = new TreeResponse(local, tree1);
+        TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE);
+        DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1;
+        DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2);
+        try
+        {
+            task.run();
+        }
+        finally
+        {
+            DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30);
+            DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 3;
+        }
+
+        // ensure that the changed range was recorded
+        assertEquals("Wrong differing ranges", interesting.size(), task.stat.numberOfDifferences);
+    }
+
+    @Test
+    public void fullRepairStreamPlan() throws Exception
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1));
+
+        assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
+        assertTrue(plan.getFlushBeforeTransfer());
+    }
+
+    private static void assertNumInOut(StreamPlan plan, int expectedIncoming, int expectedOutgoing)
+    {
+        StreamCoordinator coordinator = plan.getCoordinator();
+        StreamSession session = Iterables.getOnlyElement(coordinator.getAllStreamSessions());
+        assertEquals(expectedIncoming, session.getNumRequests());
+        assertEquals(expectedOutgoing, session.getNumTransfers());
+    }
+
+    @Test
+    public void incrementalRepairStreamPlan() throws Exception
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, true, true, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1));
+
+        assertEquals(desc.parentSessionId, plan.getPendingRepair());
+        assertFalse(plan.getFlushBeforeTransfer());
+        assertNumInOut(plan, 1, 1);
+    }
+
+    /**
+     * Don't reciprocate streams if the other endpoint is a transient replica
+     */
+    @Test
+    public void transientRemoteStreamPlan()
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, true, false, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1));
+        assertNumInOut(plan, 1, 0);
+    }
+
+    /**
+     * Don't request streams if the local endpoint is a transient replica
+     */
+    @Test
+    public void transientLocalStreamPlan()
+    {
+        UUID sessionID = registerSession(cfs, true, true);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
+        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
+
+        TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
+
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, true, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1));
+        assertNumInOut(plan, 0, 1);
+    }
+
+    private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner)
+    {
+        MerkleTrees tree = new MerkleTrees(partitioner);
+        tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
+        tree.init();
+        for (MerkleTree.TreeRange r : tree.invalids())
+        {
+            r.ensureHashInitialised();
+        }
+        return tree;
+    }
+
+    private MerkleTrees createInitialTree(RepairJobDesc desc)
+    {
+        return createInitialTree(desc, partitioner);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
deleted file mode 100644
index 92ae172..0000000
--- a/test/unit/org/apache/cassandra/repair/SymmetricLocalSyncTaskTest.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.streaming.StreamCoordinator;
-import org.apache.cassandra.streaming.DefaultConnectionFactory;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.MerkleTrees;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class SymmetricLocalSyncTaskTest extends AbstractRepairTest
-{
-    private static final IPartitioner partitioner = Murmur3Partitioner.instance;
-    public static final String KEYSPACE1 = "DifferencerTest";
-    public static final String CF_STANDARD = "Standard1";
-    public static ColumnFamilyStore cfs;
-
-    @BeforeClass
-    public static void defineSchema()
-    {
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
-
-        TableId tid = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD).id;
-        cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
-    }
-
-    /**
-     * When there is no difference between two, SymmetricLocalSyncTask should return stats with 0 difference.
-     */
-    @Test
-    public void testNoDifference() throws Throwable
-    {
-        final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1");
-        final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1");
-
-        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
-        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
-
-        MerkleTrees tree1 = createInitialTree(desc);
-
-        MerkleTrees tree2 = createInitialTree(desc);
-
-        // difference the trees
-        // note: we reuse the same endpoint which is bogus in theory but fine here
-        TreeResponse r1 = new TreeResponse(ep1, tree1);
-        TreeResponse r2 = new TreeResponse(ep2, tree2);
-        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        task.run();
-
-        assertEquals(0, task.get().numberOfDifferences);
-    }
-
-    @Test
-    public void testDifference() throws Throwable
-    {
-        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
-        UUID parentRepairSession = UUID.randomUUID();
-        Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(),
-                                                                 Arrays.asList(cfs), Arrays.asList(range), false,
-                                                                 ActiveRepairService.UNREPAIRED_SSTABLE, false,
-                                                                 PreviewKind.NONE);
-
-        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
-
-        MerkleTrees tree1 = createInitialTree(desc);
-
-        MerkleTrees tree2 = createInitialTree(desc);
-
-        // change a range in one of the trees
-        Token token = partitioner.midpoint(range.left, range.right);
-        tree1.invalidate(token);
-        MerkleTree.TreeRange changed = tree1.get(token);
-        changed.hash("non-empty hash!".getBytes());
-
-        Set<Range<Token>> interesting = new HashSet<>();
-        interesting.add(changed);
-
-        // difference the trees
-        // note: we reuse the same endpoint which is bogus in theory but fine here
-        TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1);
-        TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
-        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1;
-        DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2);
-        try
-        {
-            task.run();
-        }
-        finally
-        {
-            DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30);
-            DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 3;
-        }
-
-        // ensure that the changed range was recorded
-        assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences);
-    }
-
-    @Test
-    public void fullRepairStreamPlan() throws Exception
-    {
-        UUID sessionID = registerSession(cfs, true, true);
-        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
-        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
-
-        TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-
-        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, NO_PENDING_REPAIR, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
-
-        assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
-        assertTrue(plan.getFlushBeforeTransfer());
-    }
-
-    private static void assertNumInOut(StreamPlan plan, int expectedIncoming, int expectedOutgoing)
-    {
-        StreamCoordinator coordinator = plan.getCoordinator();
-        StreamSession session = Iterables.getOnlyElement(coordinator.getAllStreamSessions());
-        assertEquals(expectedIncoming, session.getNumRequests());
-        assertEquals(expectedOutgoing, session.getNumTransfers());
-    }
-
-    @Test
-    public void incrementalRepairStreamPlan() throws Exception
-    {
-        UUID sessionID = registerSession(cfs, true, true);
-        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
-        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
-
-        TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-
-        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, false, desc.parentSessionId, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1));
-
-        assertEquals(desc.parentSessionId, plan.getPendingRepair());
-        assertFalse(plan.getFlushBeforeTransfer());
-        assertNumInOut(plan, 1, 1);
-    }
-
-    /**
-     * Don't reciprocate streams if the other endpoint is a transient replica
-     */
-    @Test
-    public void transientStreamPlan()
-    {
-        UUID sessionID = registerSession(cfs, true, true);
-        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
-        RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), KEYSPACE1, CF_STANDARD, prs.getRanges());
-
-        TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-        TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
-
-        SymmetricLocalSyncTask task = new SymmetricLocalSyncTask(desc, r1, r2, true, desc.parentSessionId, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(PARTICIPANT2, Lists.newArrayList(RANGE1));
-        assertNumInOut(plan, 1, 0);
-    }
-
-    private MerkleTrees createInitialTree(RepairJobDesc desc, IPartitioner partitioner)
-    {
-        MerkleTrees tree = new MerkleTrees(partitioner);
-        tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
-        tree.init();
-        for (MerkleTree.TreeRange r : tree.invalids())
-        {
-            r.ensureHashInitialised();
-        }
-        return tree;
-    }
-
-    private MerkleTrees createInitialTree(RepairJobDesc desc)
-    {
-        return createInitialTree(desc, partitioner);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
index 06f968f..7f48788 100644
--- a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
@@ -30,18 +30,18 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.UUIDGen;
 
 public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest
 {
     private static final RepairJobDesc DESC = new RepairJobDesc(UUIDGen.getTimeUUID(), UUIDGen.getTimeUUID(), "ks", "tbl", ALL_RANGES);
     private static final List<Range<Token>> RANGE_LIST = ImmutableList.of(RANGE1);
-
     private static class InstrumentedSymmetricRemoteSyncTask extends SymmetricRemoteSyncTask
     {
         public InstrumentedSymmetricRemoteSyncTask(InetAddressAndPort e1, InetAddressAndPort e2)
         {
-            super(DESC, new TreeResponse(e1, null), new TreeResponse(e2, null), PreviewKind.NONE);
+            super(DESC, e1, e2, RANGE_LIST, PreviewKind.NONE);
         }
 
         RepairMessage sentMessage = null;
@@ -62,7 +62,7 @@ public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest
     public void normalSync()
     {
         InstrumentedSymmetricRemoteSyncTask syncTask = new InstrumentedSymmetricRemoteSyncTask(PARTICIPANT1, PARTICIPANT2);
-        syncTask.startSync(RANGE_LIST);
+        syncTask.startSync();
 
         Assert.assertNotNull(syncTask.sentMessage);
         Assert.assertSame(SyncRequest.class, syncTask.sentMessage.getClass());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
index e57ab94..d583d85 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -29,9 +29,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
 
-import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -45,7 +43,7 @@ import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.NodePair;
+import org.apache.cassandra.repair.SyncNodePair;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.schema.TableId;
@@ -166,7 +164,7 @@ public class RepairMessageSerializationsTest
                                          Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 5, 100)),
                                          Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 500, 10))
         ));
-        SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new NodePair(src, dst), true, summaries);
+        SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new SyncNodePair(src, dst), true, summaries);
         serializeRoundTrip(msg, SyncComplete.serializer);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index c29e7a8..1223683 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.repair.NodePair;
+import org.apache.cassandra.repair.SyncNodePair;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.repair.messages.*;
@@ -239,7 +239,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
         InetAddressAndPort src = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", PORT);
         InetAddressAndPort dest = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.3", PORT);
-        NodePair nodes = new NodePair(src, dest);
+        SyncNodePair nodes = new SyncNodePair(src, dest);
 
         try (DataInputStreamPlus in = getInput("service.SyncComplete.bin"))
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/2] cassandra git commit: Allow transient node to serve as a repair coordinator

Posted by if...@apache.org.
Allow transient node to serve as a repair coordinator

Patch by Alex Petrov and Blake Eggleston, reviewed by Ariel Weisberg, Blake Eggleston, Marcus Eriksson for CASSANDRA-14693

Co-authored-by: Blake Eggleston <bd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0841353e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0841353e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0841353e

Branch: refs/heads/trunk
Commit: 0841353e90f1cc94dc47b435af87e4d5876478ea
Parents: 2886cac
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Sep 4 19:38:27 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Tue Sep 11 22:58:01 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/DiskBoundaryManager.java       |  21 +-
 .../cassandra/repair/AbstractSyncTask.java      |  31 ---
 .../repair/AsymmetricLocalSyncTask.java         | 105 --------
 .../repair/AsymmetricRemoteSyncTask.java        |  19 +-
 .../cassandra/repair/AsymmetricSyncTask.java    |  81 ------
 .../apache/cassandra/repair/CommonRange.java    |   8 +-
 .../apache/cassandra/repair/LocalSyncTask.java  | 158 ++++++++++++
 .../org/apache/cassandra/repair/NodePair.java   |  91 -------
 .../org/apache/cassandra/repair/RepairJob.java  | 174 +++++++------
 .../apache/cassandra/repair/RepairRunnable.java |  27 +-
 .../apache/cassandra/repair/RepairSession.java  |  16 +-
 .../repair/SymmetricLocalSyncTask.java          | 142 -----------
 .../repair/SymmetricRemoteSyncTask.java         |  22 +-
 .../cassandra/repair/SymmetricSyncTask.java     |  94 -------
 .../apache/cassandra/repair/SyncNodePair.java   |  91 +++++++
 .../org/apache/cassandra/repair/SyncStat.java   |   6 +-
 .../org/apache/cassandra/repair/SyncTask.java   |  96 +++++++
 .../cassandra/repair/messages/SyncComplete.java |  14 +-
 .../cassandra/service/ActiveRepairService.java  |   3 -
 .../service/reads/AbstractReadExecutor.java     |   2 +-
 .../cassandra/repair/LocalSyncTaskTest.java     | 249 +++++++++++++++++++
 .../repair/SymmetricLocalSyncTaskTest.java      | 232 -----------------
 .../repair/SymmetricRemoteSyncTaskTest.java     |   6 +-
 .../RepairMessageSerializationsTest.java        |   6 +-
 .../cassandra/service/SerializationsTest.java   |   4 +-
 26 files changed, 766 insertions(+), 933 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b7bc775..ef285e0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
  * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696)
  * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700)
  * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index acfe71a..0961a42 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -109,17 +109,7 @@ public class DiskBoundaryManager
         if (localRanges == null || localRanges.isEmpty())
             return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);
 
-        // note that Range.sort unwraps any wraparound ranges, so we need to sort them here
-        List<Range<Token>> fullLocalRanges = Range.sort(localRanges.stream()
-                                                                   .filter(Replica::isFull)
-                                                                   .map(Replica::range)
-                                                                   .collect(Collectors.toList()));
-        List<Range<Token>> transientLocalRanges = Range.sort(localRanges.stream()
-                                                                        .filter(Replica::isTransient)
-                                                                        .map(Replica::range)
-                                                                        .collect(Collectors.toList()));
-
-        List<PartitionPosition> positions = getDiskBoundaries(fullLocalRanges, transientLocalRanges, cfs.getPartitioner(), dirs);
+        List<PartitionPosition> positions = getDiskBoundaries(localRanges, cfs.getPartitioner(), dirs);
 
         return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion);
     }
@@ -133,18 +123,19 @@ public class DiskBoundaryManager
      *
      * The final entry in the returned list will always be the partitioner maximum tokens upper key bound
      */
-    private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> fullRanges, List<Range<Token>> transientRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+    private static List<PartitionPosition> getDiskBoundaries(RangesAtEndpoint ranges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
     {
         assert partitioner.splitter().isPresent();
 
         Splitter splitter = partitioner.splitter().get();
         boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
 
-        List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(fullRanges.size() + transientRanges.size());
-        for (Range<Token> r : fullRanges)
+        List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(ranges.size());
+        // note that Range.sort unwraps any wraparound ranges, so we need to sort them here
+        for (Range<Token> r : Range.sort(ranges.fullRanges()))
             weightedRanges.add(new Splitter.WeightedRange(1.0, r));
 
-        for (Range<Token> r : transientRanges)
+        for (Range<Token> r : Range.sort(ranges.transientRanges()))
             weightedRanges.add(new Splitter.WeightedRange(0.1, r));
 
         weightedRanges.sort(Comparator.comparing(Splitter.WeightedRange::left));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
deleted file mode 100644
index 124baa1..0000000
--- a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.List;
-
-import com.google.common.util.concurrent.AbstractFuture;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-
-public abstract class AbstractSyncTask extends AbstractFuture<SyncStat> implements Runnable
-{
-    protected abstract void startSync(List<Range<Token>> rangesToStream);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
deleted file mode 100644
index eaf890a..0000000
--- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamEvent;
-import org.apache.cassandra.streaming.StreamEventHandler;
-import org.apache.cassandra.streaming.StreamOperation;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements StreamEventHandler
-{
-    private final UUID pendingRepair;
-    private final TraceState state = Tracing.instance.get();
-
-    public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind)
-    {
-        super(desc, FBUtilities.getBroadcastAddressAndPort(), fetchFrom, rangesToFetch, previewKind);
-        this.pendingRepair = pendingRepair;
-    }
-
-    public void startSync(List<Range<Token>> rangesToFetch)
-    {
-        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR,
-                                         1, false,
-                                         pendingRepair,
-                                         previewKind)
-                          .listeners(this)
-                          .flushBeforeTransfer(pendingRepair == null)
-                          // request ranges from the remote node, see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
-                          .requestRanges(fetchFrom, desc.keyspace, RangesAtEndpoint.toDummyList(rangesToFetch),
-                                  RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
-        plan.execute();
-
-    }
-
-    public void handleStreamEvent(StreamEvent event)
-    {
-        if (state == null)
-            return;
-        switch (event.eventType)
-        {
-            case STREAM_PREPARED:
-                StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
-                state.trace("Streaming session with {} prepared", spe.session.peer);
-                break;
-            case STREAM_COMPLETE:
-                StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
-                state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
-                break;
-            case FILE_PROGRESS:
-                ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
-                state.trace("{}/{} ({}%) {} idx:{}{}",
-                            new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
-                                           FBUtilities.prettyPrintMemory(pi.totalBytes),
-                                           pi.currentBytes * 100 / pi.totalBytes,
-                                           pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
-                                           pi.sessionIndex,
-                                           pi.peer });
-        }
-    }
-
-    public void onSuccess(StreamState result)
-    {
-        String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, fetchingNode, fetchFrom, desc.columnFamily);
-        Tracing.traceRepair(message);
-        set(stat);
-        finished();
-    }
-
-    public void onFailure(Throwable t)
-    {
-        setException(t);
-        finished();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index 2b171c9..9ba33dd 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -26,27 +26,36 @@ import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
+import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTrees;
 
-public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask
+/**
+ * AsymmetricRemoteSyncTask sends an {@link AsymmetricSyncRequest} to the target node to repair (stream)
+ * data with the other target replica.
+ *
+ * When AsymmetricRemoteSyncTask receives SyncComplete from the target, the task completes.
+ */
+public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask
 {
-    public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+    public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom,
+                                    List<Range<Token>> rangesToFetch, PreviewKind previewKind)
     {
         super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind);
     }
+
     public AsymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse to, TreeResponse from, PreviewKind previewKind)
     {
         this(desc, to.endpoint, from.endpoint, MerkleTrees.difference(to.trees, from.trees), previewKind);
     }
 
-    public void startSync(List<Range<Token>> rangesToFetch)
+    public void startSync()
     {
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-        AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, fetchingNode, fetchFrom, rangesToFetch, previewKind);
+        AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
         String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
         Tracing.traceRepair(message);
         MessagingService.instance().sendOneWay(request.createMessage(), request.fetchingNode);
@@ -60,7 +69,7 @@ public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements Comp
         }
         else
         {
-            setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", fetchingNode, fetchFrom)));
+            setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", nodePair.coordinator, nodePair.peer)));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
deleted file mode 100644
index 35474af..0000000
--- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.tracing.Tracing;
-
-public abstract class AsymmetricSyncTask extends AbstractSyncTask
-{
-    private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class);
-    protected final RepairJobDesc desc;
-    protected final InetAddressAndPort fetchFrom;
-    protected final List<Range<Token>> rangesToFetch;
-    protected final InetAddressAndPort fetchingNode;
-    protected final PreviewKind previewKind;
-    private long startTime = Long.MIN_VALUE;
-    protected volatile SyncStat stat;
-
-    public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
-    {
-        assert !fetchFrom.equals(fetchingNode) : "Fetching from self " + fetchFrom;
-        this.desc = desc;
-        this.fetchFrom = fetchFrom;
-        this.fetchingNode = fetchingNode;
-        this.rangesToFetch = rangesToFetch;
-        // todo: make an AsymmetricSyncStat?
-        stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size());
-        this.previewKind = previewKind;
-    }
-
-    public void run()
-    {
-        startTime = System.currentTimeMillis();
-        // choose a repair method based on the significance of the difference
-        String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), fetchingNode, fetchFrom, desc.columnFamily);
-        if (rangesToFetch.isEmpty())
-        {
-            logger.info(String.format(format, "are consistent"));
-            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", fetchingNode, fetchFrom, desc.columnFamily);
-            set(stat);
-            return;
-        }
-
-        // non-0 difference: perform streaming repair
-        logger.info(String.format(format, "have " + rangesToFetch.size() + " range(s) out of sync"));
-        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", fetchingNode, rangesToFetch.size(), fetchFrom, desc.columnFamily);
-        startSync(rangesToFetch);
-    }
-
-    protected void finished()
-    {
-        if (startTime != Long.MIN_VALUE)
-            Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/CommonRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java
index 928e570..6b55dc7 100644
--- a/src/java/org/apache/cassandra/repair/CommonRange.java
+++ b/src/java/org/apache/cassandra/repair/CommonRange.java
@@ -48,7 +48,13 @@ public class CommonRange
 
         this.endpoints = ImmutableSet.copyOf(endpoints);
         this.transEndpoints = ImmutableSet.copyOf(transEndpoints);
-        this.ranges = new ArrayList(ranges);
+        this.ranges = new ArrayList<>(ranges);
+    }
+
+    public boolean matchesEndpoints(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints)
+    {
+        // Use strict equality here, as the worst thing that can happen is that we generate one more stream
+        return this.endpoints.equals(endpoints) && this.transEndpoints.equals(transEndpoints);
     }
 
     public boolean equals(Object o)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
new file mode 100644
index 0000000..1923fbe
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
+
+/**
+ * LocalSyncTask performs streaming between the local (coordinator) node and a remote replica.
+ */
+public class LocalSyncTask extends SyncTask implements StreamEventHandler
+{
+    private final TraceState state = Tracing.instance.get();
+
+    private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
+
+    private final UUID pendingRepair;
+    private final boolean requestRanges;
+    private final boolean transferRanges;
+
+    public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair,
+                         boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
+    {
+        this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees),
+             pendingRepair, requestRanges, transferRanges, previewKind);
+    }
+
+    public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
+                         List<Range<Token>> diff, UUID pendingRepair,
+                         boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
+    {
+        super(desc, local, remote, diff, previewKind);
+        Preconditions.checkArgument(requestRanges || transferRanges, "Nothing to do in a sync job");
+        Preconditions.checkArgument(local.equals(FBUtilities.getBroadcastAddressAndPort()));
+
+        this.pendingRepair = pendingRepair;
+        this.requestRanges = requestRanges;
+        this.transferRanges = transferRanges;
+    }
+
+    @VisibleForTesting
+    StreamPlan createStreamPlan(InetAddressAndPort remote, List<Range<Token>> differences)
+    {
+        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
+                          .listeners(this)
+                          .flushBeforeTransfer(pendingRepair == null);
+
+        if (requestRanges)
+        {
+            // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+            plan.requestRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences),
+                               RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
+        }
+
+        if (transferRanges)
+        {
+            // send ranges to the remote node if we are not performing a pull repair
+            // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+            plan.transferRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
+        }
+
+        return plan;
+    }
+
+    /**
+     * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
+     * that will be called out of band once the streams complete.
+     */
+    @Override
+    protected void startSync()
+    {
+        InetAddressAndPort remote = nodePair.peer;
+
+        String message = String.format("Performing streaming repair of %d ranges with %s", rangesToSync.size(), remote);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+        Tracing.traceRepair(message);
+
+        createStreamPlan(remote, rangesToSync).execute();
+    }
+
+    public void handleStreamEvent(StreamEvent event)
+    {
+        if (state == null)
+            return;
+        switch (event.eventType)
+        {
+            case STREAM_PREPARED:
+                StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
+                state.trace("Streaming session with {} prepared", spe.session.peer);
+                break;
+            case STREAM_COMPLETE:
+                StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
+                state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
+                break;
+            case FILE_PROGRESS:
+                ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
+                state.trace("{}/{} ({}%) {} idx:{}{}",
+                            new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
+                                           FBUtilities.prettyPrintMemory(pi.totalBytes),
+                                           pi.currentBytes * 100 / pi.totalBytes,
+                                           pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
+                                           pi.sessionIndex,
+                                           pi.peer });
+        }
+    }
+
+    public void onSuccess(StreamState result)
+    {
+        String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, nodePair.coordinator, nodePair.peer, desc.columnFamily);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+        Tracing.traceRepair(message);
+        set(stat.withSummaries(result.createSummaries()));
+        finished();
+    }
+
+    public void onFailure(Throwable t)
+    {
+        setException(t);
+        finished();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/NodePair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java
deleted file mode 100644
index bfb237e..0000000
--- a/src/java/org/apache/cassandra/repair/NodePair.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.io.IOException;
-
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-
-/**
- * NodePair is used for repair message body to indicate the pair of nodes.
- *
- * @since 2.0
- */
-public class NodePair
-{
-    public static IVersionedSerializer<NodePair> serializer = new NodePairSerializer();
-
-    public final InetAddressAndPort endpoint1;
-    public final InetAddressAndPort endpoint2;
-
-    public NodePair(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2)
-    {
-        this.endpoint1 = endpoint1;
-        this.endpoint2 = endpoint2;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        NodePair nodePair = (NodePair) o;
-        return endpoint1.equals(nodePair.endpoint1) && endpoint2.equals(nodePair.endpoint2);
-    }
-
-    @Override
-    public String toString()
-    {
-        return endpoint1.toString() + " - " + endpoint2.toString();
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hashCode(endpoint1, endpoint2);
-    }
-
-    public static class NodePairSerializer implements IVersionedSerializer<NodePair>
-    {
-        public void serialize(NodePair nodePair, DataOutputPlus out, int version) throws IOException
-        {
-            CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint1, out, version);
-            CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint2, out, version);
-        }
-
-        public NodePair deserialize(DataInputPlus in, int version) throws IOException
-        {
-            InetAddressAndPort ep1 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
-            InetAddressAndPort ep2 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
-            return new NodePair(ep1, ep2);
-        }
-
-        public long serializedSize(NodePair nodePair, int version)
-        {
-            return CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint1, version)
-                 + CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint2, version);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index d38435b..c96e7fb 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
@@ -45,7 +46,7 @@ import org.apache.cassandra.utils.Pair;
  */
 public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
 {
-    private static Logger logger = LoggerFactory.getLogger(RepairJob.class);
+    private static final Logger logger = LoggerFactory.getLogger(RepairJob.class);
 
     private final RepairSession session;
     private final RepairJobDesc desc;
@@ -128,7 +129,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         }
 
         // When all validations complete, submit sync tasks
-        ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, optimiseStreams && !session.pullRepair ? optimisedSyncing() : standardSyncing(), taskExecutor);
+        ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations,
+                                                                              optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing,
+                                                                              taskExecutor);
 
         // When all sync complete, set the final result
         Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>()
@@ -165,107 +168,116 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         return session.commonRange.transEndpoints.contains(ep);
     }
 
-    private AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing()
+    private ListenableFuture<List<SyncStat>> standardSyncing(List<TreeResponse> trees)
     {
-        return trees ->
-        {
-            InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
+        InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
 
-            List<AbstractSyncTask> syncTasks = new ArrayList<>();
-            // We need to difference all trees one against another
-            for (int i = 0; i < trees.size() - 1; ++i)
+        List<SyncTask> syncTasks = new ArrayList<>();
+        // We need to difference all trees one against another
+        for (int i = 0; i < trees.size() - 1; ++i)
+        {
+            TreeResponse r1 = trees.get(i);
+            for (int j = i + 1; j < trees.size(); ++j)
             {
-                TreeResponse r1 = trees.get(i);
-                for (int j = i + 1; j < trees.size(); ++j)
+                TreeResponse r2 = trees.get(j);
+
+                // Avoid streaming between two transient replicas
+                if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+                    continue;
+
+                SyncTask task;
+                if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
                 {
-                    TreeResponse r2 = trees.get(j);
+                    TreeResponse self = r1.endpoint.equals(local) ? r1 : r2;
+                    TreeResponse remote = r2.endpoint.equals(local) ? r1 : r2;
 
-                    if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+                    // pull only if local is full
+                    boolean requestRanges = !isTransient(self.endpoint);
+                    // push only if remote is full; additionally check for pull repair
+                    boolean transferRanges = !isTransient(remote.endpoint) && !session.pullRepair;
+
+                    // Nothing to do
+                    if (!requestRanges && !transferRanges)
                         continue;
 
-                    AbstractSyncTask task;
-                    if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
-                    {
-                        InetAddressAndPort remote = r1.endpoint.equals(local) ? r2.endpoint : r1.endpoint;
-                        task = new SymmetricLocalSyncTask(desc, r1, r2, isTransient(remote), isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
-                    }
-                    else if (isTransient(r1.endpoint) || isTransient(r2.endpoint))
-                    {
-                        TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2;
-                        TreeResponse streamTo = isTransient(r1.endpoint) ? r2: r1;
-                        task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind);
-                        session.waitForSync(Pair.create(desc, new NodePair(streamTo.endpoint, streamFrom.endpoint)), (AsymmetricRemoteSyncTask) task);
-                    }
-                    else
-                    {
-                        task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind);
-                        // SymmetricRemoteSyncTask expects SyncComplete message sent back.
-                        // Register task to RepairSession to receive response.
-                        session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (SymmetricRemoteSyncTask) task);
-                    }
-                    syncTasks.add(task);
-                    taskExecutor.submit(task);
+                    task = new LocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null,
+                                             requestRanges, transferRanges, session.previewKind);
+                }
+                else if (isTransient(r1.endpoint) || isTransient(r2.endpoint))
+                {
+                    // Stream only from transient replica
+                    TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2;
+                    TreeResponse streamTo = isTransient(r1.endpoint) ? r2 : r1;
+                    task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind);
+                    session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task);
+                }
+                else
+                {
+                    task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind);
+                    // SymmetricRemoteSyncTask expects SyncComplete message sent back.
+                    // Register task to RepairSession to receive response.
+                    session.waitForSync(Pair.create(desc, task.nodePair()), (SymmetricRemoteSyncTask) task);
                 }
+                syncTasks.add(task);
+                taskExecutor.submit(task);
             }
-            return Futures.allAsList(syncTasks);
-        };
+        }
+        return Futures.allAsList(syncTasks);
     }
 
-    private AsyncFunction<List<TreeResponse>, List<SyncStat>> optimisedSyncing()
+    private ListenableFuture<List<SyncStat>> optimisedSyncing(List<TreeResponse> trees)
     {
-        return trees ->
-        {
-            InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
+        InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
 
-            List<AbstractSyncTask> syncTasks = new ArrayList<>();
-            // We need to difference all trees one against another
-            DifferenceHolder diffHolder = new DifferenceHolder(trees);
+        List<SyncTask> syncTasks = new ArrayList<>();
+        // We need to difference all trees one against another
+        DifferenceHolder diffHolder = new DifferenceHolder(trees);
 
-            logger.debug("diffs = {}", diffHolder);
-            PreferedNodeFilter preferSameDCFilter = (streaming, candidates) ->
-                                                    candidates.stream()
-                                                              .filter(node -> getDC(streaming)
-                                                                              .equals(getDC(node)))
-                                                              .collect(Collectors.toSet());
-            ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
+        logger.debug("diffs = {}", diffHolder);
+        PreferedNodeFilter preferSameDCFilter = (streaming, candidates) ->
+                                                candidates.stream()
+                                                          .filter(node -> getDC(streaming)
+                                                                          .equals(getDC(node)))
+                                                          .collect(Collectors.toSet());
+        ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
 
-            for (int i = 0; i < trees.size(); i++)
-            {
-                InetAddressAndPort address = trees.get(i).endpoint;
+        for (int i = 0; i < trees.size(); i++)
+        {
+            InetAddressAndPort address = trees.get(i).endpoint;
 
-                // we don't stream to transient replicas
-                if (isTransient(address))
-                    continue;
+            // we don't stream to transient replicas
+            if (isTransient(address))
+                continue;
 
-                HostDifferences streamsFor = reducedDifferences.get(address);
-                if (streamsFor != null)
+            HostDifferences streamsFor = reducedDifferences.get(address);
+            if (streamsFor != null)
+            {
+                Preconditions.checkArgument(streamsFor.get(address).isEmpty(), "We should not fetch ranges from ourselves");
+                for (InetAddressAndPort fetchFrom : streamsFor.hosts())
                 {
-                    assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves";
-                    for (InetAddressAndPort fetchFrom : streamsFor.hosts())
+                    List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
+                    logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
+                    SyncTask task;
+                    if (address.equals(local))
                     {
-                        List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
-                        logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
-                        AsymmetricSyncTask task;
-                        if (address.equals(local))
-                        {
-                            task = new AsymmetricLocalSyncTask(desc, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, previewKind);
-                        }
-                        else
-                        {
-                            task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind);
-                            session.waitForSync(Pair.create(desc, new NodePair(address, fetchFrom)),(AsymmetricRemoteSyncTask)task);
-                        }
-                        syncTasks.add(task);
-                        taskExecutor.submit(task);
+                        task = new LocalSyncTask(desc, address, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null,
+                                                 true, false, session.previewKind);
                     }
-                }
-                else
-                {
-                    logger.debug("Node {} has nothing to stream", address);
+                    else
+                    {
+                        task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind);
+                        session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task);
+                    }
+                    syncTasks.add(task);
+                    taskExecutor.submit(task);
                 }
             }
-            return Futures.allAsList(syncTasks);
-        };
+            else
+            {
+                logger.debug("Node {} has nothing to stream", address);
+            }
+        }
+        return Futures.allAsList(syncTasks);
     }
 
     private String getDC(InetAddressAndPort address)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 8d3cd54..fa0c2a9 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -196,21 +196,17 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         Set<InetAddressAndPort> allNeighbors = new HashSet<>();
         List<CommonRange> commonRanges = new ArrayList<>();
 
-        //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
-        //calculation multiple times
-        // we arbitrarily limit ourselves to only full replicas, in lieu of ensuring it is safe to coordinate from a transient replica
-        Iterable<Range<Token>> keyspaceLocalRanges = storageService
-                .getLocalReplicas(keyspace)
-                .filter(Replica::isFull)
-                .ranges();
-
         try
         {
+            //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
+            //calculation multiple times
+            Iterable<Range<Token>> keyspaceLocalRanges = storageService.getLocalReplicas(keyspace).ranges();
+
             for (Range<Token> range : options.getRanges())
             {
                 EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
-                                                                        options.getDataCenters(),
-                                                                        options.getHosts());
+                                                                               options.getDataCenters(),
+                                                                               options.getHosts());
 
                 addRangeToNeighbors(commonRanges, range, neighbors);
                 allNeighbors.addAll(neighbors.endpoints());
@@ -647,17 +643,16 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                                ImmutableList.of(failureMessage, completionMessage));
     }
 
-    private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
+    private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
     {
         Set<InetAddressAndPort> endpoints = neighbors.endpoints();
         Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints();
-        for (int i = 0; i < neighborRangeList.size(); i++)
-        {
-            CommonRange cr = neighborRangeList.get(i);
 
-            if (cr.endpoints.containsAll(endpoints) && cr.transEndpoints.containsAll(transEndpoints))
+        for (CommonRange commonRange : neighborRangeList)
+        {
+            if (commonRange.matchesEndpoints(endpoints, transEndpoints))
             {
-                cr.ranges.add(range);
+                commonRange.ranges.add(range);
                 return;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 2ff60ec..4dc563a 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -54,9 +54,8 @@ import org.apache.cassandra.utils.Pair;
  *      ({@link org.apache.cassandra.repair.ValidationTask}) and waits until all trees are received (in
  *      validationComplete()).
  *   </li>
- *   <li>Synchronization phase: once all trees are received, the job compares each tree with
- *      all the other using a so-called {@link SymmetricSyncTask}. If there is difference between 2 trees, the
- *      concerned SymmetricSyncTask will start a streaming of the difference between the 2 endpoint concerned.
+ *   <li>Synchronization phase: once all trees are received, the job compares each tree with all the others. If there is
+ *       a difference between 2 trees, the differences between the 2 endpoints will be streamed with a {@link SyncTask}.
  *   </li>
  * </ol>
  * The job is done once all its SyncTasks are done (i.e. have either computed no differences
@@ -103,7 +102,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
     private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<>();
     // Remote syncing jobs wait response in syncingTasks map
-    private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
 
     // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
     public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
@@ -195,12 +194,11 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         validating.put(key, task);
     }
 
-    public void waitForSync(Pair<RepairJobDesc, NodePair> key, CompletableRemoteSyncTask task)
+    public void waitForSync(Pair<RepairJobDesc, SyncNodePair> key, CompletableRemoteSyncTask task)
     {
         syncingTasks.put(key, task);
     }
 
-
     /**
      * Receive merkle tree response or failed response from {@code endpoint} for current repair job.
      *
@@ -224,13 +222,13 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     }
 
     /**
-     * Notify this session that sync completed/failed with given {@code NodePair}.
+     * Notify this session that sync completed/failed with given {@code SyncNodePair}.
      *
      * @param desc synced repair job
      * @param nodes nodes that completed sync
      * @param success true if sync succeeded
      */
-    public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries)
+    public void syncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
     {
         CompletableRemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
         if (task == null)
@@ -240,7 +238,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         }
 
         if (logger.isDebugEnabled())
-            logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.endpoint1, nodes.endpoint2, desc.columnFamily);
+            logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.coordinator, nodes.peer, desc.columnFamily);
         task.syncComplete(success, summaries);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java
deleted file mode 100644
index 7eedab7..0000000
--- a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamEvent;
-import org.apache.cassandra.streaming.StreamEventHandler;
-import org.apache.cassandra.streaming.StreamOperation;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * SymmetricLocalSyncTask performs streaming between local(coordinator) node and remote replica.
- */
-public class SymmetricLocalSyncTask extends SymmetricSyncTask implements StreamEventHandler
-{
-    private final TraceState state = Tracing.instance.get();
-
-    private static final Logger logger = LoggerFactory.getLogger(SymmetricLocalSyncTask.class);
-
-    private final boolean remoteIsTransient;
-    private final UUID pendingRepair;
-    private final boolean pullRepair;
-
-    public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, boolean remoteIsTransient, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind)
-    {
-        super(desc, r1, r2, previewKind);
-        this.remoteIsTransient = remoteIsTransient;
-        this.pendingRepair = pendingRepair;
-        this.pullRepair = pullRepair;
-    }
-
-    @VisibleForTesting
-    StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences)
-    {
-        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
-                          .listeners(this)
-                          .flushBeforeTransfer(pendingRepair == null)
-                          // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
-                          .requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences),
-                                  RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);  // request ranges from the remote node
-
-        if (!pullRepair && !remoteIsTransient)
-        {
-            // send ranges to the remote node if we are not performing a pull repair
-            // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
-            plan.transferRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
-        }
-
-        return plan;
-    }
-
-    /**
-     * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
-     * that will be called out of band once the streams complete.
-     */
-    @Override
-    protected void startSync(List<Range<Token>> differences)
-    {
-        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-        // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
-        InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
-
-        String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
-        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
-        Tracing.traceRepair(message);
-
-        createStreamPlan(dst, differences).execute();
-    }
-
-    public void handleStreamEvent(StreamEvent event)
-    {
-        if (state == null)
-            return;
-        switch (event.eventType)
-        {
-            case STREAM_PREPARED:
-                StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
-                state.trace("Streaming session with {} prepared", spe.session.peer);
-                break;
-            case STREAM_COMPLETE:
-                StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
-                state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
-                break;
-            case FILE_PROGRESS:
-                ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
-                state.trace("{}/{} ({}%) {} idx:{}{}",
-                            new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
-                                           FBUtilities.prettyPrintMemory(pi.totalBytes),
-                                           pi.currentBytes * 100 / pi.totalBytes,
-                                           pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
-                                           pi.sessionIndex,
-                                           pi.peer });
-        }
-    }
-
-    public void onSuccess(StreamState result)
-    {
-        String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
-        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
-        Tracing.traceRepair(message);
-        set(stat.withSummaries(result.createSummaries()));
-        finished();
-    }
-
-    public void onFailure(Throwable t)
-    {
-        setException(t);
-        finished();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
index 1f2740f..4e44c15 100644
--- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -18,8 +18,8 @@
 package org.apache.cassandra.repair;
 
 import java.util.List;
-import java.util.function.Predicate;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,13 +29,13 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
 
 /**
  * SymmetricRemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node
@@ -43,13 +43,20 @@ import org.apache.cassandra.utils.FBUtilities;
  *
  * When SymmetricRemoteSyncTask receives SyncComplete from remote node, task completes.
  */
-public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements CompletableRemoteSyncTask
+public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask
 {
     private static final Logger logger = LoggerFactory.getLogger(SymmetricRemoteSyncTask.class);
 
     public SymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind)
     {
-        super(desc, r1, r2, previewKind);
+        super(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), previewKind);
+    }
+
+    @VisibleForTesting
+    SymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort e1, InetAddressAndPort e2,
+                            List<Range<Token>> differences, PreviewKind previewKind)
+    {
+        super(desc, e1, e2, differences, previewKind);
     }
 
     void sendRequest(RepairMessage request, InetAddressAndPort to)
@@ -58,11 +65,12 @@ public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements Comple
     }
 
     @Override
-    protected void startSync(List<Range<Token>> differences)
+    protected void startSync()
     {
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
 
-        SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind);
+        SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
+        Preconditions.checkArgument(nodePair.coordinator.equals(request.src));
         String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
@@ -77,7 +85,7 @@ public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements Comple
         }
         else
         {
-            setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint)));
+            setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", nodePair.coordinator, nodePair.peer)));
         }
         finished();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java
deleted file mode 100644
index 3da2293..0000000
--- a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.MerkleTrees;
-
-/**
- * SymmetricSyncTask will calculate the difference of MerkleTree between two nodes
- * and perform necessary operation to repair replica.
- */
-public abstract class SymmetricSyncTask extends AbstractSyncTask
-{
-    private static Logger logger = LoggerFactory.getLogger(SymmetricSyncTask.class);
-
-    protected final RepairJobDesc desc;
-    protected final TreeResponse r1;
-    protected final TreeResponse r2;
-    protected final PreviewKind previewKind;
-
-    protected volatile SyncStat stat;
-    protected long startTime = Long.MIN_VALUE;
-
-    public SymmetricSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind)
-    {
-        this.desc = desc;
-        this.r1 = r1;
-        this.r2 = r2;
-        this.previewKind = previewKind;
-    }
-
-    /**
-     * Compares trees, and triggers repairs for any ranges that mismatch.
-     */
-    public void run()
-    {
-        startTime = System.currentTimeMillis();
-        // compare trees, and collect differences
-        List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees);
-
-        stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size());
-
-        // choose a repair method based on the significance of the difference
-        String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily);
-        if (differences.isEmpty())
-        {
-            logger.info(String.format(format, "are consistent"));
-            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily);
-            set(stat);
-            return;
-        }
-
-        // non-0 difference: perform streaming repair
-        logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
-        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily);
-        startSync(differences);
-    }
-
-    public SyncStat getCurrentStat()
-    {
-        return stat;
-    }
-
-    protected void finished()
-    {
-        if (startTime != Long.MIN_VALUE)
-            Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SyncNodePair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncNodePair.java b/src/java/org/apache/cassandra/repair/SyncNodePair.java
new file mode 100644
index 0000000..b353eb3
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SyncNodePair.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.io.IOException;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+
+/**
+ * SyncNodePair is used in repair message bodies to identify the pair of nodes (coordinator and peer) involved in a sync.
+ *
+ * @since 2.0
+ */
+public class SyncNodePair
+{
+    public static IVersionedSerializer<SyncNodePair> serializer = new NodePairSerializer();
+
+    public final InetAddressAndPort coordinator;
+    public final InetAddressAndPort peer;
+
+    public SyncNodePair(InetAddressAndPort coordinator, InetAddressAndPort peer)
+    {
+        this.coordinator = coordinator;
+        this.peer = peer;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        SyncNodePair nodePair = (SyncNodePair) o;
+        return coordinator.equals(nodePair.coordinator) && peer.equals(nodePair.peer);
+    }
+
+    @Override
+    public String toString()
+    {
+        return coordinator.toString() + " - " + peer.toString();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(coordinator, peer);
+    }
+
+    public static class NodePairSerializer implements IVersionedSerializer<SyncNodePair>
+    {
+        public void serialize(SyncNodePair nodePair, DataOutputPlus out, int version) throws IOException
+        {
+            CompactEndpointSerializationHelper.instance.serialize(nodePair.coordinator, out, version);
+            CompactEndpointSerializationHelper.instance.serialize(nodePair.peer, out, version);
+        }
+
+        public SyncNodePair deserialize(DataInputPlus in, int version) throws IOException
+        {
+            InetAddressAndPort ep1 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort ep2 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            return new SyncNodePair(ep1, ep2);
+        }
+
+        public long serializedSize(SyncNodePair nodePair, int version)
+        {
+            return CompactEndpointSerializationHelper.instance.serializedSize(nodePair.coordinator, version)
+                 + CompactEndpointSerializationHelper.instance.serializedSize(nodePair.peer, version);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SyncStat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncStat.java b/src/java/org/apache/cassandra/repair/SyncStat.java
index dab5659..7bb503f 100644
--- a/src/java/org/apache/cassandra/repair/SyncStat.java
+++ b/src/java/org/apache/cassandra/repair/SyncStat.java
@@ -26,16 +26,16 @@ import org.apache.cassandra.streaming.SessionSummary;
  */
 public class SyncStat
 {
-    public final NodePair nodes;
+    public final SyncNodePair nodes;
     public final long numberOfDifferences; // TODO: revert to Range<Token>
     public final List<SessionSummary> summaries;
 
-    public SyncStat(NodePair nodes, long numberOfDifferences)
+    public SyncStat(SyncNodePair nodes, long numberOfDifferences)
     {
         this(nodes, numberOfDifferences, null);
     }
 
-    public SyncStat(NodePair nodes, long numberOfDifferences, List<SessionSummary> summaries)
+    public SyncStat(SyncNodePair nodes, long numberOfDifferences, List<SessionSummary> summaries)
     {
         this.nodes = nodes;
         this.numberOfDifferences = numberOfDifferences;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java
new file mode 100644
index 0000000..ccbd26c
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.tracing.Tracing;
+
+public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable
+{
+    private static Logger logger = LoggerFactory.getLogger(SyncTask.class);
+
+    protected final RepairJobDesc desc;
+    protected final List<Range<Token>> rangesToSync;
+    protected final PreviewKind previewKind;
+    protected final SyncNodePair nodePair;
+
+    protected volatile long startTime = Long.MIN_VALUE;
+    protected final SyncStat stat;
+
+    protected SyncTask(RepairJobDesc desc, InetAddressAndPort primaryEndpoint, InetAddressAndPort peer, List<Range<Token>> rangesToSync, PreviewKind previewKind)
+    {
+        Preconditions.checkArgument(!peer.equals(primaryEndpoint), "Sending and receiving node are the same: %s", peer);
+        this.desc = desc;
+        this.rangesToSync = rangesToSync;
+        this.nodePair = new SyncNodePair(primaryEndpoint, peer);
+        this.previewKind = previewKind;
+        this.stat = new SyncStat(nodePair, rangesToSync.size());
+    }
+
+    protected abstract void startSync();
+
+    public SyncNodePair nodePair()
+    {
+        return nodePair;
+    }
+
+    /**
+     * Completes immediately with the current stat when there are no ranges to sync; otherwise starts the sync.
+     */
+    public final void run()
+    {
+        startTime = System.currentTimeMillis();
+
+
+        // log the outcome, and only start a sync when there are ranges out of sync
+        String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), nodePair.coordinator, nodePair.peer, desc.columnFamily);
+        if (rangesToSync.isEmpty())
+        {
+            logger.info(String.format(format, "are consistent"));
+            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", nodePair.coordinator, nodePair.peer, desc.columnFamily);
+            set(stat);
+            return;
+        }
+
+        // non-0 difference: perform streaming repair
+        logger.info(String.format(format, "have " + rangesToSync.size() + " range(s) out of sync"));
+        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", nodePair.coordinator, rangesToSync.size(), nodePair.peer, desc.columnFamily);
+        startSync();
+    }
+
+
+    protected void finished()
+    {
+        if (startTime != Long.MIN_VALUE)
+            Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index 1f1344d..c51d1fd 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.repair.NodePair;
+import org.apache.cassandra.repair.SyncNodePair;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.streaming.SessionSummary;
 
@@ -39,13 +39,13 @@ public class SyncComplete extends RepairMessage
     public static final MessageSerializer serializer = new SyncCompleteSerializer();
 
     /** nodes that involved in this sync */
-    public final NodePair nodes;
+    public final SyncNodePair nodes;
     /** true if sync success, false otherwise */
     public final boolean success;
 
     public final List<SessionSummary> summaries;
 
-    public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries)
+    public SyncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
     {
         super(Type.SYNC_COMPLETE, desc);
         this.nodes = nodes;
@@ -57,7 +57,7 @@ public class SyncComplete extends RepairMessage
     {
         super(Type.SYNC_COMPLETE, desc);
         this.summaries = summaries;
-        this.nodes = new NodePair(endpoint1, endpoint2);
+        this.nodes = new SyncNodePair(endpoint1, endpoint2);
         this.success = success;
     }
 
@@ -85,7 +85,7 @@ public class SyncComplete extends RepairMessage
         public void serialize(SyncComplete message, DataOutputPlus out, int version) throws IOException
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
-            NodePair.serializer.serialize(message.nodes, out, version);
+            SyncNodePair.serializer.serialize(message.nodes, out, version);
             out.writeBoolean(message.success);
 
             out.writeInt(message.summaries.size());
@@ -98,7 +98,7 @@ public class SyncComplete extends RepairMessage
         public SyncComplete deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
-            NodePair nodes = NodePair.serializer.deserialize(in, version);
+            SyncNodePair nodes = SyncNodePair.serializer.deserialize(in, version);
             boolean success = in.readBoolean();
 
             int numSummaries = in.readInt();
@@ -114,7 +114,7 @@ public class SyncComplete extends RepairMessage
         public long serializedSize(SyncComplete message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
-            size += NodePair.serializer.serializedSize(message.nodes, version);
+            size += SyncNodePair.serializer.serializedSize(message.nodes, version);
             size += TypeSizes.sizeof(message.success);
 
             size += TypeSizes.sizeof(message.summaries.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 9f37095..8ffca6a 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -33,7 +33,6 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -62,14 +61,12 @@ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.CommonRange;
-import org.apache.cassandra.repair.RepairRunnable;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.RepairParallelism;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 031326e..5543fcc 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -113,7 +113,7 @@ public abstract class AbstractReadExecutor
     protected void makeFullDataRequests(ReplicaCollection<?> replicas)
     {
         assert all(replicas, Replica::isFull);
-        makeRequests(command, replicas.filter(Replica::isFull));
+        makeRequests(command, replicas);
     }
 
     protected void makeTransientDataRequests(ReplicaCollection<?> replicas)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org