You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 18:59:25 UTC
[3/3] git commit: Redesign repair messages
Redesign repair messages
patch by yukim; reviewed by slebresne for CASSANDRA-5426
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eb4fa4a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eb4fa4a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eb4fa4a6
Branch: refs/heads/trunk
Commit: eb4fa4a621db43ad9d48b146ee16caf09db7a853
Parents: 7646203
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed May 29 15:46:50 2013 -0500
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 25 18:48:59 2013 +0200
----------------------------------------------------------------------
.../db/compaction/CompactionManager.java | 34 +-
.../cassandra/exceptions/RepairException.java | 46 +
.../apache/cassandra/net/MessagingService.java | 17 +-
.../apache/cassandra/repair/Differencer.java | 138 +++
.../org/apache/cassandra/repair/NodePair.java | 85 ++
.../apache/cassandra/repair/RepairFuture.java | 31 +
.../org/apache/cassandra/repair/RepairJob.java | 225 ++++
.../apache/cassandra/repair/RepairJobDesc.java | 121 +++
.../repair/RepairMessageVerbHandler.java | 63 ++
.../apache/cassandra/repair/RepairSession.java | 320 ++++++
.../cassandra/repair/RequestCoordinator.java | 128 +++
.../cassandra/repair/StreamingRepairTask.java | 117 ++
.../apache/cassandra/repair/TreeResponse.java | 37 +
.../org/apache/cassandra/repair/Validator.java | 215 ++++
.../repair/messages/RepairMessage.java | 103 ++
.../cassandra/repair/messages/SyncComplete.java | 80 ++
.../cassandra/repair/messages/SyncRequest.java | 97 ++
.../repair/messages/ValidationComplete.java | 90 ++
.../repair/messages/ValidationRequest.java | 82 ++
.../cassandra/service/ActiveRepairService.java | 1016 +-----------------
.../cassandra/service/StorageService.java | 25 +-
.../streaming/StreamingRepairTask.java | 254 -----
.../org/apache/cassandra/utils/FBUtilities.java | 5 +
.../org/apache/cassandra/utils/MerkleTree.java | 61 +-
.../serialization/2.0/service.SyncComplete.bin | Bin 0 -> 290 bytes
.../serialization/2.0/service.SyncRequest.bin | Bin 0 -> 189 bytes
.../serialization/2.0/service.TreeRequest.bin | Bin 129 -> 0 bytes
.../serialization/2.0/service.TreeResponse.bin | Bin 946 -> 0 bytes
.../2.0/service.ValidationComplete.bin | Bin 0 -> 1063 bytes
.../2.0/service.ValidationRequest.bin | Bin 0 -> 131 bytes
.../LeveledCompactionStrategyTest.java | 10 +-
.../cassandra/repair/DifferencerTest.java | 139 +++
.../apache/cassandra/repair/ValidatorTest.java | 169 +++
.../service/AntiEntropyServiceTestAbstract.java | 103 +-
.../cassandra/service/SerializationsTest.java | 198 +++-
.../apache/cassandra/utils/MerkleTreeTest.java | 136 ++-
36 files changed, 2647 insertions(+), 1498 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 6c9f50d..06dd95d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -47,8 +47,8 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CompactionMetrics;
-import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.Pair;
@@ -384,13 +384,22 @@ public class CompactionManager implements CompactionManagerMBean
/**
* Does not mutate data, so is not scheduled.
*/
- public Future<Object> submitValidation(final ColumnFamilyStore cfStore, final ActiveRepairService.Validator validator)
+ public Future<Object> submitValidation(final ColumnFamilyStore cfStore, final Validator validator)
{
Callable<Object> callable = new Callable<Object>()
{
public Object call() throws IOException
{
- doValidationCompaction(cfStore, validator);
+ try
+ {
+ doValidationCompaction(cfStore, validator);
+ }
+ catch (Exception e)
+ {
+ // we need to inform the remote end of our failure, otherwise it will hang on repair forever
+ validator.fail();
+ throw e;
+ }
return this;
}
};
@@ -607,7 +616,7 @@ public class CompactionManager implements CompactionManagerMBean
* Performs a readonly "compaction" of all sstables in order to validate complete rows,
* but without writing the merge result
*/
- private void doValidationCompaction(ColumnFamilyStore cfs, ActiveRepairService.Validator validator) throws IOException
+ private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator) throws IOException
{
// this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped
// mid-validation, or to attempt to validate a droped CFS. this is just a best effort to avoid useless work,
@@ -618,17 +627,18 @@ public class CompactionManager implements CompactionManagerMBean
return;
Collection<SSTableReader> sstables;
+ String snapshotName = validator.desc.sessionId.toString();
int gcBefore;
- if (cfs.snapshotExists(validator.request.sessionid))
+ if (cfs.snapshotExists(snapshotName))
{
// If there is a snapshot created for the session then read from there.
- sstables = cfs.getSnapshotSSTableReader(validator.request.sessionid);
+ sstables = cfs.getSnapshotSSTableReader(snapshotName);
// Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
// this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
// time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
// 'as good as in the non-snapshot' case)
- gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(validator.request.sessionid));
+ gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
}
else
{
@@ -638,13 +648,13 @@ public class CompactionManager implements CompactionManagerMBean
// we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
// instead so they won't be cleaned up if they do get compacted during the validation
sstables = cfs.markCurrentSSTablesReferenced();
- if (validator.request.gcBefore > 0)
- gcBefore = validator.request.gcBefore;
+ if (validator.gcBefore > 0)
+ gcBefore = validator.gcBefore;
else
gcBefore = getDefaultGcBefore(cfs);
}
- CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range, gcBefore);
+ CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
metrics.beginCompaction(ci);
try
@@ -664,8 +674,8 @@ public class CompactionManager implements CompactionManagerMBean
{
SSTableReader.releaseReferences(sstables);
iter.close();
- if (cfs.table.snapshotExists(validator.request.sessionid))
- cfs.table.clearSnapshot(validator.request.sessionid);
+ if (cfs.table.snapshotExists(snapshotName))
+ cfs.table.clearSnapshot(snapshotName);
metrics.finishCompaction(ci);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/exceptions/RepairException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/RepairException.java b/src/java/org/apache/cassandra/exceptions/RepairException.java
new file mode 100644
index 0000000..832a6d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/RepairException.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.repair.RepairJobDesc;
+
+/**
+ * Exception thrown during repair.
+ *
+ * Its message is prefixed with the description of the repair job that failed,
+ * when that description is available.
+ */
+public class RepairException extends Exception
+{
+    /** Description of the repair job that failed; may be null, see {@link #getMessage()}. */
+    public final RepairJobDesc desc;
+
+    /**
+     * @param desc description of the repair job that failed
+     * @param message detail message
+     */
+    public RepairException(RepairJobDesc desc, String message)
+    {
+        super(message);
+        this.desc = desc;
+    }
+
+    /**
+     * @param desc description of the repair job that failed
+     * @param message detail message
+     * @param cause underlying cause of the failure
+     */
+    public RepairException(RepairJobDesc desc, String message, Throwable cause)
+    {
+        super(message, cause);
+        this.desc = desc;
+    }
+
+    /**
+     * Returns the detail message prefixed with the job description, or the bare
+     * detail message when no description was given.
+     */
+    @Override
+    public String getMessage()
+    {
+        // getMessage() runs while the exception is logged or printed; throwing an NPE
+        // from here would hide the original failure
+        return desc == null ? super.getMessage() : desc + " " + super.getMessage();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4ac408e..faed07a 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.metrics.ConnectionMetrics;
import org.apache.cassandra.metrics.DroppedMessageMetrics;
import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
import org.apache.cassandra.service.paxos.Commit;
@@ -91,8 +92,8 @@ public final class MessagingService implements MessagingServiceMBean
@Deprecated STREAM_REQUEST,
RANGE_SLICE,
@Deprecated BOOTSTRAP_TOKEN,
- TREE_REQUEST,
- TREE_RESPONSE,
+ @Deprecated TREE_REQUEST,
+ @Deprecated TREE_RESPONSE,
@Deprecated JOIN,
GOSSIP_DIGEST_SYN,
GOSSIP_DIGEST_ACK,
@@ -105,13 +106,14 @@ public final class MessagingService implements MessagingServiceMBean
REPLICATION_FINISHED,
INTERNAL_RESPONSE, // responses to internal calls
COUNTER_MUTATION,
- STREAMING_REPAIR_REQUEST,
- STREAMING_REPAIR_RESPONSE,
+ @Deprecated STREAMING_REPAIR_REQUEST,
+ @Deprecated STREAMING_REPAIR_RESPONSE,
SNAPSHOT, // Similar to nt snapshot
MIGRATION_REQUEST,
GOSSIP_SHUTDOWN,
_TRACE, // dummy verb so we can use MS.droppedMessages
ECHO,
+ REPAIR_MESSAGE,
// use as padding for backwards compatability where a previous version needs to validate a verb from the future.
PAXOS_PREPARE,
PAXOS_PROPOSE,
@@ -151,7 +153,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY);
put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY);
-
+ put(Verb.REPAIR_MESSAGE, Stage.ANTI_ENTROPY);
put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
@@ -192,10 +194,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
put(Verb.PAGED_RANGE, RangeSliceCommand.serializer);
put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
- put(Verb.TREE_REQUEST, ActiveRepairService.TreeRequest.serializer);
- put(Verb.TREE_RESPONSE, ActiveRepairService.Validator.serializer);
- put(Verb.STREAMING_REPAIR_REQUEST, StreamingRepairTask.serializer);
- put(Verb.STREAMING_REPAIR_RESPONSE, UUIDSerializer.serializer);
+ put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
put(Verb.GOSSIP_DIGEST_ACK2, GossipDigestAck2.serializer);
put(Verb.GOSSIP_DIGEST_SYN, GossipDigestSyn.serializer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/Differencer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Differencer.java b/src/java/org/apache/cassandra/repair/Differencer.java
new file mode 100644
index 0000000..82b331b
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/Differencer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
+ */
+public class Differencer implements Runnable
+{
+ private static Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
+
+ private final RepairJobDesc desc;
+ public final TreeResponse r1;
+ public final TreeResponse r2;
+ public final List<Range<Token>> differences = new ArrayList<>();
+
+ public Differencer(RepairJobDesc desc, TreeResponse r1, TreeResponse r2)
+ {
+ this.desc = desc;
+ this.r1 = r1;
+ this.r2 = r2;
+ }
+
+ /**
+ * Compares our trees, and triggers repairs for any ranges that mismatch.
+ */
+ public void run()
+ {
+ // compare trees, and collect differences
+ differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
+
+ // choose a repair method based on the significance of the difference
+ String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
+ if (differences.isEmpty())
+ {
+ logger.info(String.format(format, "are consistent"));
+ // send back sync complete message
+ MessagingService.instance().sendOneWay(new SyncComplete(desc, r1.endpoint, r2.endpoint, true).createMessage(), FBUtilities.getLocalAddress());
+ return;
+ }
+
+ // non-0 difference: perform streaming repair
+ logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
+ performStreamingRepair();
+ }
+
+ /**
+ * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
+ * that will be called out of band once the streams complete.
+ */
+ void performStreamingRepair()
+ {
+ InetAddress local = FBUtilities.getBroadcastAddress();
+ // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
+ InetAddress src = r2.endpoint.equals(local) ? r2.endpoint : r1.endpoint;
+ InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
+
+ SyncRequest request = new SyncRequest(desc, local, src, dst, differences);
+ StreamingRepairTask task = new StreamingRepairTask(desc, request);
+ task.run();
+ }
+
+
+ /**
+ * In order to remove completed Differencer, equality is computed only from {@code desc} and
+ * endpoint part of two TreeResponses.
+ */
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Differencer that = (Differencer) o;
+ if (!desc.equals(that.desc)) return false;
+ return minEndpoint().equals(that.minEndpoint()) && maxEndpoint().equals(that.maxEndpoint());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(desc, minEndpoint(), maxEndpoint());
+ }
+
+ // For equals and hashcode, we don't want to take the endpoint order into account.
+ // So we just order endpoint deterministically to simplify this
+ private InetAddress minEndpoint()
+ {
+ return FBUtilities.compareUnsigned(r1.endpoint.getAddress(), r2.endpoint.getAddress()) < 0
+ ? r1.endpoint
+ : r2.endpoint;
+ }
+
+ private InetAddress maxEndpoint()
+ {
+ return FBUtilities.compareUnsigned(r1.endpoint.getAddress(), r2.endpoint.getAddress()) < 0
+ ? r2.endpoint
+ : r1.endpoint;
+ }
+
+ public String toString()
+ {
+ return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint + ":" + desc.columnFamily + "@" + desc.range + ">";
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/NodePair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java
new file mode 100644
index 0000000..f510dc4
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/NodePair.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+
+/**
+ * NodePair is used for repair message body to indicate the pair of nodes.
+ *
+ * Equality and hashing are order-sensitive: (a, b) and (b, a) are different pairs.
+ *
+ * @since 2.0
+ */
+public class NodePair
+{
+    public static final IVersionedSerializer<NodePair> serializer = new NodePairSerializer();
+
+    public final InetAddress endpoint1;
+    public final InetAddress endpoint2;
+
+    public NodePair(InetAddress endpoint1, InetAddress endpoint2)
+    {
+        this.endpoint1 = endpoint1;
+        this.endpoint2 = endpoint2;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        NodePair nodePair = (NodePair) o;
+        return endpoint1.equals(nodePair.endpoint1) && endpoint2.equals(nodePair.endpoint2);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(endpoint1, endpoint2);
+    }
+
+    /**
+     * Writes endpoint1 then endpoint2, each in CompactEndpointSerializationHelper format.
+     */
+    public static class NodePairSerializer implements IVersionedSerializer<NodePair>
+    {
+        public void serialize(NodePair nodePair, DataOutput out, int version) throws IOException
+        {
+            CompactEndpointSerializationHelper.serialize(nodePair.endpoint1, out);
+            CompactEndpointSerializationHelper.serialize(nodePair.endpoint2, out);
+        }
+
+        public NodePair deserialize(DataInput in, int version) throws IOException
+        {
+            InetAddress ep1 = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress ep2 = CompactEndpointSerializationHelper.deserialize(in);
+            return new NodePair(ep1, ep2);
+        }
+
+        public long serializedSize(NodePair nodePair, int version)
+        {
+            // the encoded size depends on the address (IPv4 and IPv6 addresses differ in length) and the
+            // two endpoints of a pair need not be of the same family, so size each one separately
+            return CompactEndpointSerializationHelper.serializedSize(nodePair.endpoint1)
+                   + CompactEndpointSerializationHelper.serializedSize(nodePair.endpoint2);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairFuture.java b/src/java/org/apache/cassandra/repair/RepairFuture.java
new file mode 100644
index 0000000..127d873
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairFuture.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.concurrent.FutureTask;
+
+/**
+ * Future that executes a {@link RepairSession} and keeps a reference to it.
+ * The session is run as a {@link Runnable}, so the future's result is always null.
+ */
+public class RepairFuture extends FutureTask<Void>
+{
+    /** The session this future runs. */
+    public final RepairSession session;
+
+    /**
+     * @param repairSession session to execute when this future is run
+     */
+    public RepairFuture(RepairSession repairSession)
+    {
+        super(repairSession, (Void) null);
+        session = repairSession;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
new file mode 100644
index 0000000..be3744d
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Condition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.SimpleCondition;
+
+/**
+ * RepairJob runs repair on given ColumnFamily.
+ */
+public class RepairJob
+{
+ private static Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
+
+ public final RepairJobDesc desc;
+ private final boolean isSequential;
+ // first we send tree requests. this tracks the endpoints remaining to hear from
+ private final RequestCoordinator<InetAddress> treeRequests;
+ // tree responses are then tracked here
+ private final List<TreeResponse> trees = new ArrayList<>();
+ // once all responses are received, each tree is compared with each other, and differencer tasks
+ // are submitted. the job is done when all differencers are complete.
+ private final RequestCoordinator<Differencer> differencers;
+ private final Condition requestsSent = new SimpleCondition();
+ private CountDownLatch snapshotLatch = null;
+ private int gcBefore = -1;
+
+ private volatile boolean failed = false;
+
+ /**
+ * Create repair job to run on specific columnfamily
+ */
+ public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
+ {
+ this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+ this.isSequential = isSequential;
+ this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
+ {
+ public void send(InetAddress endpoint)
+ {
+ ValidationRequest request = new ValidationRequest(desc, gcBefore);
+ MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
+ }
+ };
+ this.differencers = new RequestCoordinator<Differencer>(isSequential)
+ {
+ public void send(Differencer d)
+ {
+ StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
+ }
+ };
+ }
+
+ /**
+ * @return true if this job failed
+ */
+ public boolean isFailed()
+ {
+ return failed;
+ }
+
+ /**
+ * Send merkle tree request to every involved neighbor.
+ */
+ public void sendTreeRequests(Collection<InetAddress> endpoints)
+ {
+ // send requests to all nodes
+ List<InetAddress> allEndpoints = new ArrayList<>(endpoints);
+ allEndpoints.add(FBUtilities.getBroadcastAddress());
+
+ if (isSequential)
+ makeSnapshots(endpoints);
+
+ this.gcBefore = Table.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+
+ for (InetAddress endpoint : allEndpoints)
+ treeRequests.add(endpoint);
+
+ logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, allEndpoints));
+ treeRequests.start();
+ requestsSent.signalAll();
+ }
+
+ public void makeSnapshots(Collection<InetAddress> endpoints)
+ {
+ try
+ {
+ snapshotLatch = new CountDownLatch(endpoints.size());
+ IAsyncCallback callback = new IAsyncCallback()
+ {
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+
+ public void response(MessageIn msg)
+ {
+ RepairJob.this.snapshotLatch.countDown();
+ }
+ };
+ for (InetAddress endpoint : endpoints)
+ MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace, desc.columnFamily, desc.sessionId.toString(), false).createMessage(), endpoint, callback);
+ snapshotLatch.await();
+ snapshotLatch = null;
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Add a new received tree and return the number of remaining tree to
+ * be received for the job to be complete.
+ *
+ * Callers may assume exactly one addTree call will result in zero remaining endpoints.
+ *
+ * @param endpoint address of the endpoint that sent response
+ * @param tree sent Merkle tree or null if validation failed on endpoint
+ * @return the number of responses waiting to receive
+ */
+ public synchronized int addTree(InetAddress endpoint, MerkleTree tree)
+ {
+ // Wait for all request to have been performed (see #3400)
+ try
+ {
+ requestsSent.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError("Interrupted while waiting for requests to be sent");
+ }
+
+ if (tree == null)
+ failed = true;
+ else
+ trees.add(new TreeResponse(endpoint, tree));
+ return treeRequests.completed(endpoint);
+ }
+
+ /**
+ * Submit differencers for running.
+ * All tree *must* have been received before this is called.
+ */
+ public void submitDifferencers()
+ {
+ assert !failed;
+
+ // We need to difference all trees one against another
+ for (int i = 0; i < trees.size() - 1; ++i)
+ {
+ TreeResponse r1 = trees.get(i);
+ for (int j = i + 1; j < trees.size(); ++j)
+ {
+ TreeResponse r2 = trees.get(j);
+ Differencer differencer = new Differencer(desc, r1, r2);
+ logger.debug("Queueing comparison {}", differencer);
+ differencers.add(differencer);
+ }
+ }
+ differencers.start();
+ trees.clear(); // allows gc to do its thing
+ }
+
+ /**
+ * @return true if the given node pair was the last remaining
+ */
+ synchronized boolean completedSynchronization(NodePair nodes, boolean success)
+ {
+ if (!success)
+ failed = true;
+ Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
+ return differencers.completed(completed) == 0;
+ }
+
+ /**
+ * terminate this job.
+ */
+ public void terminate()
+ {
+ if (snapshotLatch != null)
+ {
+ while (snapshotLatch.getCount() > 0)
+ snapshotLatch.countDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
new file mode 100644
index 0000000..596540f
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * RepairJobDesc is used by the various repair processes to distinguish one RepairJob from another.
+ * A job is identified by its session id, keyspace, column family and the range being repaired.
+ *
+ * @since 2.0
+ */
+public class RepairJobDesc
+{
+    public static final IVersionedSerializer<RepairJobDesc> serializer = new RepairJobDescSerializer();
+
+    /** RepairSession id */
+    public final UUID sessionId;
+    public final String keyspace;
+    public final String columnFamily;
+    /** repairing range */
+    public final Range<Token> range;
+
+    public RepairJobDesc(UUID sessionId, String keyspace, String columnFamily, Range<Token> range)
+    {
+        this.sessionId = sessionId;
+        this.keyspace = keyspace;
+        this.columnFamily = columnFamily;
+        this.range = range;
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("[repair #");
+        sb.append(sessionId);
+        sb.append(" on ");
+        sb.append(keyspace).append("/").append(columnFamily);
+        sb.append(", ").append(range);
+        sb.append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        RepairJobDesc that = (RepairJobDesc) o;
+        // Null-safe on every field, consistent with hashCode() which also tolerates nulls.
+        return Objects.equal(sessionId, that.sessionId)
+            && Objects.equal(keyspace, that.keyspace)
+            && Objects.equal(columnFamily, that.columnFamily)
+            && Objects.equal(range, that.range);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(sessionId, keyspace, columnFamily, range);
+    }
+
+    /**
+     * Wire format: session id, keyspace (UTF), column family (UTF), then the range.
+     */
+    private static class RepairJobDescSerializer implements IVersionedSerializer<RepairJobDesc>
+    {
+        public void serialize(RepairJobDesc desc, DataOutput out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(desc.sessionId, out, version);
+            out.writeUTF(desc.keyspace);
+            out.writeUTF(desc.columnFamily);
+            AbstractBounds.serializer.serialize(desc.range, out, version);
+        }
+
+        public RepairJobDesc deserialize(DataInput in, int version) throws IOException
+        {
+            UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
+            String keyspace = in.readUTF();
+            String columnFamily = in.readUTF();
+            // serialize() always writes desc.range, which is a Range<Token>
+            @SuppressWarnings("unchecked")
+            Range<Token> range = (Range<Token>) AbstractBounds.serializer.deserialize(in, version);
+            return new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+        }
+
+        public long serializedSize(RepairJobDesc desc, int version)
+        {
+            // accumulate in a long: "int += long" would silently narrow the component sizes
+            long size = UUIDSerializer.serializer.serializedSize(desc.sessionId, version);
+            size += TypeSizes.NATIVE.sizeof(desc.keyspace);
+            size += TypeSizes.NATIVE.sizeof(desc.columnFamily);
+            size += AbstractBounds.serializer.serializedSize(desc.range, version);
+            return size;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
new file mode 100644
index 0000000..3057a41
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.service.ActiveRepairService;
+
+/**
+ * Entry point for every incoming repair message. Validation and sync requests are
+ * handled here directly; every other message type is handed to ActiveRepairService.
+ *
+ * @since 2.0
+ */
+public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
+{
+    public void doVerb(MessageIn<RepairMessage> message, int id)
+    {
+        // TODO add cancel/interrupt message
+        RepairMessage payload = message.payload;
+        RepairJobDesc desc = payload.desc;
+        switch (payload.messageType)
+        {
+            case VALIDATION_REQUEST:
+                validate(desc, message);
+                return;
+            case SYNC_REQUEST:
+                sync(desc, message);
+                return;
+            default:
+                ActiveRepairService.instance.handleMessage(message.from, payload);
+        }
+    }
+
+    /** Submits a read-only (validation) compaction for the requested column family. */
+    private static void validate(RepairJobDesc desc, MessageIn<RepairMessage> message)
+    {
+        ValidationRequest request = (ValidationRequest) message.payload;
+        ColumnFamilyStore cfs = Table.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+        CompactionManager.instance.submitValidation(cfs, new Validator(desc, message.from, request.gcBefore));
+    }
+
+    /** Runs a forwarded sync request on the current thread. */
+    private static void sync(RepairJobDesc desc, MessageIn<RepairMessage> message)
+    {
+        new StreamingRepairTask(desc, (SyncRequest) message.payload).run();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
new file mode 100644
index 0000000..7101b5a
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Triggers repairs with all neighbors for the given table, cfs and range.
+ */
+public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+{
+ private static Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
+
+ /** Repair session ID */
+ private final UUID id;
+ public final String keyspace;
+ private final String[] cfnames;
+ public final boolean isSequential;
+ /** Range to repair */
+ public final Range<Token> range;
+ public final Set<InetAddress> endpoints;
+
+ private volatile Exception exception;
+ private final AtomicBoolean isFailed = new AtomicBoolean(false);
+
+ // First, all RepairJobs are added to this queue,
+ final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<>();
+ // and after receiving all validation, the job is moved to
+ // this map, keyed by CF name.
+ final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
+
+ private final SimpleCondition completed = new SimpleCondition();
+ public final Condition differencingDone = new SimpleCondition();
+
+ private volatile boolean terminated = false;
+
+ /**
+ * Create new repair session.
+ *
+ * @param range range to repair
+ * @param keyspace name of keyspace
+ * @param isSequential true if performing repair on snapshots sequentially
+ * @param isLocal true if you want to perform repair only inside the data center
+ * @param cfnames names of columnfamilies
+ */
+ public RepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
+ {
+ this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, isLocal, cfnames);
+ }
+
+ public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String[] cfnames)
+ {
+ this.id = id;
+ this.isSequential = isSequential;
+ this.keyspace = keyspace;
+ this.cfnames = cfnames;
+ assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
+ this.range = range;
+ this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, isLocal);
+ }
+
+ public UUID getId()
+ {
+ return id;
+ }
+
+ public Range<Token> getRange()
+ {
+ return range;
+ }
+
+ /**
+ * Receive merkle tree response or failed response from {@code endpoint} for current repair job.
+ *
+ * @param desc repair job description
+ * @param endpoint endpoint that sent merkle tree
+ * @param tree calculated merkle tree, or null if validation failed
+ */
+ public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTree tree)
+ {
+ RepairJob job = jobs.peek();
+ if (job == null)
+ {
+ assert terminated;
+ return;
+ }
+
+ if (tree == null)
+ {
+ exception = new RepairException(desc, "Validation failed in " + endpoint);
+ forceShutdown();
+ return;
+ }
+
+ logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", getId(), desc.columnFamily, endpoint));
+
+ assert job.desc.equals(desc);
+ if (job.addTree(endpoint, tree) == 0)
+ {
+ logger.debug("All response received for " + getId() + "/" + desc.columnFamily);
+ if (!job.isFailed())
+ {
+ syncingJobs.put(job.desc.columnFamily, job);
+ job.submitDifferencers();
+ }
+
+ // This job is complete, switching to next in line (note that only
+ // one thread will can ever do this)
+ jobs.poll();
+ RepairJob nextJob = jobs.peek();
+ if (nextJob == null)
+ // We are done with this repair session as far as differencing
+ // is considered. Just inform the session
+ differencingDone.signalAll();
+ else
+ nextJob.sendTreeRequests(endpoints);
+ }
+ }
+
+ /**
+ * Notify this session that sync completed/failed with given {@code NodePair}.
+ *
+ * @param desc synced repair job
+ * @param nodes nodes that completed sync
+ * @param success true if sync succeeded
+ */
+ public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success)
+ {
+ RepairJob job = syncingJobs.get(desc.columnFamily);
+ if (job == null)
+ {
+ assert terminated;
+ return;
+ }
+
+ if (!success)
+ {
+ exception = new RepairException(desc, String.format("Sync failed between %s and %s", nodes.endpoint1, nodes.endpoint2));
+ forceShutdown();
+ return;
+ }
+
+ logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s", getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily));
+
+ if (job.completedSynchronization(nodes, success))
+ {
+ RepairJob completedJob = syncingJobs.remove(job.desc.columnFamily);
+ String remaining = syncingJobs.size() == 0 ? "" : String.format(" (%d remaining column family to sync for this session)", syncingJobs.size());
+ if (completedJob != null && completedJob.isFailed())
+ logger.warn(String.format("[repair #%s] %s sync failed%s", getId(), desc.columnFamily, remaining));
+ else
+ logger.info(String.format("[repair #%s] %s is fully synced%s", getId(), desc.columnFamily, remaining));
+
+ if (jobs.isEmpty() && syncingJobs.isEmpty())
+ {
+ // this repair session is completed
+ completed.signalAll();
+ }
+ }
+ }
+
+ private String repairedNodes()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(FBUtilities.getBroadcastAddress());
+ for (InetAddress ep : endpoints)
+ sb.append(", ").append(ep);
+ return sb.toString();
+ }
+
+ // we don't care about the return value but care about it throwing exception
+ public void runMayThrow() throws Exception
+ {
+ logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), range, keyspace, Arrays.toString(cfnames)));
+
+ if (endpoints.isEmpty())
+ {
+ differencingDone.signalAll();
+ logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getId(), range));
+ return;
+ }
+
+ // Checking all nodes are live
+ for (InetAddress endpoint : endpoints)
+ {
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
+ differencingDone.signalAll();
+ logger.error(String.format("[repair #%s] ", getId()) + message);
+ throw new IOException(message);
+ }
+ }
+ ActiveRepairService.instance.addToActiveSessions(this);
+ try
+ {
+ // Create and queue a RepairJob for each column family
+ for (String cfname : cfnames)
+ {
+ RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential);
+ jobs.offer(job);
+ }
+
+ jobs.peek().sendTreeRequests(endpoints);
+
+ // block whatever thread started this session until all requests have been returned:
+ // if this thread dies, the session will still complete in the background
+ completed.await();
+ if (exception == null)
+ {
+ logger.info(String.format("[repair #%s] session completed successfully", getId()));
+ }
+ else
+ {
+ logger.error(String.format("[repair #%s] session completed with the following error", getId()), exception);
+ throw exception;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException("Interrupted while waiting for repair.");
+ }
+ finally
+ {
+ // mark this session as terminated
+ terminate();
+ ActiveRepairService.instance.removeFromActiveSessions(this);
+ }
+ }
+
+ public void terminate()
+ {
+ terminated = true;
+ for (RepairJob job : jobs)
+ job.terminate();
+ jobs.clear();
+ syncingJobs.clear();
+ }
+
+ /**
+ * clear all RepairJobs and terminate this session.
+ */
+ public void forceShutdown()
+ {
+ differencingDone.signalAll();
+ completed.signalAll();
+ }
+
+ void failedNode(InetAddress remote)
+ {
+ String errorMsg = String.format("Endpoint %s died", remote);
+ exception = new IOException(errorMsg);
+ // If a node failed, we stop everything (though there could still be some activity in the background)
+ forceShutdown();
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!endpoints.contains(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ // Though unlikely, it is possible to arrive here multiple time and we
+ // want to avoid print an error message twice
+ if (!isFailed.compareAndSet(false, true))
+ return;
+
+ failedNode(endpoint);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/RequestCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RequestCoordinator.java b/src/java/org/apache/cassandra/repair/RequestCoordinator.java
new file mode 100644
index 0000000..ed089ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RequestCoordinator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * Sends a set of requests either one at a time (sequential) or all at once (parallel) and
+ * tracks how many of them are still outstanding.
+ *
+ * Register every request with {@link #add}, call {@link #start} to send the first request(s),
+ * then report each finished request through {@link #completed}. In sequential mode, completing
+ * a request sends the next one. Requests are matched by {@code equals} (and, in parallel mode,
+ * {@code hashCode}). This class performs no synchronization of its own.
+ *
+ * @param <R> the request type
+ */
+public abstract class RequestCoordinator<R>
+{
+    private final Order<R> orderer;
+
+    /**
+     * @param isSequential true to send requests one after another, false to send them all at once
+     */
+    public RequestCoordinator(boolean isSequential)
+    {
+        // explicit type arguments avoid the raw-type instantiation (and its unchecked warnings)
+        this.orderer = isSequential ? new SequentialOrder<R>(this) : new ParallelOrder<R>(this);
+    }
+
+    /** Actually sends {@code request}; invoked by this coordinator when the request is due. */
+    public abstract void send(R request);
+
+    /** Registers a request; this call itself sends nothing. */
+    public void add(R request)
+    {
+        orderer.add(request);
+    }
+
+    /** Sends the first registered request (sequential) or every registered request (parallel). */
+    public void start()
+    {
+        orderer.start();
+    }
+
+    /**
+     * Marks {@code request} as completed and, in sequential mode, sends the next one.
+     *
+     * @return how many requests remain
+     */
+    public int completed(R request)
+    {
+        return orderer.completed(request);
+    }
+
+    /** Strategy deciding when registered requests are sent. */
+    private static abstract class Order<R>
+    {
+        protected final RequestCoordinator<R> coordinator;
+
+        Order(RequestCoordinator<R> coordinator)
+        {
+            this.coordinator = coordinator;
+        }
+
+        public abstract void add(R request);
+        public abstract void start();
+        public abstract int completed(R request);
+    }
+
+    /** Sends requests one at a time, in registration order. */
+    private static class SequentialOrder<R> extends Order<R>
+    {
+        private final Queue<R> requests = new LinkedList<>();
+
+        SequentialOrder(RequestCoordinator<R> coordinator)
+        {
+            super(coordinator);
+        }
+
+        public void add(R request)
+        {
+            requests.add(request);
+        }
+
+        public void start()
+        {
+            if (requests.isEmpty())
+                return;
+
+            coordinator.send(requests.peek());
+        }
+
+        public int completed(R request)
+        {
+            // only the request currently in flight (the head of the queue) can complete
+            assert request.equals(requests.peek());
+            requests.poll();
+            int remaining = requests.size();
+            if (remaining != 0)
+                coordinator.send(requests.peek());
+            return remaining;
+        }
+    }
+
+    /** Sends every registered request at once; send order follows HashSet iteration order. */
+    private static class ParallelOrder<R> extends Order<R>
+    {
+        private final Set<R> requests = new HashSet<>();
+
+        ParallelOrder(RequestCoordinator<R> coordinator)
+        {
+            super(coordinator);
+        }
+
+        public void add(R request)
+        {
+            requests.add(request);
+        }
+
+        public void start()
+        {
+            for (R request : requests)
+                coordinator.send(request);
+        }
+
+        public int completed(R request)
+        {
+            requests.remove(request);
+            return requests.size();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
new file mode 100644
index 0000000..4670ce6
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.streaming.*;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Task that makes two nodes exchange (stream) some ranges (for a given table/cf).
+ * When the local node is not the source of the exchange, the request is forwarded
+ * to the source node instead. Once streaming finishes, a SyncComplete message
+ * carrying the outcome is sent back to the initiator.
+ */
+public class StreamingRepairTask implements Runnable, StreamEventHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);
+
+    /** Repair job that this streaming task belongs to */
+    public final RepairJobDesc desc;
+    public final SyncRequest request;
+
+    public StreamingRepairTask(RepairJobDesc desc, SyncRequest request)
+    {
+        this.desc = desc;
+        this.request = request;
+    }
+
+    /**
+     * Returns true if the task can be executed locally (the initiator is the streaming
+     * source), false if it has to be forwarded.
+     */
+    public boolean isLocalTask()
+    {
+        return request.initiator.equals(request.src);
+    }
+
+    /** Streams from here if this node is the source, otherwise forwards the request to the source. */
+    public void run()
+    {
+        if (request.src.equals(FBUtilities.getBroadcastAddress()))
+            initiateStreaming();
+        else
+            forwardToSource();
+    }
+
+    /** Exchanges the requested ranges with the destination in both directions. */
+    private void initiateStreaming()
+    {
+        logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
+        StreamResultFuture op = new StreamPlan("Repair")
+                                .flushBeforeTransfer(true)
+                                // request ranges from the remote node
+                                .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)
+                                // send ranges to the remote node
+                                .transferRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)
+                                .execute();
+        op.addEventListener(this);
+    }
+
+    private void forwardToSource()
+    {
+        logger.info(String.format("[repair #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", desc.sessionId, request.ranges.size(), request.src, request.dst));
+        MessagingService.instance().sendOneWay(request.createMessage(), request.src);
+    }
+
+    public void handleStreamEvent(StreamEvent event)
+    {
+        // Nothing to do here, all we care about is the final success or failure and that's handled by
+        // onSuccess and onFailure
+    }
+
+    /**
+     * If we succeeded on both stream in and out, reply back to the initiator.
+     */
+    public void onSuccess(StreamState state)
+    {
+        logger.info(String.format("[repair #%s] streaming task succeed, returning response to %s", desc.sessionId, request.initiator));
+        MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, true).createMessage(), request.initiator);
+    }
+
+    /**
+     * If we failed on either stream in or out, reply fail to the initiator.
+     */
+    public void onFailure(Throwable t)
+    {
+        // Log the cause here: the initiator only receives a success/failure flag.
+        logger.error(String.format("[repair #%s] streaming task failed, returning failure to %s", desc.sessionId, request.initiator), t);
+        MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, false).createMessage(), request.initiator);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/TreeResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/TreeResponse.java b/src/java/org/apache/cassandra/repair/TreeResponse.java
new file mode 100644
index 0000000..eede4ee
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/TreeResponse.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * Pairs a merkle tree with the endpoint that computed and sent it.
+ */
+public class TreeResponse
+{
+    /** Node that sent the tree */
+    public final InetAddress endpoint;
+    /** Tree received from {@code endpoint}; may be null when only used as a lookup key */
+    public final MerkleTree tree;
+
+    public TreeResponse(InetAddress endpoint, MerkleTree tree)
+    {
+        this.tree = tree;
+        this.endpoint = endpoint;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
new file mode 100644
index 0000000..97b4ca2
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * A Strategy to handle building and validating a merkle tree for a column family.
+ *
+ * Lifecycle:
+ * 1. prepare() - Initialize tree with samples.
+ * 2. add() - 0 or more times, to add hashes to the tree.
+ * 3. complete() - Fills the remaining ranges and schedules run() on Stage.ANTI_ENTROPY,
+ *    which sends the finished tree to the initiator.
+ */
+public class Validator implements Runnable
+{
+    // Note: logs under ActiveRepairService's logger category
+    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
+
+    public final RepairJobDesc desc;
+    public final InetAddress initiator;
+    public final MerkleTree tree;
+    public final int gcBefore;
+
+    // number of rows hashed so far
+    private transient long validated;
+    // tree range currently being filled; null until the first row is added
+    private transient MerkleTree.TreeRange range;
+    // iterator over the tree ranges still to be hashed; set by prepare()
+    private transient MerkleTree.TreeRangeIterator ranges;
+    // last row added, used to check that rows arrive in order
+    private transient DecoratedKey lastKey;
+
+    public static final MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
+
+    /**
+     * Create Validator with default size of initial Merkle Tree.
+     */
+    public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore)
+    {
+        this(desc,
+             initiator,
+             // TODO: memory usage (maxsize) should either be tunable per
+             // CF, globally, or as shared for all CFs in a cluster
+             new MerkleTree(DatabaseDescriptor.getPartitioner(), desc.range, MerkleTree.RECOMMENDED_DEPTH, 1 << 15),
+             gcBefore);
+    }
+
+    public Validator(RepairJobDesc desc, InetAddress initiator, MerkleTree tree, int gcBefore)
+    {
+        this.desc = desc;
+        this.initiator = initiator;
+        this.tree = tree;
+        this.gcBefore = gcBefore;
+        validated = 0;
+        range = null;
+        ranges = null;
+    }
+
+    /**
+     * Splits the tree before any row is added: evenly for non order-preserving partitioners
+     * (or when there is no key sample), otherwise at randomly chosen sampled keys until the
+     * tree refuses further splits.
+     */
+    public void prepare(ColumnFamilyStore cfs)
+    {
+        if (!tree.partitioner().preservesOrder())
+        {
+            // You can't beat an even tree distribution for md5
+            tree.init();
+        }
+        else
+        {
+            List<DecoratedKey> keys = new ArrayList<>();
+            for (DecoratedKey sample : cfs.keySamples(desc.range))
+            {
+                assert desc.range.contains(sample.token): "Token " + sample.token + " is not within range " + desc.range;
+                keys.add(sample);
+            }
+
+            if (keys.isEmpty())
+            {
+                // use an even tree distribution
+                tree.init();
+            }
+            else
+            {
+                int numkeys = keys.size();
+                Random random = new Random();
+                // sample the column family using random keys from the index
+                while (true)
+                {
+                    DecoratedKey dk = keys.get(random.nextInt(numkeys));
+                    if (!tree.split(dk.token))
+                        break;
+                }
+            }
+        }
+        logger.debug("Prepared AEService tree of size {} for {}", tree.size(), desc);
+        ranges = tree.invalids();
+    }
+
+    /**
+     * Called (in order) for every row present in the CF.
+     * Hashes the row, and adds it to the tree being built.
+     *
+     * @param row Row to add hash
+     */
+    public void add(AbstractCompactedRow row)
+    {
+        assert desc.range.contains(row.key.token) : row.key.token + " is not contained in " + desc.range;
+        assert lastKey == null || lastKey.compareTo(row.key) < 0
+               : "row " + row.key + " received out of order wrt " + lastKey;
+        lastKey = row.key;
+
+        if (range == null)
+            range = ranges.next();
+
+        // advance past the tree ranges that do not contain this row, giving each an empty hash
+        while (!range.contains(row.key.token))
+        {
+            range.addHash(EMPTY_ROW);
+            range = ranges.next();
+        }
+
+        // the current range contains the row: mix in the hashed row
+        range.addHash(rowHash(row));
+    }
+
+    private MerkleTree.RowHash rowHash(AbstractCompactedRow row)
+    {
+        validated++;
+        // MerkleTree uses XOR internally, so we want lots of output bits here
+        MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");
+        row.update(digest);
+        return new MerkleTree.RowHash(row.key.token, digest.digest());
+    }
+
+    /**
+     * Completes the tree and schedules run() on Stage.ANTI_ENTROPY to send it to the initiator.
+     */
+    public void complete()
+    {
+        completeTree();
+
+        StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
+        logger.debug("Validated {} rows into AEService tree for {}", validated, desc);
+    }
+
+    /** Gives an empty hash to the current range and to every range not reached by add(). */
+    @VisibleForTesting
+    public void completeTree()
+    {
+        assert ranges != null : "Validator was not prepared()";
+
+        if (range != null)
+            range.addHash(EMPTY_ROW);
+        while (ranges.hasNext())
+        {
+            range = ranges.next();
+            range.addHash(EMPTY_ROW);
+        }
+    }
+
+    /**
+     * Called when some error during the validation happened.
+     * Sends a ValidationComplete message without a tree to inform the initiator that the
+     * validation has failed (presumably carrying a null tree, which RepairSession treats as
+     * a failure — confirm in ValidationComplete).
+     * The actual reason for failure should be looked up in the log of the host calling this function.
+     */
+    public void fail()
+    {
+        logger.error("Failed creating a merkle tree for {}, {} (see log for details)", desc, initiator);
+        MessagingService.instance().sendOneWay(new ValidationComplete(desc).createMessage(), initiator);
+    }
+
+    /**
+     * Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTI_ENTROPY.
+     */
+    public void run()
+    {
+        // respond to the request that triggered this validation
+        if (!initiator.equals(FBUtilities.getBroadcastAddress()))
+            logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s/%s", desc.sessionId, initiator, desc.keyspace, desc.columnFamily));
+        MessagingService.instance().sendOneWay(new ValidationComplete(desc, tree).createMessage(), initiator);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
new file mode 100644
index 0000000..a0839f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.RepairJobDesc;
+
+/**
+ * Common base for every repair related request/response message.
+ * On the wire a message is framed as a single type byte followed by the
+ * body written by the serializer registered for that type.
+ *
+ * @since 2.0
+ */
+public abstract class RepairMessage
+{
+    public static final IVersionedSerializer<RepairMessage> serializer = new RepairMessageSerializer();
+
+    /** Serializer for the body of one concrete RepairMessage subtype. */
+    public static interface MessageSerializer<T extends RepairMessage> extends IVersionedSerializer<T> {}
+
+    /** Kinds of repair message, each bound to its wire id and body serializer. */
+    public static enum Type
+    {
+        VALIDATION_REQUEST(0, ValidationRequest.serializer),
+        VALIDATION_COMPLETE(1, ValidationComplete.serializer),
+        SYNC_REQUEST(2, SyncRequest.serializer),
+        SYNC_COMPLETE(3, SyncComplete.serializer);
+
+        private final byte type;
+        private final MessageSerializer<RepairMessage> serializer;
+
+        private Type(int type, MessageSerializer<RepairMessage> serializer)
+        {
+            this.type = (byte) type;
+            this.serializer = serializer;
+        }
+
+        /**
+         * Maps a wire id back to its Type.
+         *
+         * @throws IllegalArgumentException if no Type is registered under {@code b}
+         */
+        public static Type fromByte(byte b)
+        {
+            Type[] candidates = values();
+            for (int i = 0; i < candidates.length; i++)
+            {
+                if (candidates[i].type == b)
+                    return candidates[i];
+            }
+            throw new IllegalArgumentException("Unknown RepairMessage.Type: " + b);
+        }
+    }
+
+    public final Type messageType;
+    public final RepairJobDesc desc;
+
+    protected RepairMessage(Type messageType, RepairJobDesc desc)
+    {
+        this.messageType = messageType;
+        this.desc = desc;
+    }
+
+    /** Wraps this message into an outgoing REPAIR_MESSAGE verb. */
+    public MessageOut<RepairMessage> createMessage()
+    {
+        return new MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, this, RepairMessage.serializer);
+    }
+
+    /** Writes/reads the type byte, then delegates the body to the type's own serializer. */
+    public static class RepairMessageSerializer implements IVersionedSerializer<RepairMessage>
+    {
+        public void serialize(RepairMessage message, DataOutput out, int version) throws IOException
+        {
+            Type kind = message.messageType;
+            out.writeByte(kind.type);
+            kind.serializer.serialize(message, out, version);
+        }
+
+        public RepairMessage deserialize(DataInput in, int version) throws IOException
+        {
+            byte id = in.readByte();
+            return Type.fromByte(id).serializer.deserialize(in, version);
+        }
+
+        public long serializedSize(RepairMessage message, int version)
+        {
+            // one byte for the type id, followed by the body
+            return 1 + message.messageType.serializer.serializedSize(message, version);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
new file mode 100644
index 0000000..b54492e
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.repair.NodePair;
+import org.apache.cassandra.repair.RepairJobDesc;
+
+/**
+ * Reports whether the data sync between a pair of nodes for a repair job succeeded.
+ *
+ * @since 2.0
+ */
+public class SyncComplete extends RepairMessage
+{
+    public static final MessageSerializer serializer = new SyncCompleteSerializer();
+
+    /** The two nodes involved in this sync. */
+    public final NodePair nodes;
+    /** true if the sync succeeded, false otherwise. */
+    public final boolean success;
+
+    public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success)
+    {
+        super(Type.SYNC_COMPLETE, desc);
+        this.nodes = nodes;
+        this.success = success;
+    }
+
+    /** Convenience constructor that builds the NodePair from the two endpoints. */
+    public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success)
+    {
+        this(desc, new NodePair(endpoint1, endpoint2), success);
+    }
+
+    /** Body layout: job descriptor, node pair, success flag. */
+    private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete>
+    {
+        public void serialize(SyncComplete message, DataOutput out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+            NodePair.serializer.serialize(message.nodes, out, version);
+            out.writeBoolean(message.success);
+        }
+
+        public SyncComplete deserialize(DataInput in, int version) throws IOException
+        {
+            RepairJobDesc jobDesc = RepairJobDesc.serializer.deserialize(in, version);
+            NodePair pair = NodePair.serializer.deserialize(in, version);
+            boolean succeeded = in.readBoolean();
+            return new SyncComplete(jobDesc, pair, succeeded);
+        }
+
+        public long serializedSize(SyncComplete message, int version)
+        {
+            return RepairJobDesc.serializer.serializedSize(message.desc, version)
+                   + NodePair.serializer.serializedSize(message.nodes, version)
+                   + TypeSizes.NATIVE.sizeof(message.success);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
new file mode 100644
index 0000000..a06254b
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Body part of SYNC_REQUEST repair message.
+ * Requests node {@code src} to sync data with node {@code dst} for the ranges {@code ranges};
+ * {@code initiator} carries the address of the node that sent the request's repair job.
+ *
+ * @since 2.0
+ */
+public class SyncRequest extends RepairMessage
+{
+    // final: this shared instance is registered in RepairMessage.Type and must never be reassigned
+    public static final MessageSerializer serializer = new SyncRequestSerializer();
+
+    public final InetAddress initiator;
+    public final InetAddress src;
+    public final InetAddress dst;
+    public final Collection<Range<Token>> ranges;
+
+    public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges)
+    {
+        super(Type.SYNC_REQUEST, desc);
+        this.initiator = initiator;
+        this.src = src;
+        this.dst = dst;
+        this.ranges = ranges;
+    }
+
+    /** Body layout: job descriptor, initiator, src, dst, range count, ranges. */
+    public static class SyncRequestSerializer implements MessageSerializer<SyncRequest>
+    {
+        public void serialize(SyncRequest message, DataOutput out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+            CompactEndpointSerializationHelper.serialize(message.initiator, out);
+            CompactEndpointSerializationHelper.serialize(message.src, out);
+            CompactEndpointSerializationHelper.serialize(message.dst, out);
+            out.writeInt(message.ranges.size());
+            for (Range<Token> range : message.ranges)
+                AbstractBounds.serializer.serialize(range, out, version);
+        }
+
+        @SuppressWarnings("unchecked") // only Range<Token> instances are ever written by serialize()
+        public SyncRequest deserialize(DataInput in, int version) throws IOException
+        {
+            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
+            InetAddress initiator = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress src = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress dst = CompactEndpointSerializationHelper.deserialize(in);
+            int rangesCount = in.readInt();
+            List<Range<Token>> ranges = new ArrayList<>(rangesCount);
+            for (int i = 0; i < rangesCount; ++i)
+                ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, version).toTokenBounds());
+            return new SyncRequest(desc, initiator, src, dst, ranges);
+        }
+
+        public long serializedSize(SyncRequest message, int version)
+        {
+            long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
+            // Size each endpoint on its own: the encoded size depends on the address
+            // (e.g. IPv4 vs IPv6), so the three endpoints need not share one length.
+            size += CompactEndpointSerializationHelper.serializedSize(message.initiator);
+            size += CompactEndpointSerializationHelper.serializedSize(message.src);
+            size += CompactEndpointSerializationHelper.serializedSize(message.dst);
+            size += TypeSizes.NATIVE.sizeof(message.ranges.size());
+            for (Range<Token> range : message.ranges)
+                size += AbstractBounds.serializer.serializedSize(range, version);
+            return size;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb4fa4a6/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
new file mode 100644
index 0000000..4ddbc2e
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.utils.MerkleTree;
+
+/**
+ * ValidationComplete message is sent when a validation compaction finishes.
+ * On success it carries the resulting merkle tree; on failure {@code success}
+ * is false and no tree is sent.
+ *
+ * @since 2.0
+ */
+public class ValidationComplete extends RepairMessage
+{
+    // final: this shared instance is registered in RepairMessage.Type and must never be reassigned
+    public static final MessageSerializer serializer = new ValidationCompleteSerializer();
+
+    /** true if validation success, false otherwise */
+    public final boolean success;
+    /** Merkle hash tree response. Null if validation failed. */
+    public final MerkleTree tree;
+
+    /** Creates a failure response (no tree). */
+    public ValidationComplete(RepairJobDesc desc)
+    {
+        super(Type.VALIDATION_COMPLETE, desc);
+        this.success = false;
+        this.tree = null;
+    }
+
+    /** Creates a success response carrying {@code tree}, which must not be null. */
+    public ValidationComplete(RepairJobDesc desc, MerkleTree tree)
+    {
+        super(Type.VALIDATION_COMPLETE, desc);
+        assert tree != null;
+        this.success = true;
+        this.tree = tree;
+    }
+
+    /** Body layout: job descriptor, success flag, then the tree only when success is true. */
+    private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete>
+    {
+        public void serialize(ValidationComplete message, DataOutput out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+            out.writeBoolean(message.success);
+            if (message.success)
+                MerkleTree.serializer.serialize(message.tree, out, version);
+        }
+
+        public ValidationComplete deserialize(DataInput in, int version) throws IOException
+        {
+            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
+            if (in.readBoolean())
+            {
+                MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
+                return new ValidationComplete(desc, tree);
+            }
+            else
+            {
+                return new ValidationComplete(desc);
+            }
+        }
+
+        public long serializedSize(ValidationComplete message, int version)
+        {
+            long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
+            size += TypeSizes.NATIVE.sizeof(message.success);
+            if (message.success)
+                size += MerkleTree.serializer.serializedSize(message.tree, version);
+            return size;
+        }
+    }
+}