You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 18:59:24 UTC
[2/3] Redesign repair messages
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
new file mode 100644
index 0000000..3b9f6dd
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.repair.RepairJobDesc;
+
+/**
+ * Repair message ({@code Type.VALIDATION_REQUEST}) for the repair job identified by {@code desc}; it additionally
+ * carries the {@code gcBefore} value, which is serialized as an int right after the {@link RepairJobDesc}.
+ * @since 2.0
+ */
+public class ValidationRequest extends RepairMessage
+{
+ public static final MessageSerializer serializer = new ValidationRequestSerializer();
+
+ public final int gcBefore;
+
+ public ValidationRequest(RepairJobDesc desc, int gcBefore)
+ {
+ super(Type.VALIDATION_REQUEST, desc);
+ this.gcBefore = gcBefore;
+ }
+
+ /** Equal only when both requests target the same job ({@code desc}) with the same {@code gcBefore}. */
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ValidationRequest that = (ValidationRequest) o;
+ return gcBefore == that.gcBefore && (desc == null ? that.desc == null : desc.equals(that.desc));
+ }
+
+ @Override
+ public int hashCode() // kept consistent with equals(): mixes desc and gcBefore
+ {
+ return 31 * (desc == null ? 0 : desc.hashCode()) + gcBefore;
+ }
+
+ public static class ValidationRequestSerializer implements MessageSerializer<ValidationRequest>
+ {
+ public void serialize(ValidationRequest message, DataOutput out, int version) throws IOException
+ {
+ RepairJobDesc.serializer.serialize(message.desc, out, version);
+ out.writeInt(message.gcBefore); // field order must match deserialize() and serializedSize()
+ }
+
+ public ValidationRequest deserialize(DataInput dis, int version) throws IOException
+ {
+ RepairJobDesc desc = RepairJobDesc.serializer.deserialize(dis, version);
+ return new ValidationRequest(desc, dis.readInt());
+ }
+
+ public long serializedSize(ValidationRequest message, int version)
+ {
+ long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
+ size += TypeSizes.NATIVE.sizeof(message.gcBefore);
+ return size;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 82ecbe9..2bcf579 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -17,37 +17,25 @@
*/
package org.apache.cassandra.service;
-import java.io.*;
import java.net.InetAddress;
-import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import com.google.common.base.Objects;
import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.*;
-import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.streaming.StreamingRepairTask;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.repair.*;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.utils.FBUtilities;
/**
* ActiveRepairService encapsulates "validating" (hashing) individual column families,
@@ -79,8 +67,6 @@ import org.apache.cassandra.utils.*;
*/
public class ActiveRepairService
{
- private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
-
// singleton enforcement
public static final ActiveRepairService instance = new ActiveRepairService();
@@ -103,14 +89,14 @@ public class ActiveRepairService
/**
* A map of active session.
*/
- private final ConcurrentMap<String, RepairSession> sessions;
+ private final ConcurrentMap<UUID, RepairSession> sessions;
/**
* Protected constructor. Use ActiveRepairService.instance.
*/
protected ActiveRepairService()
{
- sessions = new ConcurrentHashMap<String, RepairSession>();
+ sessions = new ConcurrentHashMap<>();
}
/**
@@ -118,16 +104,30 @@ public class ActiveRepairService
*
* @return Future for asynchronous call or null if there is no need to repair
*/
- public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
+ public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
{
- RepairSession session = new RepairSession(range, tablename, isSequential, isLocal, cfnames);
+ RepairSession session = new RepairSession(range, keyspace, isSequential, isLocal, cfnames);
if (session.endpoints.isEmpty())
return null;
- RepairFuture futureTask = session.getFuture();
+ RepairFuture futureTask = new RepairFuture(session);
executor.execute(futureTask);
return futureTask;
}
+ public void addToActiveSessions(RepairSession session) // tracks the session by id (looked up by handleMessage) and subscribes it to Gossiper and FailureDetector events
+ {
+ sessions.put(session.getId(), session);
+ Gossiper.instance.register(session);
+ FailureDetector.instance.registerFailureDetectionEventListener(session);
+ }
+
+ public void removeFromActiveSessions(RepairSession session) // undoes addToActiveSessions: unregisters the listeners first, then drops the id mapping
+ {
+ FailureDetector.instance.unregisterFailureDetectionEventListener(session);
+ Gossiper.instance.unregister(session);
+ sessions.remove(session.getId());
+ }
+
public void terminateSessions()
{
for (RepairSession session : sessions.values())
@@ -138,9 +138,11 @@ public class ActiveRepairService
// for testing only. Create a session corresponding to a fake request and
// add it to the sessions (avoid NPE in tests)
- RepairFuture submitArtificialRepairSession(TreeRequest req, String tablename, String... cfnames)
+ RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
- RepairFuture futureTask = new RepairSession(req, tablename, cfnames).getFuture();
+ RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, false, new String[]{desc.columnFamily});
+ sessions.put(session.getId(), session);
+ RepairFuture futureTask = new RepairFuture(session);
executor.execute(futureTask);
return futureTask;
}
@@ -154,7 +156,7 @@ public class ActiveRepairService
*
* @return neighbors with whom we share the provided range
*/
- static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)
+ public static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
@@ -174,7 +176,7 @@ public class ActiveRepairService
if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
return Collections.emptySet();
- Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));
+ Set<InetAddress> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet));
neighbors.remove(FBUtilities.getBroadcastAddress());
if (isLocal)
@@ -187,949 +189,25 @@ public class ActiveRepairService
return neighbors;
}
- /**
- * Register a tree for the given request to be compared to the appropriate trees in Stage.ANTIENTROPY when they become available.
- */
- private void rendezvous(TreeRequest request, MerkleTree tree)
+ public void handleMessage(InetAddress endpoint, RepairMessage message)
{
- RepairSession session = sessions.get(request.sessionid);
+ RepairJobDesc desc = message.desc;
+ RepairSession session = sessions.get(desc.sessionId);
if (session == null)
- {
- logger.warn("Got a merkle tree response for unknown repair session {}: either this node has been restarted since the session was started, or the session has been interrupted for an unknown reason. ", request.sessionid);
return;
- }
-
- RepairSession.RepairJob job = session.jobs.peek();
- if (job == null)
- {
- assert session.terminated();
- return;
- }
-
- logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", session.getName(), request.cf.right, request.endpoint));
-
- if (job.addTree(request, tree) == 0)
- {
- logger.debug("All trees received for " + session.getName() + "/" + request.cf.right);
- job.submitDifferencers();
-
- // This job is complete, switching to next in line (note that only
- // one thread will can ever do this)
- session.jobs.poll();
- RepairSession.RepairJob nextJob = session.jobs.peek();
- if (nextJob == null)
- // We are done with this repair session as far as differencing
- // is considern. Just inform the session
- session.differencingDone.signalAll();
- else
- nextJob.sendTreeRequests();
- }
- }
-
- /**
- * Responds to the node that requested the given valid tree.
- * @param validator A locally generated validator
- * @param local localhost (parameterized for testing)
- */
- void respond(Validator validator, InetAddress local)
- {
- MessagingService ms = MessagingService.instance();
-
- try
- {
- if (!validator.request.endpoint.equals(FBUtilities.getBroadcastAddress()))
- logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s", validator.request.sessionid, validator.request.endpoint, validator.request.cf));
- ms.sendOneWay(validator.createMessage(), validator.request.endpoint);
- }
- catch (Exception e)
- {
- logger.error(String.format("[repair #%s] Error sending completed merkle tree to %s for %s ", validator.request.sessionid, validator.request.endpoint, validator.request.cf), e);
- }
- }
-
- /**
- * A Strategy to handle building a merkle tree for a column family.
- *
- * Lifecycle:
- * 1. prepare() - Initialize tree with samples.
- * 2. add() - 0 or more times, to add hashes to the tree.
- * 3. complete() - complete building tree and send it back to the initiator
- */
- public static class Validator implements Runnable
- {
- public final TreeRequest request;
- public final MerkleTree tree;
-
- // null when all rows with the min token have been consumed
- private transient long validated;
- private transient MerkleTree.TreeRange range;
- private transient MerkleTree.TreeRangeIterator ranges;
- private transient DecoratedKey lastKey;
-
- public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
- public static ValidatorSerializer serializer = new ValidatorSerializer();
-
- public Validator(TreeRequest request)
- {
- this(request,
- // TODO: memory usage (maxsize) should either be tunable per
- // CF, globally, or as shared for all CFs in a cluster
- new MerkleTree(DatabaseDescriptor.getPartitioner(), request.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)));
- }
-
- Validator(TreeRequest request, MerkleTree tree)
- {
- this.request = request;
- this.tree = tree;
- // Reestablishing the range because we don't serialize it (for bad
- // reason - see MerkleTree for details)
- this.tree.fullRange = this.request.range;
- validated = 0;
- range = null;
- ranges = null;
- }
-
- public void prepare(ColumnFamilyStore cfs)
- {
- if (!tree.partitioner().preservesOrder())
- {
- // You can't beat an even tree distribution for md5
- tree.init();
- }
- else
- {
- List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
- for (DecoratedKey sample : cfs.keySamples(request.range))
- {
- assert request.range.contains(sample.token): "Token " + sample.token + " is not within range " + request.range;
- keys.add(sample);
- }
-
- if (keys.isEmpty())
- {
- // use an even tree distribution
- tree.init();
- }
- else
- {
- int numkeys = keys.size();
- Random random = new Random();
- // sample the column family using random keys from the index
- while (true)
- {
- DecoratedKey dk = keys.get(random.nextInt(numkeys));
- if (!tree.split(dk.token))
- break;
- }
- }
- }
- logger.debug("Prepared AEService tree of size " + tree.size() + " for " + request);
- ranges = tree.invalids();
- }
-
- /**
- * Called (in order) for rows in given range present in the CF.
- * Hashes the row, and adds it to the tree being built.
- *
- * @param row The row.
- */
- public void add(AbstractCompactedRow row)
- {
- assert request.range.contains(row.key.token) : row.key.token + " is not contained in " + request.range;
- assert lastKey == null || lastKey.compareTo(row.key) < 0
- : "row " + row.key + " received out of order wrt " + lastKey;
- lastKey = row.key;
-
- if (range == null)
- range = ranges.next();
-
- // generate new ranges as long as case 1 is true
- while (!range.contains(row.key.token))
- {
- // add the empty hash, and move to the next range
- range.addHash(EMPTY_ROW);
- range = ranges.next();
- }
-
- // case 3 must be true: mix in the hashed row
- range.addHash(rowHash(row));
- }
-
- private MerkleTree.RowHash rowHash(AbstractCompactedRow row)
- {
- validated++;
- // MerkleTree uses XOR internally, so we want lots of output bits here
- MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");
- row.update(digest);
- return new MerkleTree.RowHash(row.key.token, digest.digest());
- }
-
- /**
- * Registers the newly created tree for rendezvous in Stage.ANTI_ENTROPY.
- */
- public void complete()
- {
- completeTree();
-
- StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
- logger.debug("Validated " + validated + " rows into AEService tree for " + request);
- }
-
- void completeTree()
- {
- assert ranges != null : "Validator was not prepared()";
-
- if (range != null)
- range.addHash(EMPTY_ROW);
- while (ranges.hasNext())
- {
- range = ranges.next();
- range.addHash(EMPTY_ROW);
- }
- }
-
- /**
- * Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTI_ENTROPY.
- */
- public void run()
- {
- // respond to the request that triggered this validation
- ActiveRepairService.instance.respond(this, FBUtilities.getBroadcastAddress());
- }
-
- public MessageOut<Validator> createMessage()
- {
- return new MessageOut<Validator>(MessagingService.Verb.TREE_RESPONSE, this, Validator.serializer);
- }
-
- public static class ValidatorSerializer implements IVersionedSerializer<Validator>
- {
- public void serialize(Validator validator, DataOutput out, int version) throws IOException
- {
- TreeRequest.serializer.serialize(validator.request, out, version);
- MerkleTree.serializer.serialize(validator.tree, out, version);
- }
-
- public Validator deserialize(DataInput in, int version) throws IOException
- {
- final TreeRequest request = TreeRequest.serializer.deserialize(in, version);
- try
- {
- return new Validator(request, MerkleTree.serializer.deserialize(in, version));
- }
- catch(Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public long serializedSize(Validator validator, int version)
- {
- return TreeRequest.serializer.serializedSize(validator.request, version)
- + MerkleTree.serializer.serializedSize(validator.tree, version);
- }
- }
- }
-
- /**
- * Handler for requests from remote nodes to generate a valid tree.
- */
- public static class TreeRequestVerbHandler implements IVerbHandler<TreeRequest>
- {
- /**
- * Trigger a validation compaction which will return the tree upon completion.
- */
- public void doVerb(MessageIn<TreeRequest> message, int id)
- {
- TreeRequest remotereq = message.payload;
- TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.gcBefore, remotereq.cf);
-
- // trigger read-only compaction
- ColumnFamilyStore store = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
- Validator validator = new Validator(request);
- logger.debug("Queueing validation compaction for " + request);
- CompactionManager.instance.submitValidation(store, validator);
- }
- }
-
- /**
- * Handler for responses from remote nodes which contain a valid tree.
- * The payload is a completed Validator object from the remote endpoint.
- */
- public static class TreeResponseVerbHandler implements IVerbHandler<Validator>
- {
- public void doVerb(MessageIn<Validator> message, int id)
- {
- // deserialize the remote tree, and register it
- Validator response = message.payload;
- TreeRequest request = new TreeRequest(response.request.sessionid, message.from, response.request.range, response.request.gcBefore, response.request.cf);
- ActiveRepairService.instance.rendezvous(request, response.tree);
- }
- }
-
- /**
- * A tuple of table and cf.
- */
- public static class CFPair extends Pair<String,String>
- {
- public CFPair(String table, String cf)
- {
- super(table, cf);
- assert table != null && cf != null;
- }
- }
-
- /**
- * A tuple of table, cf, address and range that represents a location we have an outstanding TreeRequest for.
- */
- public static class TreeRequest
- {
- public static final TreeRequestSerializer serializer = new TreeRequestSerializer();
-
- public final String sessionid;
- public final InetAddress endpoint;
- public final Range<Token> range;
- public final int gcBefore;
- public final CFPair cf;
-
- public TreeRequest(String sessionid, InetAddress endpoint, Range<Token> range, int gcBefore, CFPair cf)
- {
- this.sessionid = sessionid;
- this.endpoint = endpoint;
- this.cf = cf;
- this.gcBefore = gcBefore;
- this.range = range;
- }
-
- @Override
- public final int hashCode()
- {
- return Objects.hashCode(sessionid, endpoint, gcBefore, cf, range);
- }
-
- @Override
- public final boolean equals(Object o)
- {
- if(!(o instanceof TreeRequest))
- return false;
- TreeRequest that = (TreeRequest)o;
- // handles nulls properly
- return Objects.equal(sessionid, that.sessionid) && Objects.equal(endpoint, that.endpoint) && gcBefore == that.gcBefore && Objects.equal(cf, that.cf) && Objects.equal(range, that.range);
- }
-
- @Override
- public String toString()
- {
- return "#<TreeRequest " + sessionid + ", " + endpoint + ", " + gcBefore + ", " + cf + ", " + range + ">";
- }
-
- public MessageOut<TreeRequest> createMessage()
- {
- return new MessageOut<TreeRequest>(MessagingService.Verb.TREE_REQUEST, this, TreeRequest.serializer);
- }
-
- public static class TreeRequestSerializer implements IVersionedSerializer<TreeRequest>
- {
- public void serialize(TreeRequest request, DataOutput out, int version) throws IOException
- {
- out.writeUTF(request.sessionid);
- CompactEndpointSerializationHelper.serialize(request.endpoint, out);
-
- if (version >= MessagingService.VERSION_20)
- out.writeInt(request.gcBefore);
- out.writeUTF(request.cf.left);
- out.writeUTF(request.cf.right);
- AbstractBounds.serializer.serialize(request.range, out, version);
- }
-
- public TreeRequest deserialize(DataInput in, int version) throws IOException
- {
- String sessId = in.readUTF();
- InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(in);
- int gcBefore = -1;
- if (version >= MessagingService.VERSION_20)
- gcBefore = in.readInt();
- CFPair cfpair = new CFPair(in.readUTF(), in.readUTF());
- Range<Token> range;
- range = (Range<Token>) AbstractBounds.serializer.deserialize(in, version);
-
- return new TreeRequest(sessId, endpoint, range, gcBefore, cfpair);
- }
-
- public long serializedSize(TreeRequest request, int version)
- {
- return TypeSizes.NATIVE.sizeof(request.sessionid)
- + CompactEndpointSerializationHelper.serializedSize(request.endpoint)
- + TypeSizes.NATIVE.sizeof(request.gcBefore)
- + TypeSizes.NATIVE.sizeof(request.cf.left)
- + TypeSizes.NATIVE.sizeof(request.cf.right)
- + AbstractBounds.serializer.serializedSize(request.range, version);
- }
- }
- }
-
- /**
- * Triggers repairs with all neighbors for the given table, cfs and range.
- */
- static class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
- {
- private final String sessionName;
- private final boolean isSequential;
- private final String tablename;
- private final String[] cfnames;
- private final Range<Token> range;
- private volatile Exception exception;
- private final AtomicBoolean isFailed = new AtomicBoolean(false);
-
- private final Set<InetAddress> endpoints;
- final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
- final Map<String, RepairJob> activeJobs = new ConcurrentHashMap<String, RepairJob>();
-
- private final SimpleCondition completed = new SimpleCondition();
- public final Condition differencingDone = new SimpleCondition();
-
- private volatile boolean terminated = false;
-
- public RepairSession(TreeRequest req, String tablename, String... cfnames)
- {
- this(req.sessionid, req.range, tablename, false, false, cfnames);
- ActiveRepairService.instance.sessions.put(getName(), this);
- }
-
- public RepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
- {
- this(UUIDGen.getTimeUUID().toString(), range, tablename, isSequential, isLocal, cfnames);
- }
-
- private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String[] cfnames)
- {
- this.sessionName = id;
- this.isSequential = isSequential;
- this.tablename = tablename;
- this.cfnames = cfnames;
- assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
- this.range = range;
- this.endpoints = ActiveRepairService.getNeighbors(tablename, range, isLocal);
- }
-
- public String getName()
- {
- return sessionName;
- }
-
- public Range<Token> getRange()
- {
- return range;
- }
-
- RepairFuture getFuture()
- {
- return new RepairFuture(this);
- }
-
- private String repairedNodes()
- {
- StringBuilder sb = new StringBuilder();
- sb.append(FBUtilities.getBroadcastAddress());
- for (InetAddress ep : endpoints)
- sb.append(", ").append(ep);
- return sb.toString();
- }
-
- // we don't care about the return value but care about it throwing exception
- public void runMayThrow() throws Exception
- {
- logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getName(), repairedNodes(), range, tablename, Arrays.toString(cfnames)));
-
- if (endpoints.isEmpty())
- {
- differencingDone.signalAll();
- logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getName(), range));
- return;
- }
-
- // Checking all nodes are live
- for (InetAddress endpoint : endpoints)
- {
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
- differencingDone.signalAll();
- logger.error(String.format("[repair #%s] ", getName()) + message);
- throw new IOException(message);
- }
- }
-
- ActiveRepairService.instance.sessions.put(getName(), this);
- Gossiper.instance.register(this);
- FailureDetector.instance.registerFailureDetectionEventListener(this);
- try
- {
- // Create and queue a RepairJob for each column family
- for (String cfname : cfnames)
- {
- RepairJob job = new RepairJob(cfname);
- jobs.offer(job);
- activeJobs.put(cfname, job);
- }
-
- jobs.peek().sendTreeRequests();
-
- // block whatever thread started this session until all requests have been returned:
- // if this thread dies, the session will still complete in the background
- completed.await();
- if (exception == null)
- {
- logger.info(String.format("[repair #%s] session completed successfully", getName()));
- }
- else
- {
- logger.error(String.format("[repair #%s] session completed with the following error", getName()), exception);
- throw exception;
- }
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException("Interrupted while waiting for repair.");
- }
- finally
- {
- // mark this session as terminated
- terminate();
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
- Gossiper.instance.unregister(this);
- ActiveRepairService.instance.sessions.remove(getName());
- }
- }
-
- /**
- * @return whether this session is terminated
- */
- public boolean terminated()
- {
- return terminated;
- }
-
- public void terminate()
- {
- terminated = true;
- for (RepairJob job : jobs)
- job.terminate();
- jobs.clear();
- activeJobs.clear();
- }
-
- /**
- * terminate this session.
- */
- public void forceShutdown()
- {
- differencingDone.signalAll();
- completed.signalAll();
- }
-
- void completed(Differencer differencer)
- {
- logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s",
- getName(),
- differencer.r1.endpoint,
- differencer.r2.endpoint,
- differencer.cfname));
- RepairJob job = activeJobs.get(differencer.cfname);
- if (job == null)
- {
- assert terminated;
- return;
- }
-
- if (job.completedSynchronization(differencer))
- {
- activeJobs.remove(differencer.cfname);
- String remaining = activeJobs.size() == 0 ? "" : String.format(" (%d remaining column family to sync for this session)", activeJobs.size());
- logger.info(String.format("[repair #%s] %s is fully synced%s", getName(), differencer.cfname, remaining));
- if (activeJobs.isEmpty())
- completed.signalAll();
- }
- }
-
- void failedNode(InetAddress remote)
- {
- String errorMsg = String.format("Endpoint %s died", remote);
- exception = new IOException(errorMsg);
- // If a node failed, we stop everything (though there could still be some activity in the background)
- forceShutdown();
- }
-
- public void onJoin(InetAddress endpoint, EndpointState epState) {}
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
- public void onAlive(InetAddress endpoint, EndpointState state) {}
- public void onDead(InetAddress endpoint, EndpointState state) {}
-
- public void onRemove(InetAddress endpoint)
- {
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void onRestart(InetAddress endpoint, EndpointState epState)
- {
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void convict(InetAddress endpoint, double phi)
- {
- if (!endpoints.contains(endpoint))
- return;
-
- // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
- if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
- return;
-
- // Though unlikely, it is possible to arrive here multiple time and we
- // want to avoid print an error message twice
- if (!isFailed.compareAndSet(false, true))
- return;
-
- failedNode(endpoint);
- }
-
- class RepairJob
- {
- private final String cfname;
- // first we send tree requests. this tracks the endpoints remaining to hear from
- private final RequestCoordinator<TreeRequest> treeRequests;
- // tree responses are then tracked here
- private final List<TreeResponse> trees = new ArrayList<TreeResponse>(endpoints.size() + 1);
- // once all responses are received, each tree is compared with each other, and differencer tasks
- // are submitted. the job is done when all differencers are complete.
- private final RequestCoordinator<Differencer> differencers;
- private final Condition requestsSent = new SimpleCondition();
- private CountDownLatch snapshotLatch = null;
-
- public RepairJob(String cfname)
- {
- this.cfname = cfname;
- this.treeRequests = new RequestCoordinator<TreeRequest>(isSequential)
- {
- public void send(TreeRequest r)
- {
- MessagingService.instance().sendOneWay(r.createMessage(), r.endpoint);
- }
- };
- this.differencers = new RequestCoordinator<Differencer>(isSequential)
- {
- public void send(Differencer d)
- {
- StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
- }
- };
- }
-
- /**
- * Send merkle tree request to every involved neighbor.
- */
- public void sendTreeRequests()
- {
- // send requests to all nodes
- List<InetAddress> allEndpoints = new ArrayList<InetAddress>(endpoints);
- allEndpoints.add(FBUtilities.getBroadcastAddress());
-
- if (isSequential)
- makeSnapshots(endpoints);
-
- int gcBefore = Table.open(tablename).getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
-
- for (InetAddress endpoint : allEndpoints)
- treeRequests.add(new TreeRequest(getName(), endpoint, range, gcBefore, new CFPair(tablename, cfname)));
-
- logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", getName(), cfname, allEndpoints));
- treeRequests.start();
- requestsSent.signalAll();
- }
-
- public void makeSnapshots(Collection<InetAddress> endpoints)
- {
- try
- {
- snapshotLatch = new CountDownLatch(endpoints.size());
- IAsyncCallback callback = new IAsyncCallback()
- {
- public boolean isLatencyForSnitch()
- {
- return false;
- }
-
- public void response(MessageIn msg)
- {
- RepairJob.this.snapshotLatch.countDown();
- }
- };
- for (InetAddress endpoint : endpoints)
- MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false).createMessage(), endpoint, callback);
- snapshotLatch.await();
- snapshotLatch = null;
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Add a new received tree and return the number of remaining tree to
- * be received for the job to be complete.
- *
- * Callers may assume exactly one addTree call will result in zero remaining endpoints.
- */
- public synchronized int addTree(TreeRequest request, MerkleTree tree)
- {
- // Wait for all request to have been performed (see #3400)
- try
- {
- requestsSent.await();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError("Interrupted while waiting for requests to be sent");
- }
-
- assert request.cf.right.equals(cfname);
- trees.add(new TreeResponse(request.endpoint, tree));
- return treeRequests.completed(request);
- }
-
- /**
- * Submit differencers for running.
- * All tree *must* have been received before this is called.
- */
- public void submitDifferencers()
- {
- // We need to difference all trees one against another
- for (int i = 0; i < trees.size() - 1; ++i)
- {
- TreeResponse r1 = trees.get(i);
- for (int j = i + 1; j < trees.size(); ++j)
- {
- TreeResponse r2 = trees.get(j);
- Differencer differencer = new Differencer(cfname, r1, r2);
- logger.debug("Queueing comparison {}", differencer);
- differencers.add(differencer);
- }
- }
- differencers.start();
- trees.clear(); // allows gc to do its thing
- }
-
- /**
- * @return true if the differencer was the last remaining
- */
- synchronized boolean completedSynchronization(Differencer differencer)
- {
- return differencers.completed(differencer) == 0;
- }
-
- public void terminate()
- {
- if (snapshotLatch != null)
- {
- while (snapshotLatch.getCount() > 0)
- snapshotLatch.countDown();
- }
- }
- }
-
- /**
- * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
- */
- class Differencer implements Runnable
+ switch (message.messageType)
{
- public final String cfname;
- public final TreeResponse r1;
- public final TreeResponse r2;
- public final List<Range<Token>> differences = new ArrayList<Range<Token>>();
-
- Differencer(String cfname, TreeResponse r1, TreeResponse r2)
- {
- this.cfname = cfname;
- this.r1 = r1;
- this.r2 = r2;
- }
-
- /**
- * Compares our trees, and triggers repairs for any ranges that mismatch.
- */
- public void run()
- {
- // restore partitioners (in case we were serialized)
- if (r1.tree.partitioner() == null)
- r1.tree.partitioner(StorageService.getPartitioner());
- if (r2.tree.partitioner() == null)
- r2.tree.partitioner(StorageService.getPartitioner());
-
- // compare trees, and collect differences
- differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
-
- // choose a repair method based on the significance of the difference
- String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", getName(), r1.endpoint, r2.endpoint, cfname);
- if (differences.isEmpty())
- {
- logger.info(String.format(format, "are consistent"));
- completed(this);
- return;
- }
-
- // non-0 difference: perform streaming repair
- logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
- performStreamingRepair();
- }
-
- /**
- * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
- * that will be called out of band once the streams complete.
- */
- void performStreamingRepair()
- {
- Runnable callback = new Runnable()
- {
- public void run()
- {
- completed(Differencer.this);
- }
- };
- StreamingRepairTask task = StreamingRepairTask.create(r1.endpoint, r2.endpoint, tablename, cfname, differences, callback);
-
- task.run();
- }
-
- public String toString()
- {
- return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint + "/" + range + ">";
- }
- }
- }
-
- static class TreeResponse
- {
- public final InetAddress endpoint;
- public final MerkleTree tree;
-
- TreeResponse(InetAddress endpoint, MerkleTree tree)
- {
- this.endpoint = endpoint;
- this.tree = tree;
- }
- }
-
- public static class RepairFuture extends FutureTask
- {
- public final RepairSession session;
-
- RepairFuture(RepairSession session)
- {
- super(session, null);
- this.session = session;
- }
- }
-
- public static abstract class RequestCoordinator<R>
- {
- private final Order<R> orderer;
-
- protected RequestCoordinator(boolean isSequential)
- {
- this.orderer = isSequential ? new SequentialOrder(this) : new ParallelOrder(this);
- }
-
- public abstract void send(R request);
-
- public void add(R request)
- {
- orderer.add(request);
- }
-
- public void start()
- {
- orderer.start();
- }
-
- // Returns how many request remains
- public int completed(R request)
- {
- return orderer.completed(request);
- }
-
- private static abstract class Order<R>
- {
- protected final RequestCoordinator<R> coordinator;
-
- Order(RequestCoordinator<R> coordinator)
- {
- this.coordinator = coordinator;
- }
-
- public abstract void add(R request);
- public abstract void start();
- public abstract int completed(R request);
- }
-
- private static class SequentialOrder<R> extends Order<R>
- {
- private final Queue<R> requests = new LinkedList<R>();
-
- SequentialOrder(RequestCoordinator<R> coordinator)
- {
- super(coordinator);
- }
-
- public void add(R request)
- {
- requests.add(request);
- }
-
- public void start()
- {
- if (requests.isEmpty())
- return;
-
- coordinator.send(requests.peek());
- }
-
- public int completed(R request)
- {
- assert request.equals(requests.peek());
- requests.poll();
- int remaining = requests.size();
- if (remaining != 0)
- coordinator.send(requests.peek());
- return remaining;
- }
- }
-
- private static class ParallelOrder<R> extends Order<R>
- {
- private final Set<R> requests = new HashSet<R>();
-
- ParallelOrder(RequestCoordinator<R> coordinator)
- {
- super(coordinator);
- }
-
- public void add(R request)
- {
- requests.add(request);
- }
-
- public void start()
- {
- for (R request : requests)
- coordinator.send(request);
- }
-
- public int completed(R request)
- {
- requests.remove(request);
- return requests.size();
- }
+ case VALIDATION_COMPLETE:
+ ValidationComplete validation = (ValidationComplete) message;
+ session.validationComplete(desc, endpoint, validation.tree);
+ break;
+ case SYNC_COMPLETE:
+ // one of replica is synced.
+ SyncComplete sync = (SyncComplete) message;
+ session.syncComplete(desc, sync.nodes, sync.success);
+ break;
+ default:
+ break;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index accc2c5..a67889a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -41,8 +41,8 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +72,8 @@ import org.apache.cassandra.net.AsyncOneResponse;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ResponseVerbHandler;
-import org.apache.cassandra.service.ActiveRepairService.TreeRequestVerbHandler;
+import org.apache.cassandra.repair.RepairFuture;
+import org.apache.cassandra.repair.RepairMessageVerbHandler;
import org.apache.cassandra.service.paxos.CommitVerbHandler;
import org.apache.cassandra.service.paxos.PrepareVerbHandler;
import org.apache.cassandra.service.paxos.ProposeVerbHandler;
@@ -238,10 +239,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REQUEST_RESPONSE, new ResponseVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.INTERNAL_RESPONSE, new ResponseVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TREE_REQUEST, new TreeRequestVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TREE_RESPONSE, new ActiveRepairService.TreeResponseVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.STREAMING_REPAIR_REQUEST, new StreamingRepairTask.StreamingRepairRequest());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.STREAMING_REPAIR_RESPONSE, new StreamingRepairTask.StreamingRepairResponse());
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPAIR_MESSAGE, new RepairMessageVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_SHUTDOWN, new GossipShutdownVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
@@ -2348,10 +2346,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.info(message);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
- List<ActiveRepairService.RepairFuture> futures = new ArrayList<ActiveRepairService.RepairFuture>(ranges.size());
+ List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size());
for (Range<Token> range : ranges)
{
- ActiveRepairService.RepairFuture future;
+ RepairFuture future;
try
{
future = forceTableRepair(range, keyspace, isSequential, isLocal, columnFamilies);
@@ -2377,23 +2375,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
}
}
- for (ActiveRepairService.RepairFuture future : futures)
+ for (RepairFuture future : futures)
{
try
{
future.get();
- message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString());
+ message = String.format("Repair session %s for range %s finished", future.session.getId(), future.session.getRange().toString());
+ logger.info(message);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal()});
}
catch (ExecutionException e)
{
- message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
+ message = String.format("Repair session %s for range %s failed with error %s", future.session.getId(), future.session.getRange().toString(), e.getCause().getMessage());
logger.error(message, e);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
}
catch (Exception e)
{
- message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
+ message = String.format("Repair session %s for range %s failed with error %s", future.session.getId(), future.session.getRange().toString(), e.getMessage());
logger.error(message, e);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
}
@@ -2403,7 +2402,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}, null);
}
- public ActiveRepairService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ public RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
{
ArrayList<String> names = new ArrayList<String>();
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, tableName, columnFamilies))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
deleted file mode 100644
index 3730b0e..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-/**
- * Task that make two nodes exchange (stream) some ranges (for a given table/cf).
- * This handle the case where the local node is neither of the two nodes that
- * must stream their range, and allow to register a callback to be called on
- * completion.
- */
-public class StreamingRepairTask implements Runnable
-{
- private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);
-
- // maps of tasks created on this node
- private static final ConcurrentMap<UUID, StreamingRepairTask> tasks = new ConcurrentHashMap<UUID, StreamingRepairTask>();
- public static final StreamingRepairTaskSerializer serializer = new StreamingRepairTaskSerializer();
-
- public final UUID id;
- private final InetAddress owner; // the node where the task is created; can be == src but don't need to
- public final InetAddress src;
- public final InetAddress dst;
-
- private final String tableName;
- private final String cfName;
- private final Collection<Range<Token>> ranges;
- private final StreamEventHandler callback;
-
- private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, StreamEventHandler callback)
- {
- this.id = id;
- this.owner = owner;
- this.src = src;
- this.dst = dst;
- this.tableName = tableName;
- this.cfName = cfName;
- this.ranges = ranges;
- this.callback = callback;
- }
-
- public static StreamingRepairTask create(InetAddress ep1, InetAddress ep2, String tableName, String cfName, Collection<Range<Token>> ranges, Runnable callback)
- {
- InetAddress local = FBUtilities.getBroadcastAddress();
- UUID id = UUIDGen.getTimeUUID();
- // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
- InetAddress src = ep2.equals(local) ? ep2 : ep1;
- InetAddress dst = ep2.equals(local) ? ep1 : ep2;
- StreamingRepairTask task = new StreamingRepairTask(id, local, src, dst, tableName, cfName, ranges, wrapCallback(callback, id, local.equals(src)));
- tasks.put(id, task);
- return task;
- }
-
- /**
- * Returns true if the task if the task can be executed locally, false if
- * it has to be forwarded.
- */
- public boolean isLocalTask()
- {
- return owner.equals(src);
- }
-
- public void run()
- {
- if (src.equals(FBUtilities.getBroadcastAddress()))
- {
- initiateStreaming();
- }
- else
- {
- forwardToSource();
- }
- }
-
- private void initiateStreaming()
- {
- logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
- StreamResultFuture op = new StreamPlan("Repair")
- .flushBeforeTransfer(true)
- // request ranges from the remote node
- .requestRanges(dst, tableName, ranges, cfName)
- // send ranges to the remote node
- .transferRanges(dst, tableName, ranges, cfName)
- .execute();
- op.addEventListener(callback);
- }
-
- private void forwardToSource()
- {
- logger.info(String.format("[streaming task #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", id, ranges.size(), src, dst));
- MessageOut<StreamingRepairTask> msg = new MessageOut<StreamingRepairTask>(MessagingService.Verb.STREAMING_REPAIR_REQUEST,
- this,
- StreamingRepairTask.serializer);
- MessagingService.instance().sendOneWay(msg, src);
- }
-
- private static StreamEventHandler makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
- {
- return new StreamEventHandler()
- {
- public void onSuccess(StreamState finalState)
- {
- StreamingRepairResponse.reply(taskOwner, taskId);
- }
-
- public void onFailure(Throwable t) {}
- public void handleStreamEvent(StreamEvent event) {}
- };
- }
-
- // wrap a given callback so as to unregister the streaming repair task on completion
- private static StreamEventHandler wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
- {
- return new StreamEventHandler()
- {
- public void onSuccess(StreamState finalState)
- {
- tasks.remove(taskid);
- if (callback != null)
- callback.run();
- }
-
- public void onFailure(Throwable t) {}
- public void handleStreamEvent(StreamEvent event) {}
- };
- }
-
- public static class StreamingRepairRequest implements IVerbHandler<StreamingRepairTask>
- {
- public void doVerb(MessageIn<StreamingRepairTask> message, int id)
- {
- StreamingRepairTask task = message.payload;
- assert task.src.equals(FBUtilities.getBroadcastAddress());
- assert task.owner.equals(message.from);
-
- logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.from, task.ranges.size(), task.dst));
-
- task.run();
- }
-
- }
-
- public static class StreamingRepairResponse implements IVerbHandler<UUID>
- {
- public void doVerb(MessageIn<UUID> message, int id)
- {
- UUID taskid = message.payload;
- StreamingRepairTask task = tasks.get(taskid);
- if (task == null)
- {
- logger.error(String.format("Received a stream repair response from %s for unknow taks %s (have this node been restarted recently?)", message.from, taskid));
- return;
- }
-
- assert task.owner.equals(FBUtilities.getBroadcastAddress());
-
- logger.info(String.format("[streaming task #%s] task succeeded", task.id));
- if (task.callback != null)
- {
- // TODO null
- task.callback.onSuccess(null);
- }
- }
-
- private static void reply(InetAddress remote, UUID taskid)
- {
- logger.info(String.format("[streaming task #%s] task suceed, forwarding response to %s", taskid, remote));
- MessageOut<UUID> message = new MessageOut<UUID>(MessagingService.Verb.STREAMING_REPAIR_RESPONSE, taskid, UUIDSerializer.serializer);
- MessagingService.instance().sendOneWay(message, remote);
- }
- }
-
- private static class StreamingRepairTaskSerializer implements IVersionedSerializer<StreamingRepairTask>
- {
- public void serialize(StreamingRepairTask task, DataOutput out, int version) throws IOException
- {
- UUIDSerializer.serializer.serialize(task.id, out, version);
- CompactEndpointSerializationHelper.serialize(task.owner, out);
- CompactEndpointSerializationHelper.serialize(task.src, out);
- CompactEndpointSerializationHelper.serialize(task.dst, out);
- out.writeUTF(task.tableName);
- out.writeUTF(task.cfName);
- out.writeInt(task.ranges.size());
- for (Range<Token> range : task.ranges)
- AbstractBounds.serializer.serialize(range, out, version);
- // We don't serialize the callback on purpose
- }
-
- public StreamingRepairTask deserialize(DataInput in, int version) throws IOException
- {
- UUID id = UUIDSerializer.serializer.deserialize(in, version);
- InetAddress owner = CompactEndpointSerializationHelper.deserialize(in);
- InetAddress src = CompactEndpointSerializationHelper.deserialize(in);
- InetAddress dst = CompactEndpointSerializationHelper.deserialize(in);
- String tableName = in.readUTF();
- String cfName = in.readUTF();
- int rangesCount = in.readInt();
- List<Range<Token>> ranges = new ArrayList<Range<Token>>(rangesCount);
- for (int i = 0; i < rangesCount; ++i)
- ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, version).toTokenBounds());
- return new StreamingRepairTask(id, owner, src, dst, tableName, cfName, ranges, makeReplyingCallback(owner, id));
- }
-
- public long serializedSize(StreamingRepairTask task, int version)
- {
- long size = UUIDSerializer.serializer.serializedSize(task.id, version);
- size += 3 * CompactEndpointSerializationHelper.serializedSize(task.owner);
- size += TypeSizes.NATIVE.sizeof(task.tableName);
- size += TypeSizes.NATIVE.sizeof(task.cfName);
- size += TypeSizes.NATIVE.sizeof(task.ranges.size());
- for (Range<Token> range : task.ranges)
- size += AbstractBounds.serializer.serializedSize(range, version);
- return size;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 8a5572e..9288759 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -213,6 +213,11 @@ public class FBUtilities
return FastByteComparisons.compareTo(bytes1, offset1, len1, bytes2, offset2, len2);
}
+ public static int compareUnsigned(byte[] bytes1, byte[] bytes2)
+ {
+ return compareUnsigned(bytes1, bytes2, 0, 0, bytes1.length, bytes2.length);
+ }
+
/**
* @return The bitwise XOR of the inputs. The output will be the same length as the
* longer input, but if either input is null, the output will be null.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 5284a57..a16c2b9 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.PeekingIterator;
@@ -30,6 +31,7 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.IVersionedSerializer;
/**
@@ -65,18 +67,9 @@ public class MerkleTree implements Serializable
public final byte hashdepth;
- /**
- * The top level range that this MerkleTree covers.
- * In a perfect world, this should be final and *not* transient. However
- * this would break serialization with version >e; 0.7 because it uses
- * java serialization. We are moreover always shipping the fullRange will
- * the request so we can add it back post-deserialization (as for the
- * partitioner).
- */
- public transient Range<Token> fullRange;
-
- // TODO This is broken; Token serialization assumes system partitioner, so if this doesn't match all hell breaks loose
- private transient IPartitioner partitioner;
+ /** The top level range that this MerkleTree covers. */
+ public final Range<Token> fullRange;
+ private final IPartitioner partitioner;
private long maxsize;
private long size;
@@ -89,6 +82,10 @@ public class MerkleTree implements Serializable
out.writeByte(mt.hashdepth);
out.writeLong(mt.maxsize);
out.writeLong(mt.size);
+ out.writeUTF(mt.partitioner.getClass().getCanonicalName());
+ // full range
+ Token.serializer.serialize(mt.fullRange.left, out);
+ Token.serializer.serialize(mt.fullRange.right, out);
Hashable.serializer.serialize(mt.root, out, version);
}
@@ -97,7 +94,22 @@ public class MerkleTree implements Serializable
byte hashdepth = in.readByte();
long maxsize = in.readLong();
long size = in.readLong();
- MerkleTree mt = new MerkleTree(null, null, hashdepth, maxsize);
+ IPartitioner partitioner;
+ try
+ {
+ partitioner = FBUtilities.newPartitioner(in.readUTF());
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+
+ // full range
+ Token left = Token.serializer.deserialize(in);
+ Token right = Token.serializer.deserialize(in);
+ Range<Token> fullRange = new Range<>(left, right, partitioner);
+
+ MerkleTree mt = new MerkleTree(partitioner, fullRange, hashdepth, maxsize);
mt.size = size;
mt.root = Hashable.serializer.deserialize(in, version);
return mt;
@@ -105,10 +117,17 @@ public class MerkleTree implements Serializable
public long serializedSize(MerkleTree mt, int version)
{
- return 1 // mt.hashdepth
+ long size = 1 // mt.hashdepth
+ TypeSizes.NATIVE.sizeof(mt.maxsize)
+ TypeSizes.NATIVE.sizeof(mt.size)
- + Hashable.serializer.serializedSize(mt.root, version);
+ + TypeSizes.NATIVE.sizeof(mt.partitioner.getClass().getCanonicalName());
+
+ // full range
+ size += Token.serializer.serializedSize(mt.fullRange.left, TypeSizes.NATIVE);
+ size += Token.serializer.serializedSize(mt.fullRange.right, TypeSizes.NATIVE);
+
+ size += Hashable.serializer.serializedSize(mt.root, version);
+ return size;
}
}
@@ -122,8 +141,8 @@ public class MerkleTree implements Serializable
public MerkleTree(IPartitioner partitioner, Range<Token> range, byte hashdepth, long maxsize)
{
assert hashdepth < Byte.MAX_VALUE;
- this.fullRange = range;
- this.partitioner = partitioner;
+ this.fullRange = Preconditions.checkNotNull(range);
+ this.partitioner = Preconditions.checkNotNull(partitioner);
this.hashdepth = hashdepth;
this.maxsize = maxsize;
@@ -199,14 +218,6 @@ public class MerkleTree implements Serializable
}
/**
- * TODO: Find another way to use the local partitioner after serialization.
- */
- public void partitioner(IPartitioner partitioner)
- {
- this.partitioner = partitioner;
- }
-
- /**
* @param ltree First tree.
* @param rtree Second tree.
* @return A list of the largest contiguous ranges where the given trees disagree.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.SyncComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.SyncComplete.bin b/test/data/serialization/2.0/service.SyncComplete.bin
new file mode 100644
index 0000000..66c72e1
Binary files /dev/null and b/test/data/serialization/2.0/service.SyncComplete.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.SyncRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.SyncRequest.bin b/test/data/serialization/2.0/service.SyncRequest.bin
new file mode 100644
index 0000000..8918405
Binary files /dev/null and b/test/data/serialization/2.0/service.SyncRequest.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.TreeRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.TreeRequest.bin b/test/data/serialization/2.0/service.TreeRequest.bin
deleted file mode 100644
index b336e50..0000000
Binary files a/test/data/serialization/2.0/service.TreeRequest.bin and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.TreeResponse.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.TreeResponse.bin b/test/data/serialization/2.0/service.TreeResponse.bin
deleted file mode 100644
index b63d8a2..0000000
Binary files a/test/data/serialization/2.0/service.TreeResponse.bin and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.ValidationComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.ValidationComplete.bin b/test/data/serialization/2.0/service.ValidationComplete.bin
new file mode 100644
index 0000000..bc633bc
Binary files /dev/null and b/test/data/serialization/2.0/service.ValidationComplete.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/data/serialization/2.0/service.ValidationRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/service.ValidationRequest.bin b/test/data/serialization/2.0/service.ValidationRequest.bin
new file mode 100644
index 0000000..4ec4c47
Binary files /dev/null and b/test/data/serialization/2.0/service.ValidationRequest.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 3fb828b..9218dc9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.UUID;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -40,12 +41,12 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(OrderedJUnit4ClassRunner.class)
@@ -87,11 +88,10 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
assert strategy.getLevelSize(1) > 0;
assert strategy.getLevelSize(2) > 0;
- ActiveRepairService.CFPair p = new ActiveRepairService.CFPair(ksname, cfname);
Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
int gcBefore = table.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
- ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, gcBefore, p);
- ActiveRepairService.Validator validator = new ActiveRepairService.Validator(req);
+ RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), ksname, cfname, range);
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
CompactionManager.instance.submitValidation(cfs, validator).get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/unit/org/apache/cassandra/repair/DifferencerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
new file mode 100644
index 0000000..2502620
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.sink.IMessageSink;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.utils.MerkleTree;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DifferencerTest extends SchemaLoader
+{
+ private static final IPartitioner partirioner = new Murmur3Partitioner();
+
+ @After
+ public void tearDown()
+ {
+ SinkManager.clear();
+ }
+
+ /**
+ * When there is no difference between two, Differencer should respond SYNC_COMPLETE
+ */
+ @Test
+ public void testNoDifference() throws Throwable
+ {
+ final InetAddress ep1 = InetAddress.getByName("127.0.0.1");
+ final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
+
+ SinkManager.add(new IMessageSink()
+ {
+ @SuppressWarnings("unchecked")
+ public MessageOut handleMessage(MessageOut message, int id, InetAddress to)
+ {
+ if (message.verb == MessagingService.Verb.REPAIR_MESSAGE)
+ {
+ RepairMessage m = (RepairMessage) message.payload;
+ assertEquals(RepairMessage.Type.SYNC_COMPLETE, m.messageType);
+ // we should see SYNC_COMPLETE
+ assertEquals(new NodePair(ep1, ep2), ((SyncComplete)m).nodes);
+ }
+ return null;
+ }
+
+ public MessageIn handleMessage(MessageIn message, int id, InetAddress to)
+ {
+ return null;
+ }
+ });
+ Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken());
+ RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), "Keyspace1", "Standard1", range);
+
+ MerkleTree tree1 = createInitialTree(desc);
+ MerkleTree tree2 = createInitialTree(desc);
+
+ // difference the trees
+ // note: we reuse the same endpoint which is bogus in theory but fine here
+ TreeResponse r1 = new TreeResponse(ep1, tree1);
+ TreeResponse r2 = new TreeResponse(ep2, tree2);
+ Differencer diff = new Differencer(desc, r1, r2);
+ diff.run();
+
+ assertTrue(diff.differences.isEmpty());
+ }
+
+ @Test
+ public void testDifference() throws Throwable
+ {
+ Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken());
+ RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), "Keyspace1", "Standard1", range);
+
+ MerkleTree tree1 = createInitialTree(desc);
+ MerkleTree tree2 = createInitialTree(desc);
+
+ // change a range in one of the trees
+ Token token = partirioner.midpoint(range.left, range.right);
+ tree1.invalidate(token);
+ MerkleTree.TreeRange changed = tree1.get(token);
+ changed.hash("non-empty hash!".getBytes());
+
+ Set<Range<Token>> interesting = new HashSet<>();
+ interesting.add(changed);
+
+ // difference the trees
+ // note: we reuse the same endpoint which is bogus in theory but fine here
+ TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
+ TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
+ Differencer diff = new Differencer(desc, r1, r2);
+ diff.run();
+
+ // ensure that the changed range was recorded
+ assertEquals("Wrong differing ranges", interesting, new HashSet<>(diff.differences));
+ }
+
+ private MerkleTree createInitialTree(RepairJobDesc desc)
+ {
+ MerkleTree tree = new MerkleTree(partirioner, desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15));
+ tree.init();
+ for (MerkleTree.TreeRange r : tree.invalids())
+ {
+ r.addHash(Validator.EMPTY_ROW);
+ }
+ return tree;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
new file mode 100644
index 0000000..12abd24
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.TreeMapBackedSortedColumns;
+import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.sink.IMessageSink;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.SimpleCondition;
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class ValidatorTest extends SchemaLoader
+{
+    /** Upper bound on how long a test waits for the VALIDATION_COMPLETE message before failing. */
+    private static final long MESSAGE_TIMEOUT_SECONDS = 60;
+
+    private final String keyspace = "Keyspace1";
+    private final String columnFamily = "Standard1";
+    private final IPartitioner partitioner = StorageService.getPartitioner();
+
+    @After
+    public void tearDown()
+    {
+        SinkManager.clear();
+    }
+
+    /**
+     * A successful validation should split the tree, hash the added row, and answer with a
+     * successful VALIDATION_COMPLETE that carries a tree.
+     */
+    @Test
+    public void testValidatorComplete() throws Throwable
+    {
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), keyspace, columnFamily, range);
+
+        ValidationCompleteSink sink = new ValidationCompleteSink(desc, true);
+        SinkManager.add(sink);
+
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+        ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+
+        Validator validator = new Validator(desc, remote, 0);
+        validator.prepare(cfs);
+
+        // and confirm that the tree was split
+        assertTrue(validator.tree.size() > 1);
+
+        // add a row
+        Token mid = partitioner.midpoint(range.left, range.right);
+        validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")),
+                                          TreeMapBackedSortedColumns.factory.create(cfs.metadata)));
+        validator.complete();
+
+        // confirm that the tree was validated
+        Token min = validator.tree.partitioner().getMinimumToken();
+        assertNotNull(validator.tree.hash(new Range<>(min, min)));
+
+        sink.awaitAndVerify();
+    }
+
+    /**
+     * A failed validation should answer with an unsuccessful VALIDATION_COMPLETE that carries no tree.
+     */
+    @Test
+    public void testValidatorFailed() throws Throwable
+    {
+        Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), keyspace, columnFamily, range);
+
+        ValidationCompleteSink sink = new ValidationCompleteSink(desc, false);
+        SinkManager.add(sink);
+
+        InetAddress remote = InetAddress.getByName("127.0.0.2");
+
+        Validator validator = new Validator(desc, remote, 0);
+        validator.fail();
+
+        sink.awaitAndVerify();
+    }
+
+    /**
+     * Outbound message sink that checks each REPAIR_MESSAGE is a VALIDATION_COMPLETE for the expected
+     * job with the expected outcome.
+     *
+     * The message may be sent from a thread other than the test thread. An assertion thrown inside
+     * handleMessage would then never reach JUnit. Failures are recorded instead and rethrown on the
+     * test thread by {@link #awaitAndVerify()}.
+     */
+    private static final class ValidationCompleteSink implements IMessageSink
+    {
+        private final RepairJobDesc expectedDesc;
+        private final boolean expectSuccess;
+        private final SimpleCondition received = new SimpleCondition();
+        // first assertion failure observed in handleMessage, if any
+        private volatile Throwable failure;
+
+        ValidationCompleteSink(RepairJobDesc expectedDesc, boolean expectSuccess)
+        {
+            this.expectedDesc = expectedDesc;
+            this.expectSuccess = expectSuccess;
+        }
+
+        @SuppressWarnings("unchecked")
+        public MessageOut handleMessage(MessageOut message, int id, InetAddress to)
+        {
+            // unrelated traffic is dropped and must not release the waiting test
+            if (message.verb != MessagingService.Verb.REPAIR_MESSAGE)
+                return null;
+
+            try
+            {
+                RepairMessage m = (RepairMessage) message.payload;
+                assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+                assertEquals(expectedDesc, m.desc);
+                ValidationComplete complete = (ValidationComplete) m;
+                if (expectSuccess)
+                {
+                    assertTrue(complete.success);
+                    assertNotNull(complete.tree);
+                }
+                else
+                {
+                    assertFalse(complete.success);
+                    assertNull(complete.tree);
+                }
+            }
+            catch (Throwable t)
+            {
+                if (failure == null)
+                    failure = t;
+            }
+            finally
+            {
+                received.signalAll();
+            }
+            return null;
+        }
+
+        public MessageIn handleMessage(MessageIn message, int id, InetAddress to)
+        {
+            return null;
+        }
+
+        /**
+         * Waits, bounded by MESSAGE_TIMEOUT_SECONDS, for a REPAIR_MESSAGE to reach the sink.
+         * Then rethrows any assertion failure that happened while checking it.
+         */
+        void awaitAndVerify() throws Throwable
+        {
+            assertTrue("no REPAIR_MESSAGE was sent within " + MESSAGE_TIMEOUT_SECONDS + "s",
+                       received.await(MESSAGE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+            Throwable t = failure;
+            if (t != null)
+                throw t;
+        }
+    }
+}