You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this plain-text version.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 18:59:23 UTC

[1/3] Redesign repair messages

Updated Branches:
  refs/heads/trunk 764620368 -> eb4fa4a62


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index c930cc3..8905830 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -30,27 +30,23 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
-import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
 
 import static org.apache.cassandra.service.ActiveRepairService.*;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
 {
@@ -59,7 +55,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
 
     public String tablename;
     public String cfname;
-    public TreeRequest request;
+    public RepairJobDesc desc;
     public ColumnFamilyStore store;
     public InetAddress LOCAL, REMOTE;
 
@@ -107,11 +103,9 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
 
         local_range = StorageService.instance.getPrimaryRangesForEndpoint(tablename, LOCAL).iterator().next();
 
-        // (we use REMOTE instead of LOCAL so that the reponses for the validator.complete() get lost)
-        int gcBefore = store.gcBefore(System.currentTimeMillis());
-        request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, local_range, gcBefore, new CFPair(tablename, cfname));
+        desc = new RepairJobDesc(UUID.randomUUID(), tablename, cfname, local_range);
         // Set a fake session corresponding to this fake request
-        ActiveRepairService.instance.submitArtificialRepairSession(request, tablename, cfname);
+        ActiveRepairService.instance.submitArtificialRepairSession(desc);
     }
 
     @After
@@ -121,51 +115,6 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
     }
 
     @Test
-    public void testValidatorPrepare() throws Throwable
-    {
-        Validator validator;
-
-        // write
-        Util.writeColumnFamily(getWriteData());
-
-        // sample
-        validator = new Validator(request);
-        validator.prepare(store);
-
-        // and confirm that the tree was split
-        assertTrue(validator.tree.size() > 1);
-    }
-
-    @Test
-    public void testValidatorComplete() throws Throwable
-    {
-        Validator validator = new Validator(request);
-        validator.prepare(store);
-        validator.completeTree();
-
-        // confirm that the tree was validated
-        Token min = validator.tree.partitioner().getMinimumToken();
-        assert validator.tree.hash(new Range<Token>(min, min)) != null;
-    }
-
-    @Test
-    public void testValidatorAdd() throws Throwable
-    {
-        Validator validator = new Validator(request);
-        IPartitioner part = validator.tree.partitioner();
-        Token mid = part.midpoint(local_range.left, local_range.right);
-        validator.prepare(store);
-
-        // add a row
-        validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")),
-                                          TreeMapBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(tablename, cfname))));
-        validator.completeTree();
-
-        // confirm that the tree was validated
-        assert validator.tree.hash(local_range) != null;
-    }
-
-    @Test
     public void testGetNeighborsPlusOne() throws Throwable
     {
         // generate rf+1 nodes, and ensure that all nodes are returned
@@ -253,44 +202,6 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         assertEquals(expected, neighbors);
     }
 
-    @Test
-    public void testDifferencer() throws Throwable
-    {
-        // this next part does some housekeeping so that cleanup in the differencer doesn't error out.
-        ActiveRepairService.RepairFuture sess = ActiveRepairService.instance.submitArtificialRepairSession(request, tablename, cfname);
-
-        // generate a tree
-        Validator validator = new Validator(request);
-        validator.prepare(store);
-        validator.completeTree();
-        MerkleTree ltree = validator.tree;
-
-        // and a clone
-        validator = new Validator(request);
-        validator.prepare(store);
-        validator.completeTree();
-        MerkleTree rtree = validator.tree;
-
-        // change a range in one of the trees
-        Token ltoken = StorageService.getPartitioner().midpoint(local_range.left, local_range.right);
-        ltree.invalidate(ltoken);
-        MerkleTree.TreeRange changed = ltree.get(ltoken);
-        changed.hash("non-empty hash!".getBytes());
-
-        Set<Range> interesting = new HashSet<Range>();
-        interesting.add(changed);
-
-        // difference the trees
-        // note: we reuse the same endpoint which is bogus in theory but fine here
-        ActiveRepairService.TreeResponse r1 = new ActiveRepairService.TreeResponse(REMOTE, ltree);
-        ActiveRepairService.TreeResponse r2 = new ActiveRepairService.TreeResponse(REMOTE, rtree);
-        ActiveRepairService.RepairSession.Differencer diff = sess.session.new Differencer(cfname, r1, r2);
-        diff.run();
-
-        // ensure that the changed range was recorded
-        assertEquals("Wrong differing ranges", interesting, new HashSet<Range>(diff.differences));
-    }
-
     Set<InetAddress> addTokens(int max) throws Throwable
     {
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 02c2ff7..b47f4d8 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -21,6 +21,9 @@ package org.apache.cassandra.service;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.UUID;
 
 import org.junit.Test;
 
@@ -30,6 +33,10 @@ import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.repair.NodePair;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
@@ -40,72 +47,183 @@ public class SerializationsTest extends AbstractSerializationsTester
         System.setProperty("cassandra.partitioner", "RandomPartitioner");
     }
 
-    public static Range<Token> FULL_RANGE = new Range<Token>(StorageService.getPartitioner().getMinimumToken(), StorageService.getPartitioner().getMinimumToken());
+    private static final UUID RANDOM_UUID = UUID.fromString("b5c3d033-75aa-4c2f-a819-947aac7a0c54");
+    private static final Range<Token> FULL_RANGE = new Range<>(StorageService.getPartitioner().getMinimumToken(), StorageService.getPartitioner().getMinimumToken());
+    private static final RepairJobDesc DESC = new RepairJobDesc(RANDOM_UUID, "Keyspace1", "Standard1", FULL_RANGE);
 
-    private void testTreeRequestWrite() throws IOException
+    private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
     {
-        DataOutputStream out = getOutput("service.TreeRequest.bin");
-        ActiveRepairService.TreeRequest.serializer.serialize(Statics.req, out, getVersion());
-        Statics.req.createMessage().serialize(out, getVersion());
-        out.close();
+        try (DataOutputStream out = getOutput(fileName))
+        {
+            for (RepairMessage message : messages)
+            {
+                testSerializedSize(message, RepairMessage.serializer);
+                RepairMessage.serializer.serialize(message, out, getVersion());
+            }
+            // also serialize MessageOut
+            for (RepairMessage message : messages)
+                message.createMessage().serialize(out,  getVersion());
+        }
+    }
 
-        // test serializedSize
-        testSerializedSize(Statics.req, ActiveRepairService.TreeRequest.serializer);
+    private void testValidationRequestWrite() throws IOException
+    {
+        ValidationRequest message = new ValidationRequest(DESC, 1234);
+        testRepairMessageWrite("service.ValidationRequest.bin", message);
     }
 
     @Test
-    public void testTreeRequestRead() throws IOException
+    public void testValidationRequestRead() throws IOException
     {
         if (EXECUTE_WRITES)
-            testTreeRequestWrite();
+            testValidationRequestWrite();
+
+        try (DataInputStream in = getInput("service.ValidationRequest.bin"))
+        {
+            RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
+            assert message.messageType == RepairMessage.Type.VALIDATION_REQUEST;
+            assert DESC.equals(message.desc);
+            assert ((ValidationRequest) message).gcBefore == 1234;
 
-        DataInputStream in = getInput("service.TreeRequest.bin");
-        assert ActiveRepairService.TreeRequest.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), -1) != null;
-        in.close();
+            assert MessageIn.read(in, getVersion(), -1) != null;
+        }
     }
 
-    private void testTreeResponseWrite() throws IOException
+    private void testValidationCompleteWrite() throws IOException
     {
         // empty validation
-        ActiveRepairService.Validator v0 = new ActiveRepairService.Validator(Statics.req);
+        Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddress(),  -1);
+        ValidationComplete c0 = new ValidationComplete(DESC, v0.tree);
 
         // validation with a tree
         IPartitioner p = new RandomPartitioner();
         MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE);
         for (int i = 0; i < 10; i++)
             mt.split(p.getRandomToken());
-        ActiveRepairService.Validator v1 = new ActiveRepairService.Validator(Statics.req, mt);
-
-        DataOutputStream out = getOutput("service.TreeResponse.bin");
-        ActiveRepairService.Validator.serializer.serialize(v0, out, getVersion());
-        ActiveRepairService.Validator.serializer.serialize(v1, out, getVersion());
-        v0.createMessage().serialize(out, getVersion());
-        v1.createMessage().serialize(out, getVersion());
-        out.close();
-
-        // test serializedSize
-        testSerializedSize(v0, ActiveRepairService.Validator.serializer);
-        testSerializedSize(v1, ActiveRepairService.Validator.serializer);
+        Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), mt, -1);
+        ValidationComplete c1 = new ValidationComplete(DESC, v1.tree);
+
+        // validation failed
+        ValidationComplete c3 = new ValidationComplete(DESC);
+
+        testRepairMessageWrite("service.ValidationComplete.bin", c0, c1, c3);
     }
 
     @Test
-    public void testTreeResponseRead() throws IOException
+    public void testValidationCompleteRead() throws IOException
     {
         if (EXECUTE_WRITES)
-            testTreeResponseWrite();
-
-        DataInputStream in = getInput("service.TreeResponse.bin");
-        assert ActiveRepairService.Validator.serializer.deserialize(in, getVersion()) != null;
-        assert ActiveRepairService.Validator.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), -1) != null;
-        assert MessageIn.read(in, getVersion(), -1) != null;
-        in.close();
+            testValidationCompleteWrite();
+
+        try (DataInputStream in = getInput("service.ValidationComplete.bin"))
+        {
+            // empty validation
+            RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
+            assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
+            assert DESC.equals(message.desc);
+
+            assert ((ValidationComplete) message).success;
+            assert ((ValidationComplete) message).tree != null;
+
+            // validation with a tree
+            message = RepairMessage.serializer.deserialize(in, getVersion());
+            assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
+            assert DESC.equals(message.desc);
+
+            assert ((ValidationComplete) message).success;
+            assert ((ValidationComplete) message).tree != null;
+
+            // failed validation
+            message = RepairMessage.serializer.deserialize(in, getVersion());
+            assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
+            assert DESC.equals(message.desc);
+
+            assert !((ValidationComplete) message).success;
+            assert ((ValidationComplete) message).tree == null;
+
+            // MessageOuts
+            for (int i = 0; i < 3; i++)
+                assert MessageIn.read(in, getVersion(), -1) != null;
+        }
     }
 
-    private static class Statics
+    private void testSyncRequestWrite() throws IOException
+    {
+        InetAddress local = InetAddress.getByAddress(new byte[]{127, 0, 0, 1});
+        InetAddress src = InetAddress.getByAddress(new byte[]{127, 0, 0, 2});
+        InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3});
+        SyncRequest message = new SyncRequest(DESC, local, src, dest, Collections.singleton(FULL_RANGE));
+
+        testRepairMessageWrite("service.SyncRequest.bin", message);
+    }
+
+    @Test
+    public void testSyncRequestRead() throws IOException
     {
-        private static final ActiveRepairService.CFPair pair = new ActiveRepairService.CFPair("Keyspace1", "Standard1");
-        private static final ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("sessionId", FBUtilities.getBroadcastAddress(), FULL_RANGE, 1234, pair);
+        if (EXECUTE_WRITES)
+            testSyncRequestWrite();
+
+        InetAddress local = InetAddress.getByAddress(new byte[]{127, 0, 0, 1});
+        InetAddress src = InetAddress.getByAddress(new byte[]{127, 0, 0, 2});
+        InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3});
+
+        try (DataInputStream in = getInput("service.SyncRequest.bin"))
+        {
+            RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
+            assert message.messageType == RepairMessage.Type.SYNC_REQUEST;
+            assert DESC.equals(message.desc);
+            assert local.equals(((SyncRequest) message).initiator);
+            assert src.equals(((SyncRequest) message).src);
+            assert dest.equals(((SyncRequest) message).dst);
+            assert ((SyncRequest) message).ranges.size() == 1 && ((SyncRequest) message).ranges.contains(FULL_RANGE);
+
+            assert MessageIn.read(in, getVersion(), -1) != null;
+        }
+    }
+
+    private void testSyncCompleteWrite() throws IOException
+    {
+        InetAddress src = InetAddress.getByAddress(new byte[]{127, 0, 0, 2});
+        InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3});
+        // sync success
+        SyncComplete success = new SyncComplete(DESC, src, dest, true);
+        // sync fail
+        SyncComplete fail = new SyncComplete(DESC, src, dest, false);
+
+        testRepairMessageWrite("service.SyncComplete.bin", success, fail);
+    }
+
+    @Test
+    public void testSyncCompleteRead() throws IOException
+    {
+        if (EXECUTE_WRITES)
+            testSyncCompleteWrite();
+
+        InetAddress src = InetAddress.getByAddress(new byte[]{127, 0, 0, 2});
+        InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3});
+        NodePair nodes = new NodePair(src, dest);
+
+        try (DataInputStream in = getInput("service.SyncComplete.bin"))
+        {
+            // success
+            RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
+            assert message.messageType == RepairMessage.Type.SYNC_COMPLETE;
+            assert DESC.equals(message.desc);
+
+            assert nodes.equals(((SyncComplete) message).nodes);
+            assert ((SyncComplete) message).success;
+
+            // fail
+            message = RepairMessage.serializer.deserialize(in, getVersion());
+            assert message.messageType == RepairMessage.Type.SYNC_COMPLETE;
+            assert DESC.equals(message.desc);
+
+            assert nodes.equals(((SyncComplete) message).nodes);
+            assert !((SyncComplete) message).success;
+
+            // MessageOuts
+            for (int i = 0; i < 2; i++)
+                assert MessageIn.read(in, getVersion(), -1) != null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index d117426..68a2d76 100644
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@ -18,38 +18,28 @@
 */
 package org.apache.cassandra.utils;
 
-import static org.apache.cassandra.utils.MerkleTree.RECOMMENDED_DEPTH;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.math.BigInteger;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.cassandra.dht.BigIntegerToken;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.MerkleTree.Hashable;
 import org.apache.cassandra.utils.MerkleTree.RowHash;
 import org.apache.cassandra.utils.MerkleTree.TreeRange;
 import org.apache.cassandra.utils.MerkleTree.TreeRangeIterator;
-import org.junit.Before;
-import org.junit.Test;
 
-import com.google.common.collect.AbstractIterator;
+import static org.apache.cassandra.utils.MerkleTree.RECOMMENDED_DEPTH;
+import static org.junit.Assert.*;
 
 public class MerkleTreeTest
 {
@@ -64,9 +54,9 @@ public class MerkleTreeTest
     protected IPartitioner partitioner;
     protected MerkleTree mt;
 
-    private Range fullRange()
+    private Range<Token> fullRange()
     {
-        return new Range(partitioner.getMinimumToken(), partitioner.getMinimumToken());
+        return new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken());
     }
 
     @Before
@@ -74,6 +64,8 @@ public class MerkleTreeTest
     {
         TOKEN_SCALE = new BigInteger("8");
         partitioner = new RandomPartitioner();
+        // TODO need to trickle TokenSerializer
+        DatabaseDescriptor.setPartitioner(partitioner);
         mt = new MerkleTree(partitioner, fullRange(), RECOMMENDED_DEPTH, Integer.MAX_VALUE);
     }
 
@@ -94,7 +86,7 @@ public class MerkleTreeTest
      * to 8 means that passing -1 through 8 for this method will return values mapped
      * between -1 and Token.MAX_VALUE.
      */
-    public static BigIntegerToken tok(int i)
+    public static Token tok(int i)
     {
         if (i == -1)
             return new BigIntegerToken(new BigInteger("-1"));
@@ -112,11 +104,11 @@ public class MerkleTreeTest
         mt.split(tok(7));
 
         assertEquals(4, mt.size());
-        assertEquals(new Range(tok(7), tok(-1)), mt.get(tok(-1)));
-        assertEquals(new Range(tok(-1), tok(4)), mt.get(tok(3)));
-        assertEquals(new Range(tok(-1), tok(4)), mt.get(tok(4)));
-        assertEquals(new Range(tok(4), tok(6)), mt.get(tok(6)));
-        assertEquals(new Range(tok(6), tok(7)), mt.get(tok(7)));
+        assertEquals(new Range<>(tok(7), tok(-1)), mt.get(tok(-1)));
+        assertEquals(new Range<>(tok(-1), tok(4)), mt.get(tok(3)));
+        assertEquals(new Range<>(tok(-1), tok(4)), mt.get(tok(4)));
+        assertEquals(new Range<>(tok(4), tok(6)), mt.get(tok(6)));
+        assertEquals(new Range<>(tok(6), tok(7)), mt.get(tok(7)));
 
         // check depths
         assertEquals((byte)1, mt.get(tok(4)).depth);
@@ -147,9 +139,9 @@ public class MerkleTreeTest
         // should fail to split below hashdepth
         assertFalse(mt.split(tok(1)));
         assertEquals(3, mt.size());
-        assertEquals(new Range(tok(4), tok(-1)), mt.get(tok(-1)));
-        assertEquals(new Range(tok(-1), tok(2)), mt.get(tok(2)));
-        assertEquals(new Range(tok(2), tok(4)), mt.get(tok(4)));
+        assertEquals(new Range<>(tok(4), tok(-1)), mt.get(tok(-1)));
+        assertEquals(new Range<>(tok(-1), tok(2)), mt.get(tok(2)));
+        assertEquals(new Range<>(tok(2), tok(4)), mt.get(tok(4)));
     }
 
     @Test
@@ -163,8 +155,8 @@ public class MerkleTreeTest
         // should fail to split above maxsize
         assertFalse(mt.split(tok(2)));
         assertEquals(2, mt.size());
-        assertEquals(new Range(tok(4), tok(-1)), mt.get(tok(-1)));
-        assertEquals(new Range(tok(-1), tok(4)), mt.get(tok(4)));
+        assertEquals(new Range<>(tok(4), tok(-1)), mt.get(tok(-1)));
+        assertEquals(new Range<>(tok(-1), tok(4)), mt.get(tok(4)));
     }
 
     @Test
@@ -174,7 +166,7 @@ public class MerkleTreeTest
 
         // (zero, zero]
         ranges = mt.invalids();
-        assertEquals(new Range(tok(-1), tok(-1)), ranges.next());
+        assertEquals(new Range<>(tok(-1), tok(-1)), ranges.next());
         assertFalse(ranges.hasNext());
 
         // all invalid
@@ -184,13 +176,13 @@ public class MerkleTreeTest
         mt.split(tok(3));
         mt.split(tok(5));
         ranges = mt.invalids();
-        assertEquals(new Range(tok(6), tok(-1)), ranges.next());
-        assertEquals(new Range(tok(-1), tok(2)), ranges.next());
-        assertEquals(new Range(tok(2), tok(3)), ranges.next());
-        assertEquals(new Range(tok(3), tok(4)), ranges.next());
-        assertEquals(new Range(tok(4), tok(5)), ranges.next());
-        assertEquals(new Range(tok(5), tok(6)), ranges.next());
-        assertEquals(new Range(tok(6), tok(-1)), ranges.next());
+        assertEquals(new Range<>(tok(6), tok(-1)), ranges.next());
+        assertEquals(new Range<>(tok(-1), tok(2)), ranges.next());
+        assertEquals(new Range<>(tok(2), tok(3)), ranges.next());
+        assertEquals(new Range<>(tok(3), tok(4)), ranges.next());
+        assertEquals(new Range<>(tok(4), tok(5)), ranges.next());
+        assertEquals(new Range<>(tok(5), tok(6)), ranges.next());
+        assertEquals(new Range<>(tok(6), tok(-1)), ranges.next());
         assertFalse(ranges.hasNext());
     }
 
@@ -199,7 +191,7 @@ public class MerkleTreeTest
     public void testHashFull()
     {
         byte[] val = DUMMY;
-        Range range = new Range(tok(-1), tok(-1));
+        Range<Token> range = new Range<>(tok(-1), tok(-1));
 
         // (zero, zero]
         assertNull(mt.hash(range));
@@ -216,11 +208,11 @@ public class MerkleTreeTest
         byte[] val = DUMMY;
         byte[] leftval = hashed(val, 1, 1);
         byte[] partialval = hashed(val, 1);
-        Range left = new Range(tok(-1), tok(4));
-        Range partial = new Range(tok(2), tok(4));
-        Range right = new Range(tok(4), tok(-1));
-        Range linvalid = new Range(tok(1), tok(4));
-        Range rinvalid = new Range(tok(4), tok(6));
+        Range<Token> left = new Range<>(tok(-1), tok(4));
+        Range<Token> partial = new Range<>(tok(2), tok(4));
+        Range<Token> right = new Range<>(tok(4), tok(-1));
+        Range<Token> linvalid = new Range<>(tok(1), tok(4));
+        Range<Token> rinvalid = new Range<>(tok(4), tok(6));
 
         // (zero,two] (two,four] (four, zero]
         mt.split(tok(4));
@@ -250,10 +242,10 @@ public class MerkleTreeTest
         byte[] lchildval = hashed(val, 3, 3, 2);
         byte[] rchildval = hashed(val, 2, 2);
         byte[] fullval = hashed(val, 3, 3, 2, 2, 2);
-        Range full = new Range(tok(-1), tok(-1));
-        Range lchild = new Range(tok(-1), tok(4));
-        Range rchild = new Range(tok(4), tok(-1));
-        Range invalid = new Range(tok(1), tok(-1));
+        Range<Token> full = new Range<>(tok(-1), tok(-1));
+        Range<Token> lchild = new Range<>(tok(-1), tok(4));
+        Range<Token> rchild = new Range<>(tok(4), tok(-1));
+        Range<Token> invalid = new Range<>(tok(1), tok(-1));
 
         // (zero,one] (one, two] (two,four] (four, six] (six, zero]
         mt.split(tok(4));
@@ -286,9 +278,9 @@ public class MerkleTreeTest
         byte[] val = DUMMY;
         byte[] childfullval = hashed(val, 5, 5, 4);
         byte[] fullval = hashed(val, 5, 5, 4, 3, 2, 1);
-        Range childfull = new Range(tok(-1), tok(4));
-        Range full = new Range(tok(-1), tok(-1));
-        Range invalid = new Range(tok(4), tok(-1));
+        Range<Token> childfull = new Range<>(tok(-1), tok(4));
+        Range<Token> full = new Range<>(tok(-1), tok(-1));
+        Range<Token> invalid = new Range<>(tok(4), tok(-1));
 
         mt = new MerkleTree(partitioner, fullRange(), RECOMMENDED_DEPTH, Integer.MAX_VALUE);
         mt.split(tok(16));
@@ -332,7 +324,7 @@ public class MerkleTreeTest
         for (TreeRange range : ranges)
             range.addHash(new RowHash(range.right, new byte[0]));
 
-        assert mt.hash(new Range(tok(-1), tok(-1))) != null :
+        assert mt.hash(new Range<>(tok(-1), tok(-1))) != null :
             "Could not hash tree " + mt;
     }
 
@@ -347,7 +339,7 @@ public class MerkleTreeTest
     {
         TOKEN_SCALE = new BigInteger("16"); // this test needs slightly more resolution
 
-        Range full = new Range(tok(-1), tok(-1));
+        Range<Token> full = new Range<>(tok(-1), tok(-1));
         Iterator<TreeRange> ranges;
         MerkleTree mt2 = new MerkleTree(partitioner, fullRange(), RECOMMENDED_DEPTH, Integer.MAX_VALUE);
 
@@ -392,9 +384,7 @@ public class MerkleTreeTest
     @Test
     public void testSerialization() throws Exception
     {
-        Range full = new Range(tok(-1), tok(-1));
-        ByteArrayOutputStream bout = new ByteArrayOutputStream();
-        ObjectOutputStream oout = new ObjectOutputStream(bout);
+        Range<Token> full = new Range<>(tok(-1), tok(-1));
 
         // populate and validate the tree
         mt.maxsize(256);
@@ -403,16 +393,13 @@ public class MerkleTreeTest
             range.addAll(new HIterator(range.right));
 
         byte[] initialhash = mt.hash(full);
-        oout.writeObject(mt);
-        oout.close();
 
-        ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
-        ObjectInputStream oin = new ObjectInputStream(bin);
-        MerkleTree restored = (MerkleTree)oin.readObject();
+        ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        MerkleTree.serializer.serialize(mt, out, MessagingService.current_version);
+        byte[] serialized = out.toByteArray();
 
-        // restore partitioner after serialization
-        restored.partitioner(partitioner);
-        restored.fullRange = fullRange();
+        ByteArrayDataInput in = ByteStreams.newDataInput(serialized);
+        MerkleTree restored = MerkleTree.serializer.deserialize(in, MessagingService.current_version);
 
         assertHashEquals(initialhash, restored.hash(full));
     }
@@ -420,7 +407,6 @@ public class MerkleTreeTest
     @Test
     public void testDifference()
     {
-        Range full = new Range(tok(-1), tok(-1));
         int maxsize = 16;
         mt.maxsize(maxsize);
         MerkleTree mt2 = new MerkleTree(partitioner, fullRange(), RECOMMENDED_DEPTH, maxsize);
@@ -451,7 +437,7 @@ public class MerkleTreeTest
         // trees should disagree for (leftmost.left, middle.right]
         List<TreeRange> diffs = MerkleTree.difference(mt, mt2);
         assertEquals(diffs + " contains wrong number of differences:", 1, diffs.size());
-        assertTrue(diffs.contains(new Range(leftmost.left, middle.right)));
+        assertTrue(diffs.contains(new Range<>(leftmost.left, middle.right)));
     }
 
     /**


[3/3] git commit: Redesign repair messages

Posted by sl...@apache.org.
Redesign repair messages

patch by yukim; reviewed by slebresne for CASSANDRA-5426


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eb4fa4a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eb4fa4a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eb4fa4a6

Branch: refs/heads/trunk
Commit: eb4fa4a621db43ad9d48b146ee16caf09db7a853
Parents: 7646203
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed May 29 15:46:50 2013 -0500
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 25 18:48:59 2013 +0200

----------------------------------------------------------------------
 .../db/compaction/CompactionManager.java        |   34 +-
 .../cassandra/exceptions/RepairException.java   |   46 +
 .../apache/cassandra/net/MessagingService.java  |   17 +-
 .../apache/cassandra/repair/Differencer.java    |  138 +++
 .../org/apache/cassandra/repair/NodePair.java   |   85 ++
 .../apache/cassandra/repair/RepairFuture.java   |   31 +
 .../org/apache/cassandra/repair/RepairJob.java  |  225 ++++
 .../apache/cassandra/repair/RepairJobDesc.java  |  121 +++
 .../repair/RepairMessageVerbHandler.java        |   63 ++
 .../apache/cassandra/repair/RepairSession.java  |  320 ++++++
 .../cassandra/repair/RequestCoordinator.java    |  128 +++
 .../cassandra/repair/StreamingRepairTask.java   |  117 ++
 .../apache/cassandra/repair/TreeResponse.java   |   37 +
 .../org/apache/cassandra/repair/Validator.java  |  215 ++++
 .../repair/messages/RepairMessage.java          |  103 ++
 .../cassandra/repair/messages/SyncComplete.java |   80 ++
 .../cassandra/repair/messages/SyncRequest.java  |   97 ++
 .../repair/messages/ValidationComplete.java     |   90 ++
 .../repair/messages/ValidationRequest.java      |   82 ++
 .../cassandra/service/ActiveRepairService.java  | 1016 +-----------------
 .../cassandra/service/StorageService.java       |   25 +-
 .../streaming/StreamingRepairTask.java          |  254 -----
 .../org/apache/cassandra/utils/FBUtilities.java |    5 +
 .../org/apache/cassandra/utils/MerkleTree.java  |   61 +-
 .../serialization/2.0/service.SyncComplete.bin  |  Bin 0 -> 290 bytes
 .../serialization/2.0/service.SyncRequest.bin   |  Bin 0 -> 189 bytes
 .../serialization/2.0/service.TreeRequest.bin   |  Bin 129 -> 0 bytes
 .../serialization/2.0/service.TreeResponse.bin  |  Bin 946 -> 0 bytes
 .../2.0/service.ValidationComplete.bin          |  Bin 0 -> 1063 bytes
 .../2.0/service.ValidationRequest.bin           |  Bin 0 -> 131 bytes
 .../LeveledCompactionStrategyTest.java          |   10 +-
 .../cassandra/repair/DifferencerTest.java       |  139 +++
 .../apache/cassandra/repair/ValidatorTest.java  |  169 +++
 .../service/AntiEntropyServiceTestAbstract.java |  103 +-
 .../cassandra/service/SerializationsTest.java   |  198 +++-
 .../apache/cassandra/utils/MerkleTreeTest.java  |  136 ++-
 36 files changed, 2647 insertions(+), 1498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 6c9f50d..06dd95d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -47,8 +47,8 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.Pair;
@@ -384,13 +384,22 @@ public class CompactionManager implements CompactionManagerMBean
     /**
      * Does not mutate data, so is not scheduled.
      */
-    public Future<Object> submitValidation(final ColumnFamilyStore cfStore, final ActiveRepairService.Validator validator)
+    public Future<Object> submitValidation(final ColumnFamilyStore cfStore, final Validator validator)
     {
         Callable<Object> callable = new Callable<Object>()
         {
             public Object call() throws IOException
             {
-                doValidationCompaction(cfStore, validator);
+                try
+                {
+                    doValidationCompaction(cfStore, validator);
+                }
+                catch (Exception e)
+                {
+                    // we need to inform the remote end of our failure, otherwise it will hang on repair forever
+                    validator.fail();
+                    throw e;
+                }
                 return this;
             }
         };
@@ -607,7 +616,7 @@ public class CompactionManager implements CompactionManagerMBean
      * Performs a readonly "compaction" of all sstables in order to validate complete rows,
      * but without writing the merge result
      */
-    private void doValidationCompaction(ColumnFamilyStore cfs, ActiveRepairService.Validator validator) throws IOException
+    private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator) throws IOException
     {
         // this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped
         // mid-validation, or to attempt to validate a droped CFS.  this is just a best effort to avoid useless work,
@@ -618,17 +627,18 @@ public class CompactionManager implements CompactionManagerMBean
             return;
 
         Collection<SSTableReader> sstables;
+        String snapshotName = validator.desc.sessionId.toString();
         int gcBefore;
-        if (cfs.snapshotExists(validator.request.sessionid))
+        if (cfs.snapshotExists(snapshotName))
         {
             // If there is a snapshot created for the session then read from there.
-            sstables = cfs.getSnapshotSSTableReader(validator.request.sessionid);
+            sstables = cfs.getSnapshotSSTableReader(snapshotName);
 
             // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
             // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
             // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
             // 'as good as in the non-snapshot' case)
-            gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(validator.request.sessionid));
+            gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
         }
         else
         {
@@ -638,13 +648,13 @@ public class CompactionManager implements CompactionManagerMBean
             // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
             // instead so they won't be cleaned up if they do get compacted during the validation
             sstables = cfs.markCurrentSSTablesReferenced();
-            if (validator.request.gcBefore > 0)
-                gcBefore = validator.request.gcBefore;
+            if (validator.gcBefore > 0)
+                gcBefore = validator.gcBefore;
             else
                 gcBefore = getDefaultGcBefore(cfs);
         }
 
-        CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range, gcBefore);
+        CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         metrics.beginCompaction(ci);
         try
@@ -664,8 +674,8 @@ public class CompactionManager implements CompactionManagerMBean
         {
             SSTableReader.releaseReferences(sstables);
             iter.close();
-            if (cfs.table.snapshotExists(validator.request.sessionid))
-                cfs.table.clearSnapshot(validator.request.sessionid);
+            if (cfs.table.snapshotExists(snapshotName))
+                cfs.table.clearSnapshot(snapshotName);
 
             metrics.finishCompaction(ci);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/exceptions/RepairException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/RepairException.java b/src/java/org/apache/cassandra/exceptions/RepairException.java
new file mode 100644
index 0000000..832a6d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/RepairException.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.repair.RepairJobDesc;
+
+/**
+ * Exception thrown during repair.
+ *
+ * The message reported by {@link #getMessage()} is prefixed with the description
+ * of the repair job the exception relates to.
+ */
+public class RepairException extends Exception
+{
+    /** Description of the repair job this exception relates to. */
+    public final RepairJobDesc desc;
+
+    /**
+     * @param desc description of the repair job that failed
+     * @param message detail message
+     */
+    public RepairException(RepairJobDesc desc, String message)
+    {
+        super(message);
+        this.desc = desc;
+    }
+
+    /**
+     * @param desc description of the repair job that failed
+     * @param message detail message
+     * @param cause underlying cause of the failure
+     */
+    public RepairException(RepairJobDesc desc, String message, Throwable cause)
+    {
+        super(message, cause);
+        this.desc = desc;
+    }
+
+    /**
+     * Returns the detail message prefixed with the job description. When no description
+     * was supplied the plain message is returned, so that reporting this exception can
+     * never fail with a NullPointerException of its own.
+     */
+    @Override
+    public String getMessage()
+    {
+        return desc == null ? super.getMessage() : desc + " " + super.getMessage();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4ac408e..faed07a 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
 import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.paxos.Commit;
@@ -91,8 +92,8 @@ public final class MessagingService implements MessagingServiceMBean
         @Deprecated STREAM_REQUEST,
         RANGE_SLICE,
         @Deprecated BOOTSTRAP_TOKEN,
-        TREE_REQUEST,
-        TREE_RESPONSE,
+        @Deprecated TREE_REQUEST,
+        @Deprecated TREE_RESPONSE,
         @Deprecated JOIN,
         GOSSIP_DIGEST_SYN,
         GOSSIP_DIGEST_ACK,
@@ -105,13 +106,14 @@ public final class MessagingService implements MessagingServiceMBean
         REPLICATION_FINISHED,
         INTERNAL_RESPONSE, // responses to internal calls
         COUNTER_MUTATION,
-        STREAMING_REPAIR_REQUEST,
-        STREAMING_REPAIR_RESPONSE,
+        @Deprecated STREAMING_REPAIR_REQUEST,
+        @Deprecated STREAMING_REPAIR_RESPONSE,
         SNAPSHOT, // Similar to nt snapshot
         MIGRATION_REQUEST,
         GOSSIP_SHUTDOWN,
         _TRACE, // dummy verb so we can use MS.droppedMessages
         ECHO,
+        REPAIR_MESSAGE,
         // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
         PAXOS_PREPARE,
         PAXOS_PROPOSE,
@@ -151,7 +153,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
         put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY);
         put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY);
-
+        put(Verb.REPAIR_MESSAGE, Stage.ANTI_ENTROPY);
         put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
@@ -192,10 +194,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
         put(Verb.PAGED_RANGE, RangeSliceCommand.serializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
-        put(Verb.TREE_REQUEST, ActiveRepairService.TreeRequest.serializer);
-        put(Verb.TREE_RESPONSE, ActiveRepairService.Validator.serializer);
-        put(Verb.STREAMING_REPAIR_REQUEST, StreamingRepairTask.serializer);
-        put(Verb.STREAMING_REPAIR_RESPONSE, UUIDSerializer.serializer);
+        put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
         put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
         put(Verb.GOSSIP_DIGEST_ACK2, GossipDigestAck2.serializer);
         put(Verb.GOSSIP_DIGEST_SYN, GossipDigestSyn.serializer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/Differencer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Differencer.java b/src/java/org/apache/cassandra/repair/Differencer.java
new file mode 100644
index 0000000..82b331b
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/Differencer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
+ */
+public class Differencer implements Runnable
+{
+    // logs under the ActiveRepairService category (as RepairJob does) rather than under this class
+    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
+
+    private final RepairJobDesc desc;
+    /** tree received from the first endpoint */
+    public final TreeResponse r1;
+    /** tree received from the second endpoint */
+    public final TreeResponse r2;
+    /** ranges on which the two trees disagree; populated by {@link #run()} */
+    public final List<Range<Token>> differences = new ArrayList<>();
+
+    /**
+     * @param desc the repair job this comparison belongs to
+     * @param r1 tree response from the first endpoint
+     * @param r2 tree response from the second endpoint
+     */
+    public Differencer(RepairJobDesc desc, TreeResponse r1, TreeResponse r2)
+    {
+        this.desc = desc;
+        this.r1 = r1;
+        this.r2 = r2;
+    }
+
+    /**
+     * Compares our trees, and triggers repairs for any ranges that mismatch.
+     * When the trees agree, a SyncComplete message is sent to this node directly.
+     */
+    public void run()
+    {
+        // compare trees, and collect differences
+        differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
+
+        // choose a repair method based on the significance of the difference
+        String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
+        if (differences.isEmpty())
+        {
+            logger.info(String.format(format, "are consistent"));
+            // nothing to stream: report completion straight away
+            MessagingService.instance().sendOneWay(new SyncComplete(desc, r1.endpoint, r2.endpoint, true).createMessage(), FBUtilities.getLocalAddress());
+            return;
+        }
+
+        // non-0 difference: perform streaming repair
+        logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
+        performStreamingRepair();
+    }
+
+    /**
+     * Builds a SyncRequest covering the differing ranges and runs the corresponding
+     * StreamingRepairTask on the calling thread.
+     */
+    void performStreamingRepair()
+    {
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        // Either node may act as source or destination; if one of them is this node, make it the source to avoid forwarding
+        InetAddress src = r2.endpoint.equals(local) ? r2.endpoint : r1.endpoint;
+        InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
+
+        SyncRequest request = new SyncRequest(desc, local, src, dst, differences);
+        StreamingRepairTask task = new StreamingRepairTask(desc, request);
+        task.run();
+    }
+
+    /**
+     * In order to remove completed Differencer, equality is computed only from {@code desc} and
+     * endpoint part of two TreeResponses, regardless of the order of the two endpoints.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Differencer that = (Differencer) o;
+        if (!desc.equals(that.desc)) return false;
+        return minEndpoint().equals(that.minEndpoint()) && maxEndpoint().equals(that.maxEndpoint());
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(desc, minEndpoint(), maxEndpoint());
+    }
+
+    // For equals and hashcode, we don't want to take the endpoint order into account.
+    // So we just order endpoint deterministically to simplify this
+    private InetAddress minEndpoint()
+    {
+        return FBUtilities.compareUnsigned(r1.endpoint.getAddress(), r2.endpoint.getAddress()) < 0
+             ? r1.endpoint
+             : r2.endpoint;
+    }
+
+    private InetAddress maxEndpoint()
+    {
+        return FBUtilities.compareUnsigned(r1.endpoint.getAddress(), r2.endpoint.getAddress()) < 0
+             ? r2.endpoint
+             : r1.endpoint;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint + ":" + desc.columnFamily + "@" + desc.range + ">";
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/NodePair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java
new file mode 100644
index 0000000..f510dc4
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/NodePair.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+
+/**
+ * NodePair is used for repair message body to indicate the pair of nodes.
+ *
+ * Equality is order-sensitive: (a, b) and (b, a) are different pairs.
+ *
+ * @since 2.0
+ */
+public class NodePair
+{
+    public static final IVersionedSerializer<NodePair> serializer = new NodePairSerializer();
+
+    public final InetAddress endpoint1;
+    public final InetAddress endpoint2;
+
+    public NodePair(InetAddress endpoint1, InetAddress endpoint2)
+    {
+        this.endpoint1 = endpoint1;
+        this.endpoint2 = endpoint2;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        NodePair nodePair = (NodePair) o;
+        return endpoint1.equals(nodePair.endpoint1) && endpoint2.equals(nodePair.endpoint2);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(endpoint1, endpoint2);
+    }
+
+    /**
+     * Writes the two endpoints back to back, {@code endpoint1} first.
+     */
+    public static class NodePairSerializer implements IVersionedSerializer<NodePair>
+    {
+        public void serialize(NodePair nodePair, DataOutput out, int version) throws IOException
+        {
+            CompactEndpointSerializationHelper.serialize(nodePair.endpoint1, out);
+            CompactEndpointSerializationHelper.serialize(nodePair.endpoint2, out);
+        }
+
+        public NodePair deserialize(DataInput in, int version) throws IOException
+        {
+            InetAddress ep1 = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress ep2 = CompactEndpointSerializationHelper.deserialize(in);
+            return new NodePair(ep1, ep2);
+        }
+
+        public long serializedSize(NodePair nodePair, int version)
+        {
+            // size each endpoint separately: the two may be of different address families
+            // (IPv4 vs IPv6), so doubling endpoint1's size can under- or over-count
+            return CompactEndpointSerializationHelper.serializedSize(nodePair.endpoint1)
+                 + CompactEndpointSerializationHelper.serializedSize(nodePair.endpoint2);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairFuture.java b/src/java/org/apache/cassandra/repair/RepairFuture.java
new file mode 100644
index 0000000..127d873
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairFuture.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.concurrent.FutureTask;
+
+/**
+ * A {@link FutureTask} that runs a {@link RepairSession} and completes with a
+ * {@code null} result when the session's {@code run()} finishes normally.
+ */
+public class RepairFuture extends FutureTask<Void>
+{
+    /** The repair session this future executes. */
+    public final RepairSession session;
+
+    /**
+     * @param session the session to run; it doubles as this task's {@link Runnable}
+     */
+    public RepairFuture(RepairSession session)
+    {
+        super(session, (Void) null);
+        this.session = session;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
new file mode 100644
index 0000000..be3744d
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Condition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.SimpleCondition;
+
+/**
+ * RepairJob runs repair on given ColumnFamily.
+ */
+public class RepairJob
+{
+    // logs under the ActiveRepairService category rather than under this class
+    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
+
+    public final RepairJobDesc desc;
+    private final boolean isSequential;
+    // first we send tree requests. this tracks the endpoints remaining to hear from
+    private final RequestCoordinator<InetAddress> treeRequests;
+    // tree responses are then tracked here
+    private final List<TreeResponse> trees = new ArrayList<>();
+    // once all responses are received, each tree is compared with each other, and differencer tasks
+    // are submitted. the job is done when all differencers are complete.
+    private final RequestCoordinator<Differencer> differencers;
+    private final Condition requestsSent = new SimpleCondition();
+    // non-null only while makeSnapshots() waits for acknowledgements. volatile because terminate() has to
+    // run on another thread than the one blocked in makeSnapshots() in order to release it
+    private volatile CountDownLatch snapshotLatch = null;
+    private int gcBefore = -1;
+
+    private volatile boolean failed = false;
+
+    /**
+     * Create repair job to run on specific columnfamily
+     *
+     * @param sessionId id of the repair session this job belongs to
+     * @param keyspace keyspace of the column family to repair
+     * @param columnFamily name of the column family to repair
+     * @param range token range to repair
+     * @param isSequential passed to both request coordinators; also makes the job snapshot the replicas
+     *                     before requesting trees
+     */
+    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
+    {
+        this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+        this.isSequential = isSequential;
+        this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
+        {
+            public void send(InetAddress endpoint)
+            {
+                ValidationRequest request = new ValidationRequest(desc, gcBefore);
+                MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
+            }
+        };
+        this.differencers = new RequestCoordinator<Differencer>(isSequential)
+        {
+            public void send(Differencer d)
+            {
+                StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
+            }
+        };
+    }
+
+    /**
+     * @return true if this job failed
+     */
+    public boolean isFailed()
+    {
+        return failed;
+    }
+
+    /**
+     * Send merkle tree request to every involved neighbor.
+     *
+     * @param endpoints neighbors to request trees from; this node is always added to the list
+     */
+    public void sendTreeRequests(Collection<InetAddress> endpoints)
+    {
+        // send requests to all nodes
+        List<InetAddress> allEndpoints = new ArrayList<>(endpoints);
+        allEndpoints.add(FBUtilities.getBroadcastAddress());
+
+        if (isSequential)
+            makeSnapshots(endpoints);
+
+        this.gcBefore = Table.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+
+        for (InetAddress endpoint : allEndpoints)
+            treeRequests.add(endpoint);
+
+        logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, allEndpoints));
+        treeRequests.start();
+        requestsSent.signalAll();
+    }
+
+    /**
+     * Asks each given endpoint to snapshot the column family (named after the session id) and blocks
+     * until every endpoint has acknowledged, or until {@link #terminate()} releases the wait.
+     */
+    public void makeSnapshots(Collection<InetAddress> endpoints)
+    {
+        // the callback counts down this local, not the field: the field is reset to null once the wait
+        // is over, and a late acknowledgement must not hit a NullPointerException
+        final CountDownLatch latch = new CountDownLatch(endpoints.size());
+        snapshotLatch = latch;
+        try
+        {
+            IAsyncCallback callback = new IAsyncCallback()
+            {
+                public boolean isLatencyForSnitch()
+                {
+                    return false;
+                }
+
+                public void response(MessageIn msg)
+                {
+                    latch.countDown();
+                }
+            };
+            for (InetAddress endpoint : endpoints)
+                MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace, desc.columnFamily, desc.sessionId.toString(), false).createMessage(), endpoint, callback);
+            latch.await();
+        }
+        catch (InterruptedException e)
+        {
+            // restore the interrupt status before converting to an unchecked exception
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            snapshotLatch = null;
+        }
+    }
+
+    /**
+     * Add a new received tree and return the number of remaining tree to
+     * be received for the job to be complete.
+     *
+     * Callers may assume exactly one addTree call will result in zero remaining endpoints.
+     *
+     * @param endpoint address of the endpoint that sent response
+     * @param tree sent Merkle tree or null if validation failed on endpoint
+     * @return the number of responses waiting to receive
+     */
+    public synchronized int addTree(InetAddress endpoint, MerkleTree tree)
+    {
+        // Wait for all request to have been performed (see #3400)
+        try
+        {
+            requestsSent.await();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError("Interrupted while waiting for requests to be sent");
+        }
+
+        if (tree == null)
+            failed = true;
+        else
+            trees.add(new TreeResponse(endpoint, tree));
+        return treeRequests.completed(endpoint);
+    }
+
+    /**
+     * Submit differencers for running.
+     * All tree *must* have been received before this is called.
+     */
+    public void submitDifferencers()
+    {
+        assert !failed;
+
+        // We need to difference all trees one against another
+        for (int i = 0; i < trees.size() - 1; ++i)
+        {
+            TreeResponse r1 = trees.get(i);
+            for (int j = i + 1; j < trees.size(); ++j)
+            {
+                TreeResponse r2 = trees.get(j);
+                Differencer differencer = new Differencer(desc, r1, r2);
+                logger.debug("Queueing comparison {}", differencer);
+                differencers.add(differencer);
+            }
+        }
+        differencers.start();
+        trees.clear(); // allows gc to do its thing
+    }
+
+    /**
+     * Record the end of the synchronization of the given node pair.
+     *
+     * @param nodes the pair of nodes whose synchronization completed
+     * @param success false marks the whole job as failed
+     * @return true if the given node pair was the last remaining
+     */
+    synchronized boolean completedSynchronization(NodePair nodes, boolean success)
+    {
+        if (!success)
+            failed = true;
+        // Differencer equality only looks at desc and the (unordered) endpoints, so a tree-less instance matches
+        Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
+        return differencers.completed(completed) == 0;
+    }
+
+    /**
+     * terminate this job.
+     *
+     * Currently this only releases a pending {@link #makeSnapshots} wait, if there is one.
+     */
+    public void terminate()
+    {
+        // read the field once: makeSnapshots() may reset it to null concurrently
+        CountDownLatch latch = snapshotLatch;
+        if (latch != null)
+        {
+            while (latch.getCount() > 0)
+                latch.countDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
new file mode 100644
index 0000000..596540f
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * RepairJobDesc is used from various repair processes to distinguish one RepairJob to another.
+ *
+ * @since 2.0
+ */
+public class RepairJobDesc
+{
+    public static final IVersionedSerializer<RepairJobDesc> serializer = new RepairJobDescSerializer();
+
+    /** RepairSession id */
+    public final UUID sessionId;
+    public final String keyspace;
+    public final String columnFamily;
+    /** repairing range  */
+    public final Range<Token> range;
+
+    public RepairJobDesc(UUID sessionId, String keyspace, String columnFamily, Range<Token> range)
+    {
+        this.sessionId = sessionId;
+        this.keyspace = keyspace;
+        this.columnFamily = columnFamily;
+        this.range = range;
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("[repair #");
+        sb.append(sessionId);
+        sb.append(" on ");
+        sb.append(keyspace).append("/").append(columnFamily);
+        sb.append(", ").append(range);
+        sb.append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        RepairJobDesc that = (RepairJobDesc) o;
+
+        if (!columnFamily.equals(that.columnFamily)) return false;
+        if (!keyspace.equals(that.keyspace)) return false;
+        if (range != null ? !range.equals(that.range) : that.range != null) return false;
+        if (!sessionId.equals(that.sessionId)) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(sessionId, keyspace, columnFamily, range);
+    }
+
+    private static class RepairJobDescSerializer implements IVersionedSerializer<RepairJobDesc>
+    {
+        public void serialize(RepairJobDesc desc, DataOutput out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(desc.sessionId, out, version);
+            out.writeUTF(desc.keyspace);
+            out.writeUTF(desc.columnFamily);
+            AbstractBounds.serializer.serialize(desc.range, out, version);
+        }
+
+        public RepairJobDesc deserialize(DataInput in, int version) throws IOException
+        {
+            UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
+            String keyspace = in.readUTF();
+            String columnFamily = in.readUTF();
+            Range<Token> range = (Range<Token>)AbstractBounds.serializer.deserialize(in, version);
+            return new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+        }
+
+        public long serializedSize(RepairJobDesc desc, int version)
+        {
+            int size = 0;
+            size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version);
+            size += TypeSizes.NATIVE.sizeof(desc.keyspace);
+            size += TypeSizes.NATIVE.sizeof(desc.columnFamily);
+            size += AbstractBounds.serializer.serializedSize(desc.range, version);
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
new file mode 100644
index 0000000..3057a41
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.service.ActiveRepairService;
+
+/**
+ * Dispatches every incoming repair message: validation and sync requests are served here,
+ * anything else is handed to {@link ActiveRepairService}.
+ *
+ * @since 2.0
+ */
+public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
+{
+    public void doVerb(MessageIn<RepairMessage> message, int id)
+    {
+        // TODO add cancel/interrupt message
+        RepairMessage payload = message.payload;
+        RepairJobDesc desc = payload.desc;
+        switch (payload.messageType)
+        {
+            case VALIDATION_REQUEST:
+            {
+                // build the requester's merkle tree through a read-only (validation) compaction
+                ValidationRequest validationRequest = (ValidationRequest) payload;
+                ColumnFamilyStore cfs = Table.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+                CompactionManager.instance.submitValidation(cfs, new Validator(desc, message.from, validationRequest.gcBefore));
+                break;
+            }
+            case SYNC_REQUEST:
+            {
+                // sync request forwarded to us: stream locally or forward again (decided by the task)
+                new StreamingRepairTask(desc, (SyncRequest) payload).run();
+                break;
+            }
+            default:
+            {
+                ActiveRepairService.instance.handleMessage(message.from, payload);
+                break;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
new file mode 100644
index 0000000..7101b5a
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Triggers repairs with all neighbors for the given keyspace, column families and range.
+ *
+ * One {@link RepairJob} is created per column family. Merkle trees are requested for one job at a
+ * time: once every tree of the current job has arrived, the job (unless it failed) moves to
+ * {@link #syncingJobs}, where its differencers run, and the next job sends its tree requests.
+ * The session completes when no job is left in either {@link #jobs} or {@link #syncingJobs}, or
+ * as soon as a failure forces a shutdown.
+ */
+public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+{
+    // NOTE(review): logs under the ActiveRepairService category, not RepairSession; presumably intentional
+    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
+
+    /** Repair session ID */
+    private final UUID id;
+    public final String keyspace;
+    private final String[] cfnames;
+    public final boolean isSequential;
+    /** Range to repair */
+    public final Range<Token> range;
+    /** Neighbors this session repairs with */
+    public final Set<InetAddress> endpoints;
+
+    /** Error that made this session fail (the last one recorded), or null; rethrown by runMayThrow */
+    private volatile Exception exception;
+    /** Guards against handling a neighbor failure more than once */
+    private final AtomicBoolean isFailed = new AtomicBoolean(false);
+
+    // First, all RepairJobs are added to this queue,
+    final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<>();
+    // and after receiving all validation, the job is moved to
+    // this map, keyed by CF name.
+    final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
+
+    private final SimpleCondition completed = new SimpleCondition();
+    public final Condition differencingDone = new SimpleCondition();
+
+    private volatile boolean terminated = false;
+
+    /**
+     * Create new repair session.
+     *
+     * @param range range to repair
+     * @param keyspace name of keyspace
+     * @param isSequential true if performing repair on snapshots sequentially
+     * @param isLocal true if you want to perform repair only inside the data center
+     * @param cfnames names of columnfamilies
+     */
+    public RepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
+    {
+        this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, isLocal, cfnames);
+    }
+
+    /**
+     * Create new repair session with an explicit session id.
+     *
+     * @param id session id
+     * @param range range to repair
+     * @param keyspace name of keyspace
+     * @param isSequential true if performing repair on snapshots sequentially
+     * @param isLocal true if you want to perform repair only inside the data center
+     * @param cfnames names of columnfamilies; must not be empty
+     */
+    public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String[] cfnames)
+    {
+        this.id = id;
+        this.isSequential = isSequential;
+        this.keyspace = keyspace;
+        this.cfnames = cfnames;
+        assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
+        this.range = range;
+        this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, isLocal);
+    }
+
+    public UUID getId()
+    {
+        return id;
+    }
+
+    public Range<Token> getRange()
+    {
+        return range;
+    }
+
+    /**
+     * Receive merkle tree response or failed response from {@code endpoint} for current repair job.
+     *
+     * @param desc repair job description
+     * @param endpoint endpoint that sent merkle tree
+     * @param tree calculated merkle tree, or null if validation failed
+     */
+    public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTree tree)
+    {
+        RepairJob job = jobs.peek();
+        if (job == null)
+        {
+            assert terminated;
+            return;
+        }
+
+        if (tree == null)
+        {
+            exception = new RepairException(desc, "Validation failed in " + endpoint);
+            forceShutdown();
+            return;
+        }
+
+        logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", getId(), desc.columnFamily, endpoint));
+
+        assert job.desc.equals(desc);
+        if (job.addTree(endpoint, tree) == 0)
+        {
+            logger.debug("All response received for {}/{}", getId(), desc.columnFamily);
+            if (!job.isFailed())
+            {
+                syncingJobs.put(job.desc.columnFamily, job);
+                job.submitDifferencers();
+            }
+
+            // This job is complete, switching to next in line (note that only
+            // one thread can ever do this)
+            jobs.poll();
+            RepairJob nextJob = jobs.peek();
+            if (nextJob == null)
+                // We are done with this repair session as far as differencing
+                // is considered. Just inform the session
+                differencingDone.signalAll();
+            else
+                nextJob.sendTreeRequests(endpoints);
+        }
+    }
+
+    /**
+     * Notify this session that sync completed/failed with given {@code NodePair}.
+     *
+     * @param desc synced repair job
+     * @param nodes nodes that completed sync
+     * @param success true if sync succeeded
+     */
+    public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success)
+    {
+        RepairJob job = syncingJobs.get(desc.columnFamily);
+        if (job == null)
+        {
+            assert terminated;
+            return;
+        }
+
+        if (!success)
+        {
+            exception = new RepairException(desc, String.format("Sync failed between %s and %s", nodes.endpoint1, nodes.endpoint2));
+            forceShutdown();
+            return;
+        }
+
+        logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s", getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily));
+
+        if (job.completedSynchronization(nodes, success))
+        {
+            RepairJob completedJob = syncingJobs.remove(job.desc.columnFamily);
+            String remaining = syncingJobs.size() == 0 ? "" : String.format(" (%d remaining column family to sync for this session)", syncingJobs.size());
+            if (completedJob != null && completedJob.isFailed())
+                logger.warn(String.format("[repair #%s] %s sync failed%s", getId(), desc.columnFamily, remaining));
+            else
+                logger.info(String.format("[repair #%s] %s is fully synced%s", getId(), desc.columnFamily, remaining));
+
+            if (jobs.isEmpty() && syncingJobs.isEmpty())
+            {
+                // this repair session is completed
+                completed.signalAll();
+            }
+        }
+    }
+
+    /** Comma-separated list of the local node followed by every neighbor, for logging. */
+    private String repairedNodes()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(FBUtilities.getBroadcastAddress());
+        for (InetAddress ep : endpoints)
+            sb.append(", ").append(ep);
+        return sb.toString();
+    }
+
+    /**
+     * Runs the session: checks that every neighbor is alive, queues one RepairJob per column family,
+     * then blocks until the session completes or is shut down. Any recorded error is rethrown.
+     */
+    public void runMayThrow() throws Exception
+    {
+        logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), range, keyspace, Arrays.toString(cfnames)));
+
+        if (endpoints.isEmpty())
+        {
+            differencingDone.signalAll();
+            logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getId(), range));
+            return;
+        }
+
+        // Checking all nodes are live
+        for (InetAddress endpoint : endpoints)
+        {
+            if (!FailureDetector.instance.isAlive(endpoint))
+            {
+                String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
+                differencingDone.signalAll();
+                logger.error(String.format("[repair #%s] ", getId()) + message);
+                throw new IOException(message);
+            }
+        }
+        ActiveRepairService.instance.addToActiveSessions(this);
+        try
+        {
+            // Create and queue a RepairJob for each column family
+            for (String cfname : cfnames)
+            {
+                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential);
+                jobs.offer(job);
+            }
+
+            jobs.peek().sendTreeRequests(endpoints);
+
+            // block whatever thread started this session until all requests have been returned:
+            // if this thread dies, the session will still complete in the background
+            completed.await();
+            // read the volatile field once so the logged and the thrown exception are the same
+            Exception failure = exception;
+            if (failure == null)
+            {
+                logger.info(String.format("[repair #%s] session completed successfully", getId()));
+            }
+            else
+            {
+                logger.error(String.format("[repair #%s] session completed with the following error", getId()), failure);
+                throw failure;
+            }
+        }
+        catch (InterruptedException e)
+        {
+            // restore the interrupt status for the caller and keep the original exception as cause
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while waiting for repair.", e);
+        }
+        finally
+        {
+            // mark this session as terminated
+            terminate();
+            ActiveRepairService.instance.removeFromActiveSessions(this);
+        }
+    }
+
+    /**
+     * Marks this session as terminated, terminates every queued job and forgets all jobs.
+     */
+    public void terminate()
+    {
+        terminated = true;
+        for (RepairJob job : jobs)
+            job.terminate();
+        jobs.clear();
+        syncingJobs.clear();
+    }
+
+    /**
+     * Wakes up everything waiting on this session ({@link #differencingDone} and session completion)
+     * so that {@link #runMayThrow} returns; job cleanup then happens in its {@code finally} block.
+     * Callers shutting down because of an error record it in {@code exception} first.
+     */
+    public void forceShutdown()
+    {
+        differencingDone.signalAll();
+        completed.signalAll();
+    }
+
+    /** Fails the whole session because {@code remote} is considered dead. */
+    void failedNode(InetAddress remote)
+    {
+        String errorMsg = String.format("Endpoint %s died", remote);
+        exception = new IOException(errorMsg);
+        // If a node failed, we stop everything (though there could still be some activity in the background)
+        forceShutdown();
+    }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!endpoints.contains(endpoint))
+            return;
+
+        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        // Though unlikely, it is possible to arrive here multiple times and we
+        // want to avoid printing an error message twice
+        if (!isFailed.compareAndSet(false, true))
+            return;
+
+        failedNode(endpoint);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RequestCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RequestCoordinator.java b/src/java/org/apache/cassandra/repair/RequestCoordinator.java
new file mode 100644
index 0000000..ed089ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RequestCoordinator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * Sends a set of requests and tracks their completion, either one at a time (sequential)
+ * or all at once (parallel).
+ *
+ * Subclasses implement {@link #send} to dispatch a single request. Requests are registered with
+ * {@link #add}, dispatched by {@link #start}, and reported back through {@link #completed}.
+ * This class does no synchronization of its own.
+ *
+ * @param <R> request type; completed requests are matched with {@code equals} (and, in parallel
+ *            mode, {@code hashCode})
+ */
+public abstract class RequestCoordinator<R>
+{
+    private final Order<R> orderer;
+
+    /**
+     * @param isSequential if true, each request is sent only once the previous one completed;
+     *                     otherwise every request is sent by {@link #start}
+     */
+    public RequestCoordinator(boolean isSequential)
+    {
+        this.orderer = isSequential ? new SequentialOrder<R>(this) : new ParallelOrder<R>(this);
+    }
+
+    /** Actually dispatches {@code request}. */
+    public abstract void send(R request);
+
+    /** Registers a request to be sent; call before {@link #start}. */
+    public void add(R request)
+    {
+        orderer.add(request);
+    }
+
+    /** Sends the first request (sequential) or all requests (parallel). */
+    public void start()
+    {
+        orderer.start();
+    }
+
+    /**
+     * Marks {@code request} as done; in sequential mode this sends the next request, if any.
+     *
+     * @return how many requests remain
+     */
+    public int completed(R request)
+    {
+        return orderer.completed(request);
+    }
+
+    /** Strategy deciding when registered requests are handed to the coordinator's send(). */
+    private static abstract class Order<R>
+    {
+        protected final RequestCoordinator<R> coordinator;
+
+        Order(RequestCoordinator<R> coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        public abstract void add(R request);
+        public abstract void start();
+        public abstract int completed(R request);
+    }
+
+    /** Sends requests strictly one after the other, in insertion order. */
+    private static class SequentialOrder<R> extends Order<R>
+    {
+        private final Queue<R> requests = new LinkedList<>();
+
+        SequentialOrder(RequestCoordinator<R> coordinator)
+        {
+            super(coordinator);
+        }
+
+        public void add(R request)
+        {
+            requests.add(request);
+        }
+
+        public void start()
+        {
+            if (requests.isEmpty())
+                return;
+
+            coordinator.send(requests.peek());
+        }
+
+        public int completed(R request)
+        {
+            // only the in-flight request (the head of the queue) can complete
+            assert request.equals(requests.peek());
+            requests.poll();
+            int remaining = requests.size();
+            if (remaining != 0)
+                coordinator.send(requests.peek());
+            return remaining;
+        }
+    }
+
+    /** Sends every request at once; completions may arrive in any order. */
+    private static class ParallelOrder<R> extends Order<R>
+    {
+        private final Set<R> requests = new HashSet<>();
+
+        ParallelOrder(RequestCoordinator<R> coordinator)
+        {
+            super(coordinator);
+        }
+
+        public void add(R request)
+        {
+            requests.add(request);
+        }
+
+        public void start()
+        {
+            for (R request : requests)
+                coordinator.send(request);
+        }
+
+        public int completed(R request)
+        {
+            requests.remove(request);
+            return requests.size();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
new file mode 100644
index 0000000..4670ce6
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.streaming.*;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Task that makes two nodes exchange (stream) some ranges for a given table/cf.
+ * If the local node is the source of the sync request it streams directly with the
+ * destination; otherwise the request is forwarded to the source node. Once streaming
+ * finishes, a SyncComplete message reports success or failure to the initiator.
+ */
+public class StreamingRepairTask implements Runnable, StreamEventHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);
+
+    /** Repair job this streaming task belongs to */
+    public final RepairJobDesc desc;
+    public final SyncRequest request;
+
+    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request)
+    {
+        this.desc = desc;
+        this.request = request;
+    }
+
+    /**
+     * Returns true if the task can be executed locally, false if it has to be forwarded.
+     */
+    public boolean isLocalTask()
+    {
+        return request.initiator.equals(request.src);
+    }
+
+    /** Streams with the destination if this node is the request's source, otherwise forwards the request. */
+    public void run()
+    {
+        if (request.src.equals(FBUtilities.getBroadcastAddress()))
+            initiateStreaming();
+        else
+            forwardToSource();
+    }
+
+    /** Starts a two-way stream (request + transfer) of the ranges with the destination node. */
+    private void initiateStreaming()
+    {
+        logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
+        StreamResultFuture op = new StreamPlan("Repair")
+                                    .flushBeforeTransfer(true)
+                                    // request ranges from the remote node
+                                    .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)
+                                    // send ranges to the remote node
+                                    .transferRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)
+                                    .execute();
+        op.addEventListener(this);
+    }
+
+    /** Sends the sync request on to its source node, which will perform the streaming. */
+    private void forwardToSource()
+    {
+        logger.info(String.format("[repair #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", desc.sessionId, request.ranges.size(), request.src, request.dst));
+        MessagingService.instance().sendOneWay(request.createMessage(), request.src);
+    }
+
+    public void handleStreamEvent(StreamEvent event)
+    {
+        // Nothing to do here, all we care about is the final success or failure and that's handled by
+        // onSuccess and onFailure
+    }
+
+    /**
+     * If we succeeded on both stream in and out, reply back to the initiator.
+     */
+    public void onSuccess(StreamState state)
+    {
+        logger.info(String.format("[repair #%s] streaming task succeed, returning response to %s", desc.sessionId, request.initiator));
+        MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, true).createMessage(), request.initiator);
+    }
+
+    /**
+     * If we failed on either stream in or out, log the cause and reply fail to the initiator.
+     */
+    public void onFailure(Throwable t)
+    {
+        // without this the cause of the streaming failure would be lost entirely
+        logger.error(String.format("[repair #%s] streaming task failed, returning failure response to %s", desc.sessionId, request.initiator), t);
+        MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, false).createMessage(), request.initiator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/TreeResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/TreeResponse.java b/src/java/org/apache/cassandra/repair/TreeResponse.java
new file mode 100644
index 0000000..eede4ee
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/TreeResponse.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * Pairs the merkle tree received for a validation request with the endpoint that sent it.
+ * The tree may be {@code null} when the instance only serves to identify an endpoint.
+ */
+public class TreeResponse
+{
+    /** endpoint the tree came from */
+    public final InetAddress endpoint;
+    /** merkle tree computed by {@link #endpoint} */
+    public final MerkleTree tree;
+
+    public TreeResponse(InetAddress from, MerkleTree merkleTree)
+    {
+        tree = merkleTree;
+        endpoint = from;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
new file mode 100644
index 0000000..97b4ca2
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * A Strategy to handle building and validating a merkle tree for a column family.
+ *
+ * Lifecycle:
+ * 1. prepare() - Initialize tree with samples.
+ * 2. add() - 0 or more times, to add hashes to the tree.
+ * 3. complete() - Fill the remaining ranges with empty hashes, then schedule this
+ *    validator on Stage.ANTI_ENTROPY, where run() sends the tree to the initiator.
+ * If validation fails, fail() notifies the initiator instead.
+ */
+public class Validator implements Runnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(Validator.class);
+
+    public final RepairJobDesc desc;
+    public final InetAddress initiator;
+    public final MerkleTree tree;
+    public final int gcBefore;
+
+    // number of rows hashed so far; reported in complete()'s debug log
+    private transient long validated;
+    // range currently receiving hashes; null until the first add()
+    private transient MerkleTree.TreeRange range;
+    // iterator over the tree's invalid ranges; null until prepare()
+    private transient MerkleTree.TreeRangeIterator ranges;
+    // last key passed to add(), used to assert that rows arrive in increasing order
+    private transient DecoratedKey lastKey;
+
+    /** Hash mixed into every range that receives no rows. */
+    public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
+
+    /**
+     * Create Validator with default size of initial Merkle Tree.
+     */
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
+    {
+        this(desc,
+             initiator,
+             // TODO: memory usage (maxsize) should either be tunable per
+             // CF, globally, or as shared for all CFs in a cluster
+             new MerkleTree(DatabaseDescriptor.getPartitioner(), desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)),
+             gcBefore);
+    }
+
+    /**
+     * Create Validator that builds into the given tree.
+     */
+    public Validator(RepairJobDesc desc, InetAddress initiator, MerkleTree tree, int gcBefore)
+    {
+        this.desc = desc;
+        this.initiator = initiator;
+        this.tree = tree;
+        this.gcBefore = gcBefore;
+        validated = 0;
+        range = null;
+        ranges = null;
+    }
+
+    /**
+     * Splits the tree into its initial ranges and sets up iteration over its invalid ranges.
+     * For an order-preserving partitioner the tree is split at randomly chosen key samples
+     * of {@code cfs}; otherwise, or when there are no samples, it is split evenly.
+     *
+     * @param cfs column family whose key samples guide the splits
+     */
+    public void prepare(ColumnFamilyStore cfs)
+    {
+        if (!tree.partitioner().preservesOrder())
+        {
+            // You can't beat an even tree distribution for md5
+            tree.init();
+        }
+        else
+        {
+            List<DecoratedKey> keys = new ArrayList<>();
+            for (DecoratedKey sample : cfs.keySamples(desc.range))
+            {
+                assert desc.range.contains(sample.token): "Token " + sample.token + " is not within range " + desc.range;
+                keys.add(sample);
+            }
+
+            if (keys.isEmpty())
+            {
+                // use an even tree distribution
+                tree.init();
+            }
+            else
+            {
+                int numkeys = keys.size();
+                Random random = new Random();
+                // sample the column family using random keys from the index,
+                // stopping at the first split the tree rejects
+                while (true)
+                {
+                    DecoratedKey dk = keys.get(random.nextInt(numkeys));
+                    if (!tree.split(dk.token))
+                        break;
+                }
+            }
+        }
+        logger.debug("Prepared AEService tree of size {} for {}", tree.size(), desc);
+        ranges = tree.invalids();
+    }
+
+    /**
+     * Called (in order) for every row present in the CF.
+     * Hashes the row, and adds it to the tree being built.
+     * Ranges skipped on the way to the range containing the row receive {@link #EMPTY_ROW}.
+     *
+     * @param row Row to add hash
+     */
+    public void add(AbstractCompactedRow row)
+    {
+        assert ranges != null : "Validator was not prepared()";
+        assert desc.range.contains(row.key.token) : row.key.token + " is not contained in " + desc.range;
+        assert lastKey == null || lastKey.compareTo(row.key) < 0
+               : "row " + row.key + " received out of order wrt " + lastKey;
+        lastKey = row.key;
+
+        if (range == null)
+            range = ranges.next();
+
+        // advance past ranges that don't contain this row, giving each of them EMPTY_ROW
+        while (!range.contains(row.key.token))
+        {
+            range.addHash(EMPTY_ROW);
+            range = ranges.next();
+        }
+
+        // the current range contains the row: mix in its hash
+        range.addHash(rowHash(row));
+    }
+
+    /**
+     * Hashes a row with SHA-256 and counts it towards {@link #validated}.
+     */
+    private MerkleTree.RowHash rowHash(AbstractCompactedRow row)
+    {
+        validated++;
+        // MerkleTree uses XOR internally, so we want lots of output bits here
+        MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");
+        row.update(digest);
+        return new MerkleTree.RowHash(row.key.token, digest.digest());
+    }
+
+    /**
+     * Completes the tree and schedules this validator on Stage.ANTI_ENTROPY, where
+     * {@link #run()} sends the tree to the initiator.
+     */
+    public void complete()
+    {
+        completeTree();
+
+        StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
+        logger.debug("Validated {} rows into AEService tree for {}", validated, desc);
+    }
+
+    /**
+     * Mixes {@link #EMPTY_ROW} into the current range and into every range not yet
+     * visited, so no range of the tree is left without a hash.
+     */
+    @VisibleForTesting
+    public void completeTree()
+    {
+        assert ranges != null : "Validator was not prepared()";
+
+        if (range != null)
+            range.addHash(EMPTY_ROW);
+        while (ranges.hasNext())
+        {
+            range = ranges.next();
+            range.addHash(EMPTY_ROW);
+        }
+    }
+
+    /**
+     * Called when some error during the validation happened.
+     * Sends the initiator a ValidationComplete without a tree (success == false).
+     * The actual reason for failure should be looked up in the log of the host calling this function.
+     */
+    public void fail()
+    {
+        logger.error("Failed creating a merkle tree for {}, {} (see log for details)", desc, initiator);
+        MessagingService.instance().sendOneWay(new ValidationComplete(desc).createMessage(), initiator);
+    }
+
+    /**
+     * Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTI_ENTROPY.
+     */
+    public void run()
+    {
+        // respond to the request that triggered this validation
+        if (!initiator.equals(FBUtilities.getBroadcastAddress()))
+            logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s/%s", desc.sessionId, initiator, desc.keyspace, desc.columnFamily));
+        MessagingService.instance().sendOneWay(new ValidationComplete(desc, tree).createMessage(), initiator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
new file mode 100644
index 0000000..a0839f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.RepairJobDesc;
+
+/**
+ * Base class of all repair related request/response messages.
+ *
+ * On the wire a message is one type byte followed by the body written by the
+ * serializer registered for that {@link Type}.
+ *
+ * @since 2.0
+ */
+public abstract class RepairMessage
+{
+    public static final IVersionedSerializer<RepairMessage> serializer = new RepairMessageSerializer();
+
+    /** Serializes the body (everything after the type byte) of one kind of repair message. */
+    public static interface MessageSerializer<T extends RepairMessage> extends IVersionedSerializer<T> {}
+
+    /** Kinds of repair message, each with its wire id and body serializer. */
+    public static enum Type
+    {
+        VALIDATION_REQUEST(0, ValidationRequest.serializer),
+        VALIDATION_COMPLETE(1, ValidationComplete.serializer),
+        SYNC_REQUEST(2, SyncRequest.serializer),
+        SYNC_COMPLETE(3, SyncComplete.serializer);
+
+        private final byte type;
+        private final MessageSerializer<RepairMessage> serializer;
+
+        private Type(int type, MessageSerializer<RepairMessage> serializer)
+        {
+            this.type = (byte) type;
+            this.serializer = serializer;
+        }
+
+        /**
+         * Returns the Type whose wire id is {@code b}.
+         *
+         * @throws IllegalArgumentException if no Type has that id
+         */
+        public static Type fromByte(byte b)
+        {
+            Type[] candidates = values();
+            for (int i = 0; i < candidates.length; i++)
+            {
+                if (candidates[i].type == b)
+                    return candidates[i];
+            }
+            throw new IllegalArgumentException("Unknown RepairMessage.Type: " + b);
+        }
+    }
+
+    public final Type messageType;
+    public final RepairJobDesc desc;
+
+    protected RepairMessage(Type messageType, RepairJobDesc desc)
+    {
+        this.messageType = messageType;
+        this.desc = desc;
+    }
+
+    /** Wraps this message in a REPAIR_MESSAGE verb for sending through MessagingService. */
+    public MessageOut<RepairMessage> createMessage()
+    {
+        return new MessageOut<>(MessagingService.Verb.REPAIR_MESSAGE, this, RepairMessage.serializer);
+    }
+
+    /** Handles the type byte and delegates the body to the serializer of the message's Type. */
+    public static class RepairMessageSerializer implements IVersionedSerializer<RepairMessage>
+    {
+        public void serialize(RepairMessage message, DataOutput out, int version) throws IOException
+        {
+            Type messageType = message.messageType;
+            out.write(messageType.type);
+            messageType.serializer.serialize(message, out, version);
+        }
+
+        public RepairMessage deserialize(DataInput in, int version) throws IOException
+        {
+            byte typeId = in.readByte();
+            return Type.fromByte(typeId).serializer.deserialize(in, version);
+        }
+
+        public long serializedSize(RepairMessage message, int version)
+        {
+            // one byte for the type id, followed by the type-specific body
+            return 1 + message.messageType.serializer.serializedSize(message, version);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
new file mode 100644
index 0000000..b54492e
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.repair.NodePair;
+import org.apache.cassandra.repair.RepairJobDesc;
+
+/**
+ * SYNC_COMPLETE repair message: reports whether the sync between the pair of
+ * nodes in {@link #nodes} succeeded.
+ *
+ * @since 2.0
+ */
+public class SyncComplete extends RepairMessage
+{
+    public static final MessageSerializer serializer = new SyncCompleteSerializer();
+
+    /** nodes involved in this sync */
+    public final NodePair nodes;
+    /** true if the sync succeeded, false otherwise */
+    public final boolean success;
+
+    public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success)
+    {
+        super(Type.SYNC_COMPLETE, desc);
+        this.nodes = nodes;
+        this.success = success;
+    }
+
+    public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success)
+    {
+        this(desc, new NodePair(endpoint1, endpoint2), success);
+    }
+
+    /** Wire format: desc, then nodes, then the success flag. */
+    private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete>
+    {
+        public void serialize(SyncComplete message, DataOutput out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+            NodePair.serializer.serialize(message.nodes, out, version);
+            out.writeBoolean(message.success);
+        }
+
+        public SyncComplete deserialize(DataInput in, int version) throws IOException
+        {
+            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
+            NodePair nodes = NodePair.serializer.deserialize(in, version);
+            boolean success = in.readBoolean();
+            return new SyncComplete(desc, nodes, success);
+        }
+
+        public long serializedSize(SyncComplete message, int version)
+        {
+            return RepairJobDesc.serializer.serializedSize(message.desc, version)
+                   + NodePair.serializer.serializedSize(message.nodes, version)
+                   + TypeSizes.NATIVE.sizeof(message.success);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
new file mode 100644
index 0000000..a06254b
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Body part of SYNC_REQUEST repair message.
+ * Request {@code src} node to sync data with {@code dst} node for range {@code ranges}.
+ *
+ * @since 2.0
+ */
+public class SyncRequest extends RepairMessage
+{
+    public static MessageSerializer serializer = new SyncRequestSerializer();
+
+    public final InetAddress initiator;
+    /** node asked to sync */
+    public final InetAddress src;
+    /** node that {@code src} syncs with */
+    public final InetAddress dst;
+    /** ranges to sync */
+    public final Collection<Range<Token>> ranges;
+
+    public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges)
+    {
+        super(Type.SYNC_REQUEST, desc);
+        this.initiator = initiator;
+        this.src = src;
+        this.dst = dst;
+        this.ranges = ranges;
+    }
+
+    /**
+     * Wire format: desc, initiator, src, dst, an int count of ranges, then each range.
+     */
+    public static class SyncRequestSerializer implements MessageSerializer<SyncRequest>
+    {
+        public void serialize(SyncRequest message, DataOutput out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+            CompactEndpointSerializationHelper.serialize(message.initiator, out);
+            CompactEndpointSerializationHelper.serialize(message.src, out);
+            CompactEndpointSerializationHelper.serialize(message.dst, out);
+            out.writeInt(message.ranges.size());
+            for (Range<Token> range : message.ranges)
+                AbstractBounds.serializer.serialize(range, out, version);
+        }
+
+        public SyncRequest deserialize(DataInput in, int version) throws IOException
+        {
+            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
+            InetAddress initiator = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress src = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress dst = CompactEndpointSerializationHelper.deserialize(in);
+            int rangesCount = in.readInt();
+            List<Range<Token>> ranges = new ArrayList<>(rangesCount);
+            for (int i = 0; i < rangesCount; ++i)
+                ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, version).toTokenBounds());
+            return new SyncRequest(desc, initiator, src, dst, ranges);
+        }
+
+        public long serializedSize(SyncRequest message, int version)
+        {
+            long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
+            // size each endpoint separately: addresses can have different encoded lengths
+            // (e.g. IPv4 vs IPv6), so 3 * sizeof(initiator) is not safe
+            size += CompactEndpointSerializationHelper.serializedSize(message.initiator);
+            size += CompactEndpointSerializationHelper.serializedSize(message.src);
+            size += CompactEndpointSerializationHelper.serializedSize(message.dst);
+            size += TypeSizes.NATIVE.sizeof(message.ranges.size());
+            for (Range<Token> range : message.ranges)
+                size += AbstractBounds.serializer.serializedSize(range, version);
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
new file mode 100644
index 0000000..4ddbc2e
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * VALIDATION_COMPLETE repair message, sent when validation compaction finishes.
+ * On success it carries the resulting merkle tree; on failure {@link #success} is
+ * false and {@link #tree} is null.
+ *
+ * @since 2.0
+ */
+public class ValidationComplete extends RepairMessage
+{
+    public static MessageSerializer serializer = new ValidationCompleteSerializer();
+
+    /** true if validation succeeded, false otherwise */
+    public final boolean success;
+    /** Merkle hash tree response. Null if validation failed. */
+    public final MerkleTree tree;
+
+    /**
+     * Creates a failure response: no tree is attached.
+     */
+    public ValidationComplete(RepairJobDesc desc)
+    {
+        super(Type.VALIDATION_COMPLETE, desc);
+        this.success = false;
+        this.tree = null;
+    }
+
+    /**
+     * Creates a success response carrying {@code tree}, which must not be null.
+     */
+    public ValidationComplete(RepairJobDesc desc, MerkleTree tree)
+    {
+        super(Type.VALIDATION_COMPLETE, desc);
+        assert tree != null;
+        this.success = true;
+        this.tree = tree;
+    }
+
+    /** Wire format: desc, the success flag, and the tree only when success is true. */
+    private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete>
+    {
+        public void serialize(ValidationComplete message, DataOutput out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+            boolean hasTree = message.success;
+            out.writeBoolean(hasTree);
+            if (hasTree)
+                MerkleTree.serializer.serialize(message.tree, out, version);
+        }
+
+        public ValidationComplete deserialize(DataInput in, int version) throws IOException
+        {
+            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
+            if (!in.readBoolean())
+                return new ValidationComplete(desc);
+
+            MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
+            return new ValidationComplete(desc, tree);
+        }
+
+        public long serializedSize(ValidationComplete message, int version)
+        {
+            long size = RepairJobDesc.serializer.serializedSize(message.desc, version)
+                        + TypeSizes.NATIVE.sizeof(message.success);
+            return message.success
+                   ? size + MerkleTree.serializer.serializedSize(message.tree, version)
+                   : size;
+        }
+    }
+}


[2/3] Redesign repair messages

Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
new file mode 100644
index 0000000..3b9f6dd
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.repair.RepairJobDesc;
+
+/**
+ * VALIDATION_REQUEST repair message: requests validation for the repair job
+ * described by {@code desc}, carrying the {@link #gcBefore} value to use.
+ *
+ * @since 2.0
+ */
+public class ValidationRequest extends RepairMessage
+{
+    public static MessageSerializer serializer = new ValidationRequestSerializer();
+
+    public final int gcBefore;
+
+    public ValidationRequest(RepairJobDesc desc, int gcBefore)
+    {
+        super(Type.VALIDATION_REQUEST, desc);
+        this.gcBefore = gcBefore;
+    }
+
+    /**
+     * Two requests are equal when they target the same job ({@code desc}) with the same
+     * {@code gcBefore}; gcBefore alone is not enough, as requests for different jobs can share it.
+     * NOTE(review): relies on RepairJobDesc implementing value-based equals/hashCode.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ValidationRequest that = (ValidationRequest) o;
+        return gcBefore == that.gcBefore && Objects.equals(desc, that.desc);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(desc, gcBefore);
+    }
+
+    public static class ValidationRequestSerializer implements MessageSerializer<ValidationRequest>
+    {
+        public void serialize(ValidationRequest message, DataOutput out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+            out.writeInt(message.gcBefore);
+        }
+
+        public ValidationRequest deserialize(DataInput in, int version) throws IOException
+        {
+            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
+            return new ValidationRequest(desc, in.readInt());
+        }
+
+        public long serializedSize(ValidationRequest message, int version)
+        {
+            long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
+            size += TypeSizes.NATIVE.sizeof(message.gcBefore);
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 82ecbe9..2bcf579 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -17,37 +17,25 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.*;
 import java.net.InetAddress;
-import java.security.MessageDigest;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
 
-import com.google.common.base.Objects;
 import com.google.common.collect.Sets;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.*;
-import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.streaming.StreamingRepairTask;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.repair.*;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * ActiveRepairService encapsulates "validating" (hashing) individual column families,
@@ -79,8 +67,6 @@ import org.apache.cassandra.utils.*;
  */
 public class ActiveRepairService
 {
-    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
-
     // singleton enforcement
     public static final ActiveRepairService instance = new ActiveRepairService();
 
@@ -103,14 +89,14 @@ public class ActiveRepairService
     /**
      * A map of active session.
      */
-    private final ConcurrentMap<String, RepairSession> sessions;
+    private final ConcurrentMap<UUID, RepairSession> sessions;
 
     /**
      * Protected constructor. Use ActiveRepairService.instance.
      */
     protected ActiveRepairService()
     {
-        sessions = new ConcurrentHashMap<String, RepairSession>();
+        sessions = new ConcurrentHashMap<>();
     }
 
     /**
@@ -118,16 +104,30 @@ public class ActiveRepairService
      *
      * @return Future for asynchronous call or null if there is no need to repair
      */
-    public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
+    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
     {
-        RepairSession session = new RepairSession(range, tablename, isSequential, isLocal, cfnames);
+        RepairSession session = new RepairSession(range, keyspace, isSequential, isLocal, cfnames);
         if (session.endpoints.isEmpty())
             return null;
-        RepairFuture futureTask = session.getFuture();
+        RepairFuture futureTask = new RepairFuture(session);
         executor.execute(futureTask);
         return futureTask;
     }
 
+    public void addToActiveSessions(RepairSession session)
+    {
+        sessions.put(session.getId(), session);
+        Gossiper.instance.register(session);
+        FailureDetector.instance.registerFailureDetectionEventListener(session);
+    }
+
+    public void removeFromActiveSessions(RepairSession session)
+    {
+        FailureDetector.instance.unregisterFailureDetectionEventListener(session);
+        Gossiper.instance.unregister(session);
+        sessions.remove(session.getId());
+    }
+
     public void terminateSessions()
     {
         for (RepairSession session : sessions.values())
@@ -138,9 +138,11 @@ public class ActiveRepairService
 
     // for testing only. Create a session corresponding to a fake request and
     // add it to the sessions (avoid NPE in tests)
-    RepairFuture submitArtificialRepairSession(TreeRequest req, String tablename, String... cfnames)
+    RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
     {
-        RepairFuture futureTask = new RepairSession(req, tablename, cfnames).getFuture();
+        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, false, new String[]{desc.columnFamily});
+        sessions.put(session.getId(), session);
+        RepairFuture futureTask = new RepairFuture(session);
         executor.execute(futureTask);
         return futureTask;
     }
@@ -154,7 +156,7 @@ public class ActiveRepairService
      *
      * @return neighbors with whom we share the provided range
      */
-    static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)
+    public static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)
     {
         StorageService ss = StorageService.instance;
         Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
@@ -174,7 +176,7 @@ public class ActiveRepairService
         if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
             return Collections.emptySet();
 
-        Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));
+        Set<InetAddress> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet));
         neighbors.remove(FBUtilities.getBroadcastAddress());
 
         if (isLocal)
@@ -187,949 +189,25 @@ public class ActiveRepairService
         return neighbors;
     }
 
-    /**
-     * Register a tree for the given request to be compared to the appropriate trees in Stage.ANTIENTROPY when they become available.
-     */
-    private void rendezvous(TreeRequest request, MerkleTree tree)
+    public void handleMessage(InetAddress endpoint, RepairMessage message)
     {
-        RepairSession session = sessions.get(request.sessionid);
+        RepairJobDesc desc = message.desc;
+        RepairSession session = sessions.get(desc.sessionId);
         if (session == null)
-        {
-            logger.warn("Got a merkle tree response for unknown repair session {}: either this node has been restarted since the session was started, or the session has been interrupted for an unknown reason. ", request.sessionid);
             return;
-        }
-
-        RepairSession.RepairJob job = session.jobs.peek();
-        if (job == null)
-        {
-            assert session.terminated();
-            return;
-        }
-
-        logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", session.getName(), request.cf.right, request.endpoint));
-
-        if (job.addTree(request, tree) == 0)
-        {
-            logger.debug("All trees received for " + session.getName() + "/" + request.cf.right);
-            job.submitDifferencers();
-
-            // This job is complete, switching to next in line (note that only
-            // one thread will can ever do this)
-            session.jobs.poll();
-            RepairSession.RepairJob nextJob = session.jobs.peek();
-            if (nextJob == null)
-                // We are done with this repair session as far as differencing
-                // is considern. Just inform the session
-                session.differencingDone.signalAll();
-            else
-                nextJob.sendTreeRequests();
-        }
-    }
-
-    /**
-     * Responds to the node that requested the given valid tree.
-     * @param validator A locally generated validator
-     * @param local localhost (parameterized for testing)
-     */
-    void respond(Validator validator, InetAddress local)
-    {
-        MessagingService ms = MessagingService.instance();
-
-        try
-        {
-            if (!validator.request.endpoint.equals(FBUtilities.getBroadcastAddress()))
-                logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s", validator.request.sessionid, validator.request.endpoint, validator.request.cf));
-            ms.sendOneWay(validator.createMessage(), validator.request.endpoint);
-        }
-        catch (Exception e)
-        {
-            logger.error(String.format("[repair #%s] Error sending completed merkle tree to %s for %s ", validator.request.sessionid, validator.request.endpoint, validator.request.cf), e);
-        }
-    }
-
-    /**
-     * A Strategy to handle building a merkle tree for a column family.
-     *
-     * Lifecycle:
-     * 1. prepare() - Initialize tree with samples.
-     * 2. add() - 0 or more times, to add hashes to the tree.
-     * 3. complete() - complete building tree and send it back to the initiator
-     */
-    public static class Validator implements Runnable
-    {
-        public final TreeRequest request;
-        public final MerkleTree tree;
-
-        // null when all rows with the min token have been consumed
-        private transient long validated;
-        private transient MerkleTree.TreeRange range;
-        private transient MerkleTree.TreeRangeIterator ranges;
-        private transient DecoratedKey lastKey;
-
-        public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
-        public static ValidatorSerializer serializer = new ValidatorSerializer();
-
-        public Validator(TreeRequest request)
-        {
-            this(request,
-                 // TODO: memory usage (maxsize) should either be tunable per
-                 // CF, globally, or as shared for all CFs in a cluster
-                 new MerkleTree(DatabaseDescriptor.getPartitioner(), request.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)));
-        }
-
-        Validator(TreeRequest request, MerkleTree tree)
-        {
-            this.request = request;
-            this.tree = tree;
-            // Reestablishing the range because we don't serialize it (for bad
-            // reason - see MerkleTree for details)
-            this.tree.fullRange = this.request.range;
-            validated = 0;
-            range = null;
-            ranges = null;
-        }
-
-        public void prepare(ColumnFamilyStore cfs)
-        {
-            if (!tree.partitioner().preservesOrder())
-            {
-                // You can't beat an even tree distribution for md5
-                tree.init();
-            }
-            else
-            {
-                List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
-                for (DecoratedKey sample : cfs.keySamples(request.range))
-                {
-                    assert request.range.contains(sample.token): "Token " + sample.token + " is not within range " + request.range;
-                    keys.add(sample);
-                }
-
-                if (keys.isEmpty())
-                {
-                    // use an even tree distribution
-                    tree.init();
-                }
-                else
-                {
-                    int numkeys = keys.size();
-                    Random random = new Random();
-                    // sample the column family using random keys from the index
-                    while (true)
-                    {
-                        DecoratedKey dk = keys.get(random.nextInt(numkeys));
-                        if (!tree.split(dk.token))
-                            break;
-                    }
-                }
-            }
-            logger.debug("Prepared AEService tree of size " + tree.size() + " for " + request);
-            ranges = tree.invalids();
-        }
-
-        /**
-         * Called (in order) for rows in given range present in the CF.
-         * Hashes the row, and adds it to the tree being built.
-         *
-         * @param row The row.
-         */
-        public void add(AbstractCompactedRow row)
-        {
-            assert request.range.contains(row.key.token) : row.key.token + " is not contained in " + request.range;
-            assert lastKey == null || lastKey.compareTo(row.key) < 0
-                   : "row " + row.key + " received out of order wrt " + lastKey;
-            lastKey = row.key;
-
-            if (range == null)
-                range = ranges.next();
-
-            // generate new ranges as long as case 1 is true
-            while (!range.contains(row.key.token))
-            {
-                // add the empty hash, and move to the next range
-                range.addHash(EMPTY_ROW);
-                range = ranges.next();
-            }
-
-            // case 3 must be true: mix in the hashed row
-            range.addHash(rowHash(row));
-        }
-
-        private MerkleTree.RowHash rowHash(AbstractCompactedRow row)
-        {
-            validated++;
-            // MerkleTree uses XOR internally, so we want lots of output bits here
-            MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");
-            row.update(digest);
-            return new MerkleTree.RowHash(row.key.token, digest.digest());
-        }
-
-        /**
-         * Registers the newly created tree for rendezvous in Stage.ANTI_ENTROPY.
-         */
-        public void complete()
-        {
-            completeTree();
-
-            StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
-            logger.debug("Validated " + validated + " rows into AEService tree for " + request);
-        }
-
-        void completeTree()
-        {
-            assert ranges != null : "Validator was not prepared()";
-
-            if (range != null)
-                range.addHash(EMPTY_ROW);
-            while (ranges.hasNext())
-            {
-                range = ranges.next();
-                range.addHash(EMPTY_ROW);
-            }
-        }
-
-        /**
-         * Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTI_ENTROPY.
-         */
-        public void run()
-        {
-            // respond to the request that triggered this validation
-            ActiveRepairService.instance.respond(this, FBUtilities.getBroadcastAddress());
-        }
-
-        public MessageOut<Validator> createMessage()
-        {
-            return new MessageOut<Validator>(MessagingService.Verb.TREE_RESPONSE, this, Validator.serializer);
-        }
-
-        public static class ValidatorSerializer implements IVersionedSerializer<Validator>
-        {
-            public void serialize(Validator validator, DataOutput out, int version) throws IOException
-            {
-                TreeRequest.serializer.serialize(validator.request, out, version);
-                MerkleTree.serializer.serialize(validator.tree, out, version);
-            }
-
-            public Validator deserialize(DataInput in, int version) throws IOException
-            {
-                final TreeRequest request = TreeRequest.serializer.deserialize(in, version);
-                try
-                {
-                    return new Validator(request, MerkleTree.serializer.deserialize(in, version));
-                }
-                catch(Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            public long serializedSize(Validator validator, int version)
-            {
-                return TreeRequest.serializer.serializedSize(validator.request, version)
-                       + MerkleTree.serializer.serializedSize(validator.tree, version);
-            }
-        }
-    }
-
-    /**
-     * Handler for requests from remote nodes to generate a valid tree.
-     */
-    public static class TreeRequestVerbHandler implements IVerbHandler<TreeRequest>
-    {
-        /**
-         * Trigger a validation compaction which will return the tree upon completion.
-         */
-        public void doVerb(MessageIn<TreeRequest> message, int id)
-        {
-            TreeRequest remotereq = message.payload;
-            TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.gcBefore, remotereq.cf);
-
-            // trigger read-only compaction
-            ColumnFamilyStore store = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
-            Validator validator = new Validator(request);
-            logger.debug("Queueing validation compaction for " + request);
-            CompactionManager.instance.submitValidation(store, validator);
-        }
-    }
-
-    /**
-     * Handler for responses from remote nodes which contain a valid tree.
-     * The payload is a completed Validator object from the remote endpoint.
-     */
-    public static class TreeResponseVerbHandler implements IVerbHandler<Validator>
-    {
-        public void doVerb(MessageIn<Validator> message, int id)
-        {
-            // deserialize the remote tree, and register it
-            Validator response = message.payload;
-            TreeRequest request = new TreeRequest(response.request.sessionid, message.from, response.request.range, response.request.gcBefore, response.request.cf);
-            ActiveRepairService.instance.rendezvous(request, response.tree);
-        }
-    }
-
-    /**
-     * A tuple of table and cf.
-     */
-    public static class CFPair extends Pair<String,String>
-    {
-        public CFPair(String table, String cf)
-        {
-            super(table, cf);
-            assert table != null && cf != null;
-        }
-    }
-
-    /**
-     * A tuple of table, cf, address and range that represents a location we have an outstanding TreeRequest for.
-     */
-    public static class TreeRequest
-    {
-        public static final TreeRequestSerializer serializer = new TreeRequestSerializer();
-
-        public final String sessionid;
-        public final InetAddress endpoint;
-        public final Range<Token> range;
-        public final int gcBefore;
-        public final CFPair cf;
-
-        public TreeRequest(String sessionid, InetAddress endpoint, Range<Token> range, int gcBefore, CFPair cf)
-        {
-            this.sessionid = sessionid;
-            this.endpoint = endpoint;
-            this.cf = cf;
-            this.gcBefore = gcBefore;
-            this.range = range;
-        }
-
-        @Override
-        public final int hashCode()
-        {
-            return Objects.hashCode(sessionid, endpoint, gcBefore, cf, range);
-        }
-
-        @Override
-        public final boolean equals(Object o)
-        {
-            if(!(o instanceof TreeRequest))
-                return false;
-            TreeRequest that = (TreeRequest)o;
-            // handles nulls properly
-            return Objects.equal(sessionid, that.sessionid) && Objects.equal(endpoint, that.endpoint) && gcBefore == that.gcBefore && Objects.equal(cf, that.cf) && Objects.equal(range, that.range);
-        }
-
-        @Override
-        public String toString()
-        {
-            return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + gcBefore + ", " + cf  + ", " + range + ">";
-        }
-
-        public MessageOut<TreeRequest> createMessage()
-        {
-            return new MessageOut<TreeRequest>(MessagingService.Verb.TREE_REQUEST, this, TreeRequest.serializer);
-        }
-
-        public static class TreeRequestSerializer implements IVersionedSerializer<TreeRequest>
-        {
-            public void serialize(TreeRequest request, DataOutput out, int version) throws IOException
-            {
-                out.writeUTF(request.sessionid);
-                CompactEndpointSerializationHelper.serialize(request.endpoint, out);
-
-                if (version >= MessagingService.VERSION_20)
-                    out.writeInt(request.gcBefore);
-                out.writeUTF(request.cf.left);
-                out.writeUTF(request.cf.right);
-                AbstractBounds.serializer.serialize(request.range, out, version);
-            }
-
-            public TreeRequest deserialize(DataInput in, int version) throws IOException
-            {
-                String sessId = in.readUTF();
-                InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(in);
-                int gcBefore = -1;
-                if (version >= MessagingService.VERSION_20)
-                    gcBefore = in.readInt();
-                CFPair cfpair = new CFPair(in.readUTF(), in.readUTF());
-                Range<Token> range;
-                range = (Range<Token>) AbstractBounds.serializer.deserialize(in, version);
-
-                return new TreeRequest(sessId, endpoint, range, gcBefore, cfpair);
-            }
-
-            public long serializedSize(TreeRequest request, int version)
-            {
-                return TypeSizes.NATIVE.sizeof(request.sessionid)
-                     + CompactEndpointSerializationHelper.serializedSize(request.endpoint)
-                     + TypeSizes.NATIVE.sizeof(request.gcBefore)
-                     + TypeSizes.NATIVE.sizeof(request.cf.left)
-                     + TypeSizes.NATIVE.sizeof(request.cf.right)
-                     + AbstractBounds.serializer.serializedSize(request.range, version);
-            }
-        }
-    }
-
-    /**
-     * Triggers repairs with all neighbors for the given table, cfs and range.
-     */
-    static class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
-    {
-        private final String sessionName;
-        private final boolean isSequential;
-        private final String tablename;
-        private final String[] cfnames;
-        private final Range<Token> range;
-        private volatile Exception exception;
-        private final AtomicBoolean isFailed = new AtomicBoolean(false);
-
-        private final Set<InetAddress> endpoints;
-        final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
-        final Map<String, RepairJob> activeJobs = new ConcurrentHashMap<String, RepairJob>();
-
-        private final SimpleCondition completed = new SimpleCondition();
-        public final Condition differencingDone = new SimpleCondition();
-
-        private volatile boolean terminated = false;
-
-        public RepairSession(TreeRequest req, String tablename, String... cfnames)
-        {
-            this(req.sessionid, req.range, tablename, false, false, cfnames);
-            ActiveRepairService.instance.sessions.put(getName(), this);
-        }
-
-        public RepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
-        {
-            this(UUIDGen.getTimeUUID().toString(), range, tablename, isSequential, isLocal, cfnames);
-        }
-
-        private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String[] cfnames)
-        {
-            this.sessionName = id;
-            this.isSequential = isSequential;
-            this.tablename = tablename;
-            this.cfnames = cfnames;
-            assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
-            this.range = range;
-            this.endpoints = ActiveRepairService.getNeighbors(tablename, range, isLocal);
-        }
-
-        public String getName()
-        {
-            return sessionName;
-        }
-
-        public Range<Token> getRange()
-        {
-            return range;
-        }
-
-        RepairFuture getFuture()
-        {
-            return new RepairFuture(this);
-        }
-
-        private String repairedNodes()
-        {
-            StringBuilder sb = new StringBuilder();
-            sb.append(FBUtilities.getBroadcastAddress());
-            for (InetAddress ep : endpoints)
-                sb.append(", ").append(ep);
-            return sb.toString();
-        }
-
-        // we don't care about the return value but care about it throwing exception
-        public void runMayThrow() throws Exception
-        {
-            logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getName(), repairedNodes(), range, tablename, Arrays.toString(cfnames)));
-
-            if (endpoints.isEmpty())
-            {
-                differencingDone.signalAll();
-                logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getName(), range));
-                return;
-            }
-
-            // Checking all nodes are live
-            for (InetAddress endpoint : endpoints)
-            {
-                if (!FailureDetector.instance.isAlive(endpoint))
-                {
-                    String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
-                    differencingDone.signalAll();
-                    logger.error(String.format("[repair #%s] ", getName()) + message);
-                    throw new IOException(message);
-                }
-            }
-
-            ActiveRepairService.instance.sessions.put(getName(), this);
-            Gossiper.instance.register(this);
-            FailureDetector.instance.registerFailureDetectionEventListener(this);
-            try
-            {
-                // Create and queue a RepairJob for each column family
-                for (String cfname : cfnames)
-                {
-                    RepairJob job = new RepairJob(cfname);
-                    jobs.offer(job);
-                    activeJobs.put(cfname, job);
-                }
-
-                jobs.peek().sendTreeRequests();
-
-                // block whatever thread started this session until all requests have been returned:
-                // if this thread dies, the session will still complete in the background
-                completed.await();
-                if (exception == null)
-                {
-                    logger.info(String.format("[repair #%s] session completed successfully", getName()));
-                }
-                else
-                {
-                    logger.error(String.format("[repair #%s] session completed with the following error", getName()), exception);
-                    throw exception;
-                }
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException("Interrupted while waiting for repair.");
-            }
-            finally
-            {
-                // mark this session as terminated
-                terminate();
-                FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-                Gossiper.instance.unregister(this);
-                ActiveRepairService.instance.sessions.remove(getName());
-            }
-        }
-
-        /**
-         * @return whether this session is terminated
-         */
-        public boolean terminated()
-        {
-            return terminated;
-        }
-
-        public void terminate()
-        {
-            terminated = true;
-            for (RepairJob job : jobs)
-                job.terminate();
-            jobs.clear();
-            activeJobs.clear();
-        }
-
-        /**
-         * terminate this session.
-         */
-        public void forceShutdown()
-        {
-            differencingDone.signalAll();
-            completed.signalAll();
-        }
-
-        void completed(Differencer differencer)
-        {
-            logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s",
-                                       getName(),
-                                       differencer.r1.endpoint,
-                                       differencer.r2.endpoint,
-                                       differencer.cfname));
-            RepairJob job = activeJobs.get(differencer.cfname);
-            if (job == null)
-            {
-                assert terminated;
-                return;
-            }
-
-            if (job.completedSynchronization(differencer))
-            {
-                activeJobs.remove(differencer.cfname);
-                String remaining = activeJobs.size() == 0 ? "" : String.format(" (%d remaining column family to sync for this session)", activeJobs.size());
-                logger.info(String.format("[repair #%s] %s is fully synced%s", getName(), differencer.cfname, remaining));
-                if (activeJobs.isEmpty())
-                    completed.signalAll();
-            }
-        }
-
-        void failedNode(InetAddress remote)
-        {
-            String errorMsg = String.format("Endpoint %s died", remote);
-            exception = new IOException(errorMsg);
-            // If a node failed, we stop everything (though there could still be some activity in the background)
-            forceShutdown();
-        }
-
-        public void onJoin(InetAddress endpoint, EndpointState epState) {}
-        public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
-        public void onAlive(InetAddress endpoint, EndpointState state) {}
-        public void onDead(InetAddress endpoint, EndpointState state) {}
-
-        public void onRemove(InetAddress endpoint)
-        {
-            convict(endpoint, Double.MAX_VALUE);
-        }
-
-        public void onRestart(InetAddress endpoint, EndpointState epState)
-        {
-            convict(endpoint, Double.MAX_VALUE);
-        }
-
-        public void convict(InetAddress endpoint, double phi)
-        {
-            if (!endpoints.contains(endpoint))
-                return;
-
-            // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
-            if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
-                return;
-
-            // Though unlikely, it is possible to arrive here multiple time and we
-            // want to avoid print an error message twice
-            if (!isFailed.compareAndSet(false, true))
-                return;
-
-            failedNode(endpoint);
-        }
-
-        class RepairJob
-        {
-            private final String cfname;
-            // first we send tree requests.  this tracks the endpoints remaining to hear from
-            private final RequestCoordinator<TreeRequest> treeRequests;
-            // tree responses are then tracked here
-            private final List<TreeResponse> trees = new ArrayList<TreeResponse>(endpoints.size() + 1);
-            // once all responses are received, each tree is compared with each other, and differencer tasks
-            // are submitted.  the job is done when all differencers are complete.
-            private final RequestCoordinator<Differencer> differencers;
-            private final Condition requestsSent = new SimpleCondition();
-            private CountDownLatch snapshotLatch = null;
-
-            public RepairJob(String cfname)
-            {
-                this.cfname = cfname;
-                this.treeRequests = new RequestCoordinator<TreeRequest>(isSequential)
-                {
-                    public void send(TreeRequest r)
-                    {
-                        MessagingService.instance().sendOneWay(r.createMessage(), r.endpoint);
-                    }
-                };
-                this.differencers = new RequestCoordinator<Differencer>(isSequential)
-                {
-                    public void send(Differencer d)
-                    {
-                        StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
-                    }
-                };
-            }
-
-            /**
-             * Send merkle tree request to every involved neighbor.
-             */
-            public void sendTreeRequests()
-            {
-                // send requests to all nodes
-                List<InetAddress> allEndpoints = new ArrayList<InetAddress>(endpoints);
-                allEndpoints.add(FBUtilities.getBroadcastAddress());
-
-                if (isSequential)
-                    makeSnapshots(endpoints);
-
-                int gcBefore = Table.open(tablename).getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
-
-                for (InetAddress endpoint : allEndpoints)
-                    treeRequests.add(new TreeRequest(getName(), endpoint, range, gcBefore, new CFPair(tablename, cfname)));
-
-                logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", getName(), cfname, allEndpoints));
-                treeRequests.start();
-                requestsSent.signalAll();
-            }
-
-            public void makeSnapshots(Collection<InetAddress> endpoints)
-            {
-                try
-                {
-                    snapshotLatch = new CountDownLatch(endpoints.size());
-                    IAsyncCallback callback = new IAsyncCallback()
-                    {
-                        public boolean isLatencyForSnitch()
-                        {
-                            return false;
-                        }
-
-                        public void response(MessageIn msg)
-                        {
-                            RepairJob.this.snapshotLatch.countDown();
-                        }
-                    };
-                    for (InetAddress endpoint : endpoints)
-                        MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false).createMessage(), endpoint, callback);
-                    snapshotLatch.await();
-                    snapshotLatch = null;
-                }
-                catch (InterruptedException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            /**
-             * Add a new received tree and return the number of remaining tree to
-             * be received for the job to be complete.
-             *
-             * Callers may assume exactly one addTree call will result in zero remaining endpoints.
-             */
-            public synchronized int addTree(TreeRequest request, MerkleTree tree)
-            {
-                // Wait for all request to have been performed (see #3400)
-                try
-                {
-                    requestsSent.await();
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError("Interrupted while waiting for requests to be sent");
-                }
-
-                assert request.cf.right.equals(cfname);
-                trees.add(new TreeResponse(request.endpoint, tree));
-                return treeRequests.completed(request);
-            }
-
-            /**
-             * Submit differencers for running.
-             * All tree *must* have been received before this is called.
-             */
-            public void submitDifferencers()
-            {
-                // We need to difference all trees one against another
-                for (int i = 0; i < trees.size() - 1; ++i)
-                {
-                    TreeResponse r1 = trees.get(i);
-                    for (int j = i + 1; j < trees.size(); ++j)
-                    {
-                        TreeResponse r2 = trees.get(j);
-                        Differencer differencer = new Differencer(cfname, r1, r2);
-                        logger.debug("Queueing comparison {}", differencer);
-                        differencers.add(differencer);
-                    }
-                }
-                differencers.start();
-                trees.clear(); // allows gc to do its thing
-            }
-
-            /**
-             * @return true if the differencer was the last remaining
-             */
-            synchronized boolean completedSynchronization(Differencer differencer)
-            {
-                return differencers.completed(differencer) == 0;
-            }
-
-            public void terminate()
-            {
-                if (snapshotLatch != null)
-                {
-                    while (snapshotLatch.getCount() > 0)
-                        snapshotLatch.countDown();
-                }
-            }
-        }
-
-        /**
-         * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
-         */
-        class Differencer implements Runnable
+        switch (message.messageType)
         {
-            public final String cfname;
-            public final TreeResponse r1;
-            public final TreeResponse r2;
-            public final List<Range<Token>> differences = new ArrayList<Range<Token>>();
-
-            Differencer(String cfname, TreeResponse r1, TreeResponse r2)
-            {
-                this.cfname = cfname;
-                this.r1 = r1;
-                this.r2 = r2;
-            }
-
-            /**
-             * Compares our trees, and triggers repairs for any ranges that mismatch.
-             */
-            public void run()
-            {
-                // restore partitioners (in case we were serialized)
-                if (r1.tree.partitioner() == null)
-                    r1.tree.partitioner(StorageService.getPartitioner());
-                if (r2.tree.partitioner() == null)
-                    r2.tree.partitioner(StorageService.getPartitioner());
-
-                // compare trees, and collect differences
-                differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
-
-                // choose a repair method based on the significance of the difference
-                String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", getName(), r1.endpoint, r2.endpoint, cfname);
-                if (differences.isEmpty())
-                {
-                    logger.info(String.format(format, "are consistent"));
-                    completed(this);
-                    return;
-                }
-
-                // non-0 difference: perform streaming repair
-                logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
-                performStreamingRepair();
-            }
-
-            /**
-             * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
-             * that will be called out of band once the streams complete.
-             */
-            void performStreamingRepair()
-            {
-                Runnable callback = new Runnable()
-                {
-                    public void run()
-                    {
-                        completed(Differencer.this);
-                    }
-                };
-                StreamingRepairTask task = StreamingRepairTask.create(r1.endpoint, r2.endpoint, tablename, cfname, differences, callback);
-
-                task.run();
-            }
-
-            public String toString()
-            {
-                return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint + "/" + range + ">";
-            }
-        }
-    }
-
-    static class TreeResponse
-    {
-        public final InetAddress endpoint;
-        public final MerkleTree tree;
-
-        TreeResponse(InetAddress endpoint, MerkleTree tree)
-        {
-            this.endpoint = endpoint;
-            this.tree = tree;
-        }
-    }
-
-    public static class RepairFuture extends FutureTask
-    {
-        public final RepairSession session;
-
-        RepairFuture(RepairSession session)
-        {
-            super(session, null);
-            this.session = session;
-        }
-    }
-
-    public static abstract class RequestCoordinator<R>
-    {
-        private final Order<R> orderer;
-
-        protected RequestCoordinator(boolean isSequential)
-        {
-            this.orderer = isSequential ? new SequentialOrder(this) : new ParallelOrder(this);
-        }
-
-        public abstract void send(R request);
-
-        public void add(R request)
-        {
-            orderer.add(request);
-        }
-
-        public void start()
-        {
-            orderer.start();
-        }
-
-        // Returns how many request remains
-        public int completed(R request)
-        {
-            return orderer.completed(request);
-        }
-
-        private static abstract class Order<R>
-        {
-            protected final RequestCoordinator<R> coordinator;
-
-            Order(RequestCoordinator<R> coordinator)
-            {
-                this.coordinator = coordinator;
-            }
-
-            public abstract void add(R request);
-            public abstract void start();
-            public abstract int completed(R request);
-        }
-
-        private static class SequentialOrder<R> extends Order<R>
-        {
-            private final Queue<R> requests = new LinkedList<R>();
-
-            SequentialOrder(RequestCoordinator<R> coordinator)
-            {
-                super(coordinator);
-            }
-
-            public void add(R request)
-            {
-                requests.add(request);
-            }
-
-            public void start()
-            {
-                if (requests.isEmpty())
-                    return;
-
-                coordinator.send(requests.peek());
-            }
-
-            public int completed(R request)
-            {
-                assert request.equals(requests.peek());
-                requests.poll();
-                int remaining = requests.size();
-                if (remaining != 0)
-                    coordinator.send(requests.peek());
-                return remaining;
-            }
-        }
-
-        private static class ParallelOrder<R> extends Order<R>
-        {
-            private final Set<R> requests = new HashSet<R>();
-
-            ParallelOrder(RequestCoordinator<R> coordinator)
-            {
-                super(coordinator);
-            }
-
-            public void add(R request)
-            {
-                requests.add(request);
-            }
-
-            public void start()
-            {
-                for (R request : requests)
-                    coordinator.send(request);
-            }
-
-            public int completed(R request)
-            {
-                requests.remove(request);
-                return requests.size();
-            }
+            case VALIDATION_COMPLETE:
+                ValidationComplete validation = (ValidationComplete) message;
+                session.validationComplete(desc, endpoint, validation.tree);
+                break;
+            case SYNC_COMPLETE:
+                // one of replica is synced.
+                SyncComplete sync = (SyncComplete) message;
+                session.syncComplete(desc, sync.nodes, sync.success);
+                break;
+            default:
+                break;
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index accc2c5..a67889a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -41,8 +41,8 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
 
-import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Level;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +72,8 @@ import org.apache.cassandra.net.AsyncOneResponse;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.ResponseVerbHandler;
-import org.apache.cassandra.service.ActiveRepairService.TreeRequestVerbHandler;
+import org.apache.cassandra.repair.RepairFuture;
+import org.apache.cassandra.repair.RepairMessageVerbHandler;
 import org.apache.cassandra.service.paxos.CommitVerbHandler;
 import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
@@ -238,10 +239,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REQUEST_RESPONSE, new ResponseVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.INTERNAL_RESPONSE, new ResponseVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TREE_REQUEST, new TreeRequestVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TREE_RESPONSE, new ActiveRepairService.TreeResponseVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.STREAMING_REPAIR_REQUEST, new StreamingRepairTask.StreamingRepairRequest());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.STREAMING_REPAIR_RESPONSE, new StreamingRepairTask.StreamingRepairResponse());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPAIR_MESSAGE, new RepairMessageVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_SHUTDOWN, new GossipShutdownVerbHandler());
 
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
@@ -2348,10 +2346,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 logger.info(message);
                 sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
 
-                List<ActiveRepairService.RepairFuture> futures = new ArrayList<ActiveRepairService.RepairFuture>(ranges.size());
+                List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size());
                 for (Range<Token> range : ranges)
                 {
-                    ActiveRepairService.RepairFuture future;
+                    RepairFuture future;
                     try
                     {
                         future = forceTableRepair(range, keyspace, isSequential, isLocal, columnFamilies);
@@ -2377,23 +2375,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
                     }
                 }
-                for (ActiveRepairService.RepairFuture future : futures)
+                for (RepairFuture future : futures)
                 {
                     try
                     {
                         future.get();
-                        message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString());
+                        message = String.format("Repair session %s for range %s finished", future.session.getId(), future.session.getRange().toString());
+                        logger.info(message);
                         sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal()});
                     }
                     catch (ExecutionException e)
                     {
-                        message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
+                        message = String.format("Repair session %s for range %s failed with error %s", future.session.getId(), future.session.getRange().toString(), e.getCause().getMessage());
                         logger.error(message, e);
                         sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
                     }
                     catch (Exception e)
                     {
-                        message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
+                        message = String.format("Repair session %s for range %s failed with error %s", future.session.getId(), future.session.getRange().toString(), e.getMessage());
                         logger.error(message, e);
                         sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
                     }
@@ -2403,7 +2402,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }, null);
     }
 
-    public ActiveRepairService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
+    public RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, tableName, columnFamilies))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
deleted file mode 100644
index 3730b0e..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-/**
- * Task that make two nodes exchange (stream) some ranges (for a given table/cf).
- * This handle the case where the local node is neither of the two nodes that
- * must stream their range, and allow to register a callback to be called on
- * completion.
- */
-public class StreamingRepairTask implements Runnable
-{
-    private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);
-
-    // maps of tasks created on this node
-    private static final ConcurrentMap<UUID, StreamingRepairTask> tasks = new ConcurrentHashMap<UUID, StreamingRepairTask>();
-    public static final StreamingRepairTaskSerializer serializer = new StreamingRepairTaskSerializer();
-
-    public final UUID id;
-    private final InetAddress owner; // the node where the task is created; can be == src but don't need to
-    public final InetAddress src;
-    public final InetAddress dst;
-
-    private final String tableName;
-    private final String cfName;
-    private final Collection<Range<Token>> ranges;
-    private final StreamEventHandler callback;
-
-    private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, StreamEventHandler callback)
-    {
-        this.id = id;
-        this.owner = owner;
-        this.src = src;
-        this.dst = dst;
-        this.tableName = tableName;
-        this.cfName = cfName;
-        this.ranges = ranges;
-        this.callback = callback;
-    }
-
-    public static StreamingRepairTask create(InetAddress ep1, InetAddress ep2, String tableName, String cfName, Collection<Range<Token>> ranges, Runnable callback)
-    {
-        InetAddress local = FBUtilities.getBroadcastAddress();
-        UUID id = UUIDGen.getTimeUUID();
-        // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
-        InetAddress src = ep2.equals(local) ? ep2 : ep1;
-        InetAddress dst = ep2.equals(local) ? ep1 : ep2;
-        StreamingRepairTask task = new StreamingRepairTask(id, local, src, dst, tableName, cfName, ranges, wrapCallback(callback, id, local.equals(src)));
-        tasks.put(id, task);
-        return task;
-    }
-
-    /**
-     * Returns true if the task if the task can be executed locally, false if
-     * it has to be forwarded.
-     */
-    public boolean isLocalTask()
-    {
-        return owner.equals(src);
-    }
-
-    public void run()
-    {
-        if (src.equals(FBUtilities.getBroadcastAddress()))
-        {
-            initiateStreaming();
-        }
-        else
-        {
-            forwardToSource();
-        }
-    }
-
-    private void initiateStreaming()
-    {
-        logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
-        StreamResultFuture op = new StreamPlan("Repair")
-                                    .flushBeforeTransfer(true)
-                                    // request ranges from the remote node
-                                    .requestRanges(dst, tableName, ranges, cfName)
-                                    // send ranges to the remote node
-                                    .transferRanges(dst, tableName, ranges, cfName)
-                                    .execute();
-        op.addEventListener(callback);
-    }
-
-    private void forwardToSource()
-    {
-        logger.info(String.format("[streaming task #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", id, ranges.size(), src, dst));
-        MessageOut<StreamingRepairTask> msg = new MessageOut<StreamingRepairTask>(MessagingService.Verb.STREAMING_REPAIR_REQUEST,
-                                                                                  this,
-                                                                                  StreamingRepairTask.serializer);
-        MessagingService.instance().sendOneWay(msg, src);
-    }
-
-    private static StreamEventHandler makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
-    {
-        return new StreamEventHandler()
-        {
-            public void onSuccess(StreamState finalState)
-            {
-                StreamingRepairResponse.reply(taskOwner, taskId);
-            }
-
-            public void onFailure(Throwable t) {}
-            public void handleStreamEvent(StreamEvent event) {}
-        };
-    }
-
-    // wrap a given callback so as to unregister the streaming repair task on completion
-    private static StreamEventHandler wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
-    {
-        return new StreamEventHandler()
-        {
-            public void onSuccess(StreamState finalState)
-            {
-                tasks.remove(taskid);
-                if (callback != null)
-                    callback.run();
-            }
-
-            public void onFailure(Throwable t) {}
-            public void handleStreamEvent(StreamEvent event) {}
-        };
-    }
-
-    public static class StreamingRepairRequest implements IVerbHandler<StreamingRepairTask>
-    {
-        public void doVerb(MessageIn<StreamingRepairTask> message, int id)
-        {
-            StreamingRepairTask task = message.payload;
-            assert task.src.equals(FBUtilities.getBroadcastAddress());
-            assert task.owner.equals(message.from);
-
-            logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.from, task.ranges.size(), task.dst));
-
-            task.run();
-        }
-
-    }
-
-    public static class StreamingRepairResponse implements IVerbHandler<UUID>
-    {
-        public void doVerb(MessageIn<UUID> message, int id)
-        {
-            UUID taskid = message.payload;
-            StreamingRepairTask task = tasks.get(taskid);
-            if (task == null)
-            {
-                logger.error(String.format("Received a stream repair response from %s for unknow taks %s (have this node been restarted recently?)", message.from, taskid));
-                return;
-            }
-
-            assert task.owner.equals(FBUtilities.getBroadcastAddress());
-
-            logger.info(String.format("[streaming task #%s] task succeeded", task.id));
-            if (task.callback != null)
-            {
-                // TODO null
-                task.callback.onSuccess(null);
-            }
-        }
-
-        private static void reply(InetAddress remote, UUID taskid)
-        {
-            logger.info(String.format("[streaming task #%s] task suceed, forwarding response to %s", taskid, remote));
-            MessageOut<UUID> message = new MessageOut<UUID>(MessagingService.Verb.STREAMING_REPAIR_RESPONSE, taskid, UUIDSerializer.serializer);
-            MessagingService.instance().sendOneWay(message, remote);
-        }
-    }
-
-    private static class StreamingRepairTaskSerializer implements IVersionedSerializer<StreamingRepairTask>
-    {
-        public void serialize(StreamingRepairTask task, DataOutput out, int version) throws IOException
-        {
-            UUIDSerializer.serializer.serialize(task.id, out, version);
-            CompactEndpointSerializationHelper.serialize(task.owner, out);
-            CompactEndpointSerializationHelper.serialize(task.src, out);
-            CompactEndpointSerializationHelper.serialize(task.dst, out);
-            out.writeUTF(task.tableName);
-            out.writeUTF(task.cfName);
-            out.writeInt(task.ranges.size());
-            for (Range<Token> range : task.ranges)
-                AbstractBounds.serializer.serialize(range, out, version);
-            // We don't serialize the callback on purpose
-        }
-
-        public StreamingRepairTask deserialize(DataInput in, int version) throws IOException
-        {
-            UUID id = UUIDSerializer.serializer.deserialize(in, version);
-            InetAddress owner = CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress src = CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress dst = CompactEndpointSerializationHelper.deserialize(in);
-            String tableName = in.readUTF();
-            String cfName = in.readUTF();
-            int rangesCount = in.readInt();
-            List<Range<Token>> ranges = new ArrayList<Range<Token>>(rangesCount);
-            for (int i = 0; i < rangesCount; ++i)
-                ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, version).toTokenBounds());
-            return new StreamingRepairTask(id, owner, src, dst, tableName, cfName, ranges, makeReplyingCallback(owner, id));
-        }
-
-        public long serializedSize(StreamingRepairTask task, int version)
-        {
-            long size = UUIDSerializer.serializer.serializedSize(task.id, version);
-            size += 3 * CompactEndpointSerializationHelper.serializedSize(task.owner);
-            size += TypeSizes.NATIVE.sizeof(task.tableName);
-            size += TypeSizes.NATIVE.sizeof(task.cfName);
-            size += TypeSizes.NATIVE.sizeof(task.ranges.size());
-            for (Range<Token> range : task.ranges)
-                size += AbstractBounds.serializer.serializedSize(range, version);
-            return size;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 8a5572e..9288759 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -213,6 +213,11 @@ public class FBUtilities
         return FastByteComparisons.compareTo(bytes1, offset1, len1, bytes2, offset2, len2);
     }
 
+    public static int compareUnsigned(byte[] bytes1, byte[] bytes2)
+    {
+        return compareUnsigned(bytes1, bytes2, 0, 0, bytes1.length, bytes2.length);
+    }
+
     /**
      * @return The bitwise XOR of the inputs. The output will be the same length as the
      * longer input, but if either input is null, the output will be null.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 5284a57..a16c2b9 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.PeekingIterator;
 
@@ -30,6 +31,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
 
 /**
@@ -65,18 +67,9 @@ public class MerkleTree implements Serializable
 
     public final byte hashdepth;
 
-    /**
-     * The top level range that this MerkleTree covers.
-     * In a perfect world, this should be final and *not* transient. However
-     * this would break serialization with version &gte; 0.7 because it uses
-     * java serialization. We are moreover always shipping the fullRange will
-     * the request so we can add it back post-deserialization (as for the
-     * partitioner).
-     */
-    public transient Range<Token> fullRange;
-
-    // TODO This is broken; Token serialization assumes system partitioner, so if this doesn't match all hell breaks loose
-    private transient IPartitioner partitioner;
+    /** The top level range that this MerkleTree covers. */
+    public final Range<Token> fullRange;
+    private final IPartitioner partitioner;
 
     private long maxsize;
     private long size;
@@ -89,6 +82,10 @@ public class MerkleTree implements Serializable
             out.writeByte(mt.hashdepth);
             out.writeLong(mt.maxsize);
             out.writeLong(mt.size);
+            out.writeUTF(mt.partitioner.getClass().getCanonicalName());
+            // full range
+            Token.serializer.serialize(mt.fullRange.left, out);
+            Token.serializer.serialize(mt.fullRange.right, out);
             Hashable.serializer.serialize(mt.root, out, version);
         }
 
@@ -97,7 +94,22 @@ public class MerkleTree implements Serializable
             byte hashdepth = in.readByte();
             long maxsize = in.readLong();
             long size = in.readLong();
-            MerkleTree mt = new MerkleTree(null, null, hashdepth, maxsize);
+            IPartitioner partitioner;
+            try
+            {
+                partitioner = FBUtilities.newPartitioner(in.readUTF());
+            }
+            catch (ConfigurationException e)
+            {
+                throw new IOException(e);
+            }
+
+            // full range
+            Token left = Token.serializer.deserialize(in);
+            Token right = Token.serializer.deserialize(in);
+            Range<Token> fullRange = new Range<>(left, right, partitioner);
+
+            MerkleTree mt = new MerkleTree(partitioner, fullRange, hashdepth, maxsize);
             mt.size = size;
             mt.root = Hashable.serializer.deserialize(in, version);
             return mt;
@@ -105,10 +117,17 @@ public class MerkleTree implements Serializable
 
         public long serializedSize(MerkleTree mt, int version)
         {
-            return 1 // mt.hashdepth
+            long size = 1 // mt.hashdepth
                  + TypeSizes.NATIVE.sizeof(mt.maxsize)
                  + TypeSizes.NATIVE.sizeof(mt.size)
-                 + Hashable.serializer.serializedSize(mt.root, version);
+                 + TypeSizes.NATIVE.sizeof(mt.partitioner.getClass().getCanonicalName());
+
+            // full range
+            size += Token.serializer.serializedSize(mt.fullRange.left, TypeSizes.NATIVE);
+            size += Token.serializer.serializedSize(mt.fullRange.right, TypeSizes.NATIVE);
+
+            size += Hashable.serializer.serializedSize(mt.root, version);
+            return size;
         }
     }
 
@@ -122,8 +141,8 @@ public class MerkleTree implements Serializable
     public MerkleTree(IPartitioner partitioner, Range<Token> range, byte hashdepth, long maxsize)
     {
         assert hashdepth < Byte.MAX_VALUE;
-        this.fullRange = range;
-        this.partitioner = partitioner;
+        this.fullRange = Preconditions.checkNotNull(range);
+        this.partitioner = Preconditions.checkNotNull(partitioner);
         this.hashdepth = hashdepth;
         this.maxsize = maxsize;
 
@@ -199,14 +218,6 @@ public class MerkleTree implements Serializable
     }
 
     /**
-     * TODO: Find another way to use the local partitioner after serialization.
-     */
-    public void partitioner(IPartitioner partitioner)
-    {
-        this.partitioner = partitioner;
-    }
-
-    /**
      * @param ltree First tree.
      * @param rtree Second tree.
      * @return A list of the largest contiguous ranges where the given trees disagree.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.SyncComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.SyncComplete.bin b/test/data/serialization/2.0/service.SyncComplete.bin
new file mode 100644
index 0000000..66c72e1
Binary files /dev/null and b/test/data/serialization/2.0/service.SyncComplete.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.SyncRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.SyncRequest.bin b/test/data/serialization/2.0/service.SyncRequest.bin
new file mode 100644
index 0000000..8918405
Binary files /dev/null and b/test/data/serialization/2.0/service.SyncRequest.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.TreeRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.TreeRequest.bin b/test/data/serialization/2.0/service.TreeRequest.bin
deleted file mode 100644
index b336e50..0000000
Binary files a/test/data/serialization/2.0/service.TreeRequest.bin and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.TreeResponse.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.TreeResponse.bin b/test/data/serialization/2.0/service.TreeResponse.bin
deleted file mode 100644
index b63d8a2..0000000
Binary files a/test/data/serialization/2.0/service.TreeResponse.bin and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.ValidationComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.ValidationComplete.bin b/test/data/serialization/2.0/service.ValidationComplete.bin
new file mode 100644
index 0000000..bc633bc
Binary files /dev/null and b/test/data/serialization/2.0/service.ValidationComplete.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.ValidationRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.ValidationRequest.bin b/test/data/serialization/2.0/service.ValidationRequest.bin
new file mode 100644
index 0000000..4ec4c47
Binary files /dev/null and b/test/data/serialization/2.0/service.ValidationRequest.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 3fb828b..9218dc9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.UUID;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -40,12 +41,12 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -87,11 +88,10 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         assert strategy.getLevelSize(1) > 0;
         assert strategy.getLevelSize(2) > 0;
 
-        ActiveRepairService.CFPair p = new ActiveRepairService.CFPair(ksname, cfname);
         Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
         int gcBefore = table.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
-        ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, gcBefore, p);
-        ActiveRepairService.Validator validator = new ActiveRepairService.Validator(req);
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), ksname, cfname, range);
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
         CompactionManager.instance.submitValidation(cfs, validator).get();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/unit/org/apache/cassandra/repair/DifferencerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
new file mode 100644
index 0000000..2502620
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.sink.IMessageSink;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.utils.MerkleTree;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link Differencer}, which compares the Merkle trees of two endpoints
+ * built over the same range and records the ranges on which they disagree.
+ */
+public class DifferencerTest extends SchemaLoader
+{
+    private static final IPartitioner partitioner = new Murmur3Partitioner();
+
+    // maximum size passed to the MerkleTree constructor for every test tree (2^15)
+    private static final int MAX_TREE_SIZE = 1 << 15;
+
+    @After
+    public void tearDown()
+    {
+        SinkManager.clear();
+    }
+
+    /**
+     * When there is no difference between the two trees, Differencer should record no
+     * differing ranges and respond SYNC_COMPLETE for the pair of endpoints.
+     */
+    @Test
+    public void testNoDifference() throws Throwable
+    {
+        final InetAddress ep1 = InetAddress.getByName("127.0.0.1");
+        final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
+        // Failures raised while checking a message inside the sink are recorded here and
+        // rethrown after diff.run(), rather than escaping into whichever code invoked the sink.
+        final AtomicReference<Throwable> sinkFailure = new AtomicReference<>();
+
+        SinkManager.add(new IMessageSink()
+        {
+            @SuppressWarnings("unchecked")
+            public MessageOut handleMessage(MessageOut message, int id, InetAddress to)
+            {
+                if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
+                {
+                    try
+                    {
+                        RepairMessage m = (RepairMessage) message.payload;
+                        // we should see SYNC_COMPLETE for the compared pair of endpoints
+                        assertEquals(RepairMessage.Type.SYNC_COMPLETE, m.messageType);
+                        assertEquals(new NodePair(ep1, ep2), ((SyncComplete) m).nodes);
+                    }
+                    catch (Throwable t)
+                    {
+                        sinkFailure.compareAndSet(null, t);
+                    }
+                }
+                return null;
+            }
+
+            public MessageIn handleMessage(MessageIn message, int id, InetAddress to)
+            {
+                return null;
+            }
+        });
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), "Keyspace1", "Standard1", range);
+
+        MerkleTree tree1 = createInitialTree(desc);
+        MerkleTree tree2 = createInitialTree(desc);
+
+        // difference the trees
+        // note: we reuse the same endpoint which is bogus in theory but fine here
+        TreeResponse r1 = new TreeResponse(ep1, tree1);
+        TreeResponse r2 = new TreeResponse(ep2, tree2);
+        Differencer diff = new Differencer(desc, r1, r2);
+        diff.run();
+
+        Throwable failure = sinkFailure.get();
+        if (failure != null)
+            throw failure;
+        assertTrue(diff.differences.isEmpty());
+    }
+
+    /**
+     * Changing the hash of one range in one tree should make Differencer report exactly that range.
+     */
+    @Test
+    public void testDifference() throws Throwable
+    {
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), "Keyspace1", "Standard1", range);
+
+        MerkleTree tree1 = createInitialTree(desc);
+        MerkleTree tree2 = createInitialTree(desc);
+
+        // change a range in one of the trees
+        Token token = partitioner.midpoint(range.left, range.right);
+        tree1.invalidate(token);
+        MerkleTree.TreeRange changed = tree1.get(token);
+        changed.hash("non-empty hash!".getBytes());
+
+        Set<Range<Token>> interesting = new HashSet<>();
+        interesting.add(changed);
+
+        // difference the trees
+        TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
+        TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
+        Differencer diff = new Differencer(desc, r1, r2);
+        diff.run();
+
+        // ensure that the changed range was recorded
+        assertEquals("Wrong differing ranges", interesting, new HashSet<>(diff.differences));
+    }
+
+    /**
+     * Builds a tree over {@code desc.range} whose invalid ranges are all hashed with
+     * {@link Validator#EMPTY_ROW}, so two trees built this way start out identical.
+     */
+    private MerkleTree createInitialTree(RepairJobDesc desc)
+    {
+        MerkleTree tree = new MerkleTree(partitioner, desc.range, MerkleTree.RECOMMENDED_DEPTH, MAX_TREE_SIZE);
+        tree.init();
+        for (MerkleTree.TreeRange r : tree.invalids())
+        {
+            r.addHash(Validator.EMPTY_ROW);
+        }
+        return tree;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
new file mode 100644
index 0000000..12abd24
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.TreeMapBackedSortedColumns;
+import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.sink.IMessageSink;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.SimpleCondition;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link Validator}: building the Merkle tree for a repair job and reporting
+ * the outcome as a VALIDATION_COMPLETE repair message.
+ */
+public class ValidatorTest extends SchemaLoader
+{
+    // Bound the wait for the VALIDATION_COMPLETE message so that a message that is
+    // never sent fails the test instead of hanging it.
+    private static final long MESSAGE_TIMEOUT_SECONDS = 30;
+
+    private final String keyspace = "Keyspace1";
+    private final String columnFamily = "Standard1";
+    private final IPartitioner partitioner = StorageService.getPartitioner();
+
+    @After
+    public void tearDown()
+    {
+        SinkManager.clear();
+    }
+
+    /**
+     * A validator that is prepared, fed a row and completed should report success with a tree.
+     */
+    @Test
+    public void testValidatorComplete() throws Throwable
+    {
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), keyspace, columnFamily, range);
+
+        SimpleCondition received = new SimpleCondition();
+        AtomicReference<Throwable> failure = new AtomicReference<>();
+        expectValidationComplete(desc, true, received, failure);
+
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+        ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+
+        Validator validator = new Validator(desc, remote, 0);
+        validator.prepare(cfs);
+
+        // confirm that prepare() split the tree
+        assertTrue(validator.tree.size() > 1);
+
+        // add a row
+        Token mid = partitioner.midpoint(range.left, range.right);
+        validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")),
+                                                 TreeMapBackedSortedColumns.factory.create(cfs.metadata)));
+        validator.complete();
+
+        // confirm that the tree was validated
+        Token min = validator.tree.partitioner().getMinimumToken();
+        assertNotNull(validator.tree.hash(new Range<>(min, min)));
+
+        awaitValidationComplete(received, failure);
+    }
+
+    /**
+     * A validator that fails should report an unsuccessful VALIDATION_COMPLETE without a tree.
+     */
+    @Test
+    public void testValidatorFailed() throws Throwable
+    {
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), keyspace, columnFamily, range);
+
+        SimpleCondition received = new SimpleCondition();
+        AtomicReference<Throwable> failure = new AtomicReference<>();
+        expectValidationComplete(desc, false, received, failure);
+
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+        Validator validator = new Validator(desc, remote, 0);
+        validator.fail();
+
+        awaitValidationComplete(received, failure);
+    }
+
+    /**
+     * Registers a message sink that checks every outbound REPAIR_MESSAGE is a
+     * VALIDATION_COMPLETE for {@code desc} with the expected outcome.
+     *
+     * Assertion failures are recorded in {@code failure} rather than thrown, so that
+     * they fail the test (see {@link #awaitValidationComplete}) instead of escaping
+     * into whichever code invoked the sink.
+     *
+     * @param desc          repair job the message must belong to
+     * @param expectSuccess true to expect success with a tree, false to expect failure without one
+     * @param received      signalled once a REPAIR_MESSAGE has been seen
+     * @param failure       receives the first failure raised while checking a message
+     */
+    private static void expectValidationComplete(final RepairJobDesc desc,
+                                                 final boolean expectSuccess,
+                                                 final SimpleCondition received,
+                                                 final AtomicReference<Throwable> failure)
+    {
+        SinkManager.add(new IMessageSink()
+        {
+            @SuppressWarnings("unchecked")
+            public MessageOut handleMessage(MessageOut message, int id, InetAddress to)
+            {
+                if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
+                {
+                    try
+                    {
+                        RepairMessage m = (RepairMessage) message.payload;
+                        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+                        assertEquals(desc, m.desc);
+                        ValidationComplete vc = (ValidationComplete) m;
+                        if (expectSuccess)
+                        {
+                            assertTrue(vc.success);
+                            assertNotNull(vc.tree);
+                        }
+                        else
+                        {
+                            assertFalse(vc.success);
+                            assertNull(vc.tree);
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        failure.compareAndSet(null, t);
+                    }
+                    finally
+                    {
+                        received.signalAll();
+                    }
+                }
+                return null;
+            }
+
+            public MessageIn handleMessage(MessageIn message, int id, InetAddress to)
+            {
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Waits for the sink registered by {@link #expectValidationComplete} to see a repair
+     * message, then rethrows the first failure it recorded, if any.
+     */
+    private static void awaitValidationComplete(SimpleCondition received, AtomicReference<Throwable> failure) throws Throwable
+    {
+        assertTrue("timed out waiting for a repair message",
+                   received.await(MESSAGE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        Throwable t = failure.get();
+        if (t != null)
+            throw t;
+    }
+}