You are viewing a plain-text version of this content. The canonical link to it ("here" in the original page) was not preserved in this copy.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/09/11 20:58:51 UTC
[2/2] cassandra git commit: Allow transient node to serve as a repair coordinator
Allow transient node to serve as a repair coordinator
Patch by Alex Petrov and Blake Eggleston, reviewed by Ariel Weisberg, Blake Eggleston, Marcus Eriksson for CASSANDRA-14693
Co-authored-by: Blake Eggleston <bd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0841353e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0841353e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0841353e
Branch: refs/heads/trunk
Commit: 0841353e90f1cc94dc47b435af87e4d5876478ea
Parents: 2886cac
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Sep 4 19:38:27 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Tue Sep 11 22:58:01 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/DiskBoundaryManager.java | 21 +-
.../cassandra/repair/AbstractSyncTask.java | 31 ---
.../repair/AsymmetricLocalSyncTask.java | 105 --------
.../repair/AsymmetricRemoteSyncTask.java | 19 +-
.../cassandra/repair/AsymmetricSyncTask.java | 81 ------
.../apache/cassandra/repair/CommonRange.java | 8 +-
.../apache/cassandra/repair/LocalSyncTask.java | 158 ++++++++++++
.../org/apache/cassandra/repair/NodePair.java | 91 -------
.../org/apache/cassandra/repair/RepairJob.java | 174 +++++++------
.../apache/cassandra/repair/RepairRunnable.java | 27 +-
.../apache/cassandra/repair/RepairSession.java | 16 +-
.../repair/SymmetricLocalSyncTask.java | 142 -----------
.../repair/SymmetricRemoteSyncTask.java | 22 +-
.../cassandra/repair/SymmetricSyncTask.java | 94 -------
.../apache/cassandra/repair/SyncNodePair.java | 91 +++++++
.../org/apache/cassandra/repair/SyncStat.java | 6 +-
.../org/apache/cassandra/repair/SyncTask.java | 96 +++++++
.../cassandra/repair/messages/SyncComplete.java | 14 +-
.../cassandra/service/ActiveRepairService.java | 3 -
.../service/reads/AbstractReadExecutor.java | 2 +-
.../cassandra/repair/LocalSyncTaskTest.java | 249 +++++++++++++++++++
.../repair/SymmetricLocalSyncTaskTest.java | 232 -----------------
.../repair/SymmetricRemoteSyncTaskTest.java | 6 +-
.../RepairMessageSerializationsTest.java | 6 +-
.../cassandra/service/SerializationsTest.java | 4 +-
26 files changed, 766 insertions(+), 933 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b7bc775..ef285e0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
* DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696)
* AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700)
* Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index acfe71a..0961a42 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -109,17 +109,7 @@ public class DiskBoundaryManager
if (localRanges == null || localRanges.isEmpty())
return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);
- // note that Range.sort unwraps any wraparound ranges, so we need to sort them here
- List<Range<Token>> fullLocalRanges = Range.sort(localRanges.stream()
- .filter(Replica::isFull)
- .map(Replica::range)
- .collect(Collectors.toList()));
- List<Range<Token>> transientLocalRanges = Range.sort(localRanges.stream()
- .filter(Replica::isTransient)
- .map(Replica::range)
- .collect(Collectors.toList()));
-
- List<PartitionPosition> positions = getDiskBoundaries(fullLocalRanges, transientLocalRanges, cfs.getPartitioner(), dirs);
+ List<PartitionPosition> positions = getDiskBoundaries(localRanges, cfs.getPartitioner(), dirs);
return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion);
}
@@ -133,18 +123,19 @@ public class DiskBoundaryManager
*
* The final entry in the returned list will always be the partitioner maximum tokens upper key bound
*/
- private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> fullRanges, List<Range<Token>> transientRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+ private static List<PartitionPosition> getDiskBoundaries(RangesAtEndpoint ranges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
{
assert partitioner.splitter().isPresent();
Splitter splitter = partitioner.splitter().get();
boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
- List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(fullRanges.size() + transientRanges.size());
- for (Range<Token> r : fullRanges)
+ List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(ranges.size());
+ // note that Range.sort unwraps any wraparound ranges, so we need to sort them here
+ for (Range<Token> r : Range.sort(ranges.fullRanges()))
weightedRanges.add(new Splitter.WeightedRange(1.0, r));
- for (Range<Token> r : transientRanges)
+ for (Range<Token> r : Range.sort(ranges.transientRanges()))
weightedRanges.add(new Splitter.WeightedRange(0.1, r));
weightedRanges.sort(Comparator.comparing(Splitter.WeightedRange::left));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
deleted file mode 100644
index 124baa1..0000000
--- a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.List;
-
-import com.google.common.util.concurrent.AbstractFuture;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-
-public abstract class AbstractSyncTask extends AbstractFuture<SyncStat> implements Runnable
-{
- protected abstract void startSync(List<Range<Token>> rangesToStream);
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
deleted file mode 100644
index eaf890a..0000000
--- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamEvent;
-import org.apache.cassandra.streaming.StreamEventHandler;
-import org.apache.cassandra.streaming.StreamOperation;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements StreamEventHandler
-{
- private final UUID pendingRepair;
- private final TraceState state = Tracing.instance.get();
-
- public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind)
- {
- super(desc, FBUtilities.getBroadcastAddressAndPort(), fetchFrom, rangesToFetch, previewKind);
- this.pendingRepair = pendingRepair;
- }
-
- public void startSync(List<Range<Token>> rangesToFetch)
- {
- StreamPlan plan = new StreamPlan(StreamOperation.REPAIR,
- 1, false,
- pendingRepair,
- previewKind)
- .listeners(this)
- .flushBeforeTransfer(pendingRepair == null)
- // request ranges from the remote node, see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
- .requestRanges(fetchFrom, desc.keyspace, RangesAtEndpoint.toDummyList(rangesToFetch),
- RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
- plan.execute();
-
- }
-
- public void handleStreamEvent(StreamEvent event)
- {
- if (state == null)
- return;
- switch (event.eventType)
- {
- case STREAM_PREPARED:
- StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
- state.trace("Streaming session with {} prepared", spe.session.peer);
- break;
- case STREAM_COMPLETE:
- StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
- state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
- break;
- case FILE_PROGRESS:
- ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
- state.trace("{}/{} ({}%) {} idx:{}{}",
- new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
- FBUtilities.prettyPrintMemory(pi.totalBytes),
- pi.currentBytes * 100 / pi.totalBytes,
- pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
- pi.sessionIndex,
- pi.peer });
- }
- }
-
- public void onSuccess(StreamState result)
- {
- String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, fetchingNode, fetchFrom, desc.columnFamily);
- Tracing.traceRepair(message);
- set(stat);
- finished();
- }
-
- public void onFailure(Throwable t)
- {
- setException(t);
- finished();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index 2b171c9..9ba33dd 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -26,27 +26,36 @@ import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
+import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTrees;
-public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask
+/**
+ * AsymmetricRemoteSyncTask sends {@link AsymmetricSyncRequest} to target node to repair(stream)
+ * data with other target replica.
+ *
+ * When AsymmetricRemoteSyncTask receives SyncComplete from the target, task completes.
+ */
+public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask
{
- public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+ public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom,
+ List<Range<Token>> rangesToFetch, PreviewKind previewKind)
{
super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind);
}
+
public AsymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse to, TreeResponse from, PreviewKind previewKind)
{
this(desc, to.endpoint, from.endpoint, MerkleTrees.difference(to.trees, from.trees), previewKind);
}
- public void startSync(List<Range<Token>> rangesToFetch)
+ public void startSync()
{
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, fetchingNode, fetchFrom, rangesToFetch, previewKind);
+ AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
Tracing.traceRepair(message);
MessagingService.instance().sendOneWay(request.createMessage(), request.fetchingNode);
@@ -60,7 +69,7 @@ public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements Comp
}
else
{
- setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", fetchingNode, fetchFrom)));
+ setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", nodePair.coordinator, nodePair.peer)));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
deleted file mode 100644
index 35474af..0000000
--- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.repair;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.tracing.Tracing;
-
-public abstract class AsymmetricSyncTask extends AbstractSyncTask
-{
- private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class);
- protected final RepairJobDesc desc;
- protected final InetAddressAndPort fetchFrom;
- protected final List<Range<Token>> rangesToFetch;
- protected final InetAddressAndPort fetchingNode;
- protected final PreviewKind previewKind;
- private long startTime = Long.MIN_VALUE;
- protected volatile SyncStat stat;
-
- public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
- {
- assert !fetchFrom.equals(fetchingNode) : "Fetching from self " + fetchFrom;
- this.desc = desc;
- this.fetchFrom = fetchFrom;
- this.fetchingNode = fetchingNode;
- this.rangesToFetch = rangesToFetch;
- // todo: make an AsymmetricSyncStat?
- stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size());
- this.previewKind = previewKind;
- }
-
- public void run()
- {
- startTime = System.currentTimeMillis();
- // choose a repair method based on the significance of the difference
- String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), fetchingNode, fetchFrom, desc.columnFamily);
- if (rangesToFetch.isEmpty())
- {
- logger.info(String.format(format, "are consistent"));
- Tracing.traceRepair("Endpoint {} is consistent with {} for {}", fetchingNode, fetchFrom, desc.columnFamily);
- set(stat);
- return;
- }
-
- // non-0 difference: perform streaming repair
- logger.info(String.format(format, "have " + rangesToFetch.size() + " range(s) out of sync"));
- Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", fetchingNode, rangesToFetch.size(), fetchFrom, desc.columnFamily);
- startSync(rangesToFetch);
- }
-
- protected void finished()
- {
- if (startTime != Long.MIN_VALUE)
- Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/CommonRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java
index 928e570..6b55dc7 100644
--- a/src/java/org/apache/cassandra/repair/CommonRange.java
+++ b/src/java/org/apache/cassandra/repair/CommonRange.java
@@ -48,7 +48,13 @@ public class CommonRange
this.endpoints = ImmutableSet.copyOf(endpoints);
this.transEndpoints = ImmutableSet.copyOf(transEndpoints);
- this.ranges = new ArrayList(ranges);
+ this.ranges = new ArrayList<>(ranges);
+ }
+
+ public boolean matchesEndpoints(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints)
+ {
+ // Use strict equality here, as worst thing that can happen is we generate one more stream
+ return this.endpoints.equals(endpoints) && this.transEndpoints.equals(transEndpoints);
}
public boolean equals(Object o)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
new file mode 100644
index 0000000..1923fbe
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
+
+/**
+ * LocalSyncTask performs streaming between local(coordinator) node and remote replica.
+ */
+public class LocalSyncTask extends SyncTask implements StreamEventHandler
+{
+ private final TraceState state = Tracing.instance.get();
+
+ private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
+
+ private final UUID pendingRepair;
+ private final boolean requestRanges;
+ private final boolean transferRanges;
+
+ public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair,
+ boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
+ {
+ this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees),
+ pendingRepair, requestRanges, transferRanges, previewKind);
+ }
+
+ public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
+ List<Range<Token>> diff, UUID pendingRepair,
+ boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
+ {
+ super(desc, local, remote, diff, previewKind);
+ Preconditions.checkArgument(requestRanges || transferRanges, "Nothing to do in a sync job");
+ Preconditions.checkArgument(local.equals(FBUtilities.getBroadcastAddressAndPort()));
+
+ this.pendingRepair = pendingRepair;
+ this.requestRanges = requestRanges;
+ this.transferRanges = transferRanges;
+ }
+
+ @VisibleForTesting
+ StreamPlan createStreamPlan(InetAddressAndPort remote, List<Range<Token>> differences)
+ {
+ StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
+ .listeners(this)
+ .flushBeforeTransfer(pendingRepair == null);
+
+ if (requestRanges)
+ {
+ // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+ plan.requestRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences),
+ RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
+ }
+
+ if (transferRanges)
+ {
+ // send ranges to the remote node if we are not performing a pull repair
+ // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+ plan.transferRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
+ }
+
+ return plan;
+ }
+
+ /**
+ * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
+ * that will be called out of band once the streams complete.
+ */
+ @Override
+ protected void startSync()
+ {
+ InetAddressAndPort remote = nodePair.peer;
+
+ String message = String.format("Performing streaming repair of %d ranges with %s", rangesToSync.size(), remote);
+ logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+ Tracing.traceRepair(message);
+
+ createStreamPlan(remote, rangesToSync).execute();
+ }
+
+ public void handleStreamEvent(StreamEvent event)
+ {
+ if (state == null)
+ return;
+ switch (event.eventType)
+ {
+ case STREAM_PREPARED:
+ StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
+ state.trace("Streaming session with {} prepared", spe.session.peer);
+ break;
+ case STREAM_COMPLETE:
+ StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
+ state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
+ break;
+ case FILE_PROGRESS:
+ ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
+ state.trace("{}/{} ({}%) {} idx:{}{}",
+ new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
+ FBUtilities.prettyPrintMemory(pi.totalBytes),
+ pi.currentBytes * 100 / pi.totalBytes,
+ pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
+ pi.sessionIndex,
+ pi.peer });
+ }
+ }
+
+ public void onSuccess(StreamState result)
+ {
+ String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, nodePair.coordinator, nodePair.peer, desc.columnFamily);
+ logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+ Tracing.traceRepair(message);
+ set(stat.withSummaries(result.createSummaries()));
+ finished();
+ }
+
+ public void onFailure(Throwable t)
+ {
+ setException(t);
+ finished();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/NodePair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java
deleted file mode 100644
index bfb237e..0000000
--- a/src/java/org/apache/cassandra/repair/NodePair.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.io.IOException;
-
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-
-/**
- * NodePair is used for repair message body to indicate the pair of nodes.
- *
- * @since 2.0
- */
-public class NodePair
-{
- public static IVersionedSerializer<NodePair> serializer = new NodePairSerializer();
-
- public final InetAddressAndPort endpoint1;
- public final InetAddressAndPort endpoint2;
-
- public NodePair(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2)
- {
- this.endpoint1 = endpoint1;
- this.endpoint2 = endpoint2;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- NodePair nodePair = (NodePair) o;
- return endpoint1.equals(nodePair.endpoint1) && endpoint2.equals(nodePair.endpoint2);
- }
-
- @Override
- public String toString()
- {
- return endpoint1.toString() + " - " + endpoint2.toString();
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hashCode(endpoint1, endpoint2);
- }
-
- public static class NodePairSerializer implements IVersionedSerializer<NodePair>
- {
- public void serialize(NodePair nodePair, DataOutputPlus out, int version) throws IOException
- {
- CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint1, out, version);
- CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint2, out, version);
- }
-
- public NodePair deserialize(DataInputPlus in, int version) throws IOException
- {
- InetAddressAndPort ep1 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
- InetAddressAndPort ep2 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
- return new NodePair(ep1, ep2);
- }
-
- public long serializedSize(NodePair nodePair, int version)
- {
- return CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint1, version)
- + CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint2, version);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index d38435b..c96e7fb 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
import java.util.*;
import java.util.stream.Collectors;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
@@ -45,7 +46,7 @@ import org.apache.cassandra.utils.Pair;
*/
public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
{
- private static Logger logger = LoggerFactory.getLogger(RepairJob.class);
+ private static final Logger logger = LoggerFactory.getLogger(RepairJob.class);
private final RepairSession session;
private final RepairJobDesc desc;
@@ -128,7 +129,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
}
// When all validations complete, submit sync tasks
- ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, optimiseStreams && !session.pullRepair ? optimisedSyncing() : standardSyncing(), taskExecutor);
+ ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations,
+ optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing,
+ taskExecutor);
// When all sync complete, set the final result
Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>()
@@ -165,107 +168,116 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
return session.commonRange.transEndpoints.contains(ep);
}
- private AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing()
+ private ListenableFuture<List<SyncStat>> standardSyncing(List<TreeResponse> trees)
{
- return trees ->
- {
- InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
+ InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
- List<AbstractSyncTask> syncTasks = new ArrayList<>();
- // We need to difference all trees one against another
- for (int i = 0; i < trees.size() - 1; ++i)
+ List<SyncTask> syncTasks = new ArrayList<>();
+ // We need to difference all trees one against another
+ for (int i = 0; i < trees.size() - 1; ++i)
+ {
+ TreeResponse r1 = trees.get(i);
+ for (int j = i + 1; j < trees.size(); ++j)
{
- TreeResponse r1 = trees.get(i);
- for (int j = i + 1; j < trees.size(); ++j)
+ TreeResponse r2 = trees.get(j);
+
+ // Avoid streaming between two transient replicas
+ if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+ continue;
+
+ SyncTask task;
+ if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
{
- TreeResponse r2 = trees.get(j);
+ TreeResponse self = r1.endpoint.equals(local) ? r1 : r2;
+ TreeResponse remote = r2.endpoint.equals(local) ? r1 : r2;
- if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+ // pull only if local is full
+ boolean requestRanges = !isTransient(self.endpoint);
+ // push only if remote is full; additionally check for pull repair
+ boolean transferRanges = !isTransient(remote.endpoint) && !session.pullRepair;
+
+ // Nothing to do
+ if (!requestRanges && !transferRanges)
continue;
- AbstractSyncTask task;
- if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
- {
- InetAddressAndPort remote = r1.endpoint.equals(local) ? r2.endpoint : r1.endpoint;
- task = new SymmetricLocalSyncTask(desc, r1, r2, isTransient(remote), isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
- }
- else if (isTransient(r1.endpoint) || isTransient(r2.endpoint))
- {
- TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2;
- TreeResponse streamTo = isTransient(r1.endpoint) ? r2: r1;
- task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind);
- session.waitForSync(Pair.create(desc, new NodePair(streamTo.endpoint, streamFrom.endpoint)), (AsymmetricRemoteSyncTask) task);
- }
- else
- {
- task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind);
- // SymmetricRemoteSyncTask expects SyncComplete message sent back.
- // Register task to RepairSession to receive response.
- session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (SymmetricRemoteSyncTask) task);
- }
- syncTasks.add(task);
- taskExecutor.submit(task);
+ task = new LocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null,
+ requestRanges, transferRanges, session.previewKind);
+ }
+ else if (isTransient(r1.endpoint) || isTransient(r2.endpoint))
+ {
+ // Stream only from transient replica
+ TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2;
+ TreeResponse streamTo = isTransient(r1.endpoint) ? r2 : r1;
+ task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind);
+ session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task);
+ }
+ else
+ {
+ task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind);
+ // SymmetricRemoteSyncTask expects SyncComplete message sent back.
+ // Register task to RepairSession to receive response.
+ session.waitForSync(Pair.create(desc, task.nodePair()), (SymmetricRemoteSyncTask) task);
}
+ syncTasks.add(task);
+ taskExecutor.submit(task);
}
- return Futures.allAsList(syncTasks);
- };
+ }
+ return Futures.allAsList(syncTasks);
}
- private AsyncFunction<List<TreeResponse>, List<SyncStat>> optimisedSyncing()
+ private ListenableFuture<List<SyncStat>> optimisedSyncing(List<TreeResponse> trees)
{
- return trees ->
- {
- InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
+ InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
- List<AbstractSyncTask> syncTasks = new ArrayList<>();
- // We need to difference all trees one against another
- DifferenceHolder diffHolder = new DifferenceHolder(trees);
+ List<SyncTask> syncTasks = new ArrayList<>();
+ // We need to difference all trees one against another
+ DifferenceHolder diffHolder = new DifferenceHolder(trees);
- logger.debug("diffs = {}", diffHolder);
- PreferedNodeFilter preferSameDCFilter = (streaming, candidates) ->
- candidates.stream()
- .filter(node -> getDC(streaming)
- .equals(getDC(node)))
- .collect(Collectors.toSet());
- ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
+ logger.debug("diffs = {}", diffHolder);
+ PreferedNodeFilter preferSameDCFilter = (streaming, candidates) ->
+ candidates.stream()
+ .filter(node -> getDC(streaming)
+ .equals(getDC(node)))
+ .collect(Collectors.toSet());
+ ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
- for (int i = 0; i < trees.size(); i++)
- {
- InetAddressAndPort address = trees.get(i).endpoint;
+ for (int i = 0; i < trees.size(); i++)
+ {
+ InetAddressAndPort address = trees.get(i).endpoint;
- // we don't stream to transient replicas
- if (isTransient(address))
- continue;
+ // we don't stream to transient replicas
+ if (isTransient(address))
+ continue;
- HostDifferences streamsFor = reducedDifferences.get(address);
- if (streamsFor != null)
+ HostDifferences streamsFor = reducedDifferences.get(address);
+ if (streamsFor != null)
+ {
+ Preconditions.checkArgument(streamsFor.get(address).isEmpty(), "We should not fetch ranges from ourselves");
+ for (InetAddressAndPort fetchFrom : streamsFor.hosts())
{
- assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves";
- for (InetAddressAndPort fetchFrom : streamsFor.hosts())
+ List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
+ logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
+ SyncTask task;
+ if (address.equals(local))
{
- List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
- logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
- AsymmetricSyncTask task;
- if (address.equals(local))
- {
- task = new AsymmetricLocalSyncTask(desc, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, previewKind);
- }
- else
- {
- task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind);
- session.waitForSync(Pair.create(desc, new NodePair(address, fetchFrom)),(AsymmetricRemoteSyncTask)task);
- }
- syncTasks.add(task);
- taskExecutor.submit(task);
+ task = new LocalSyncTask(desc, address, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null,
+ true, false, session.previewKind);
}
- }
- else
- {
- logger.debug("Node {} has nothing to stream", address);
+ else
+ {
+ task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind);
+ session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task);
+ }
+ syncTasks.add(task);
+ taskExecutor.submit(task);
}
}
- return Futures.allAsList(syncTasks);
- };
+ else
+ {
+ logger.debug("Node {} has nothing to stream", address);
+ }
+ }
+ return Futures.allAsList(syncTasks);
}
private String getDC(InetAddressAndPort address)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 8d3cd54..fa0c2a9 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -196,21 +196,17 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
Set<InetAddressAndPort> allNeighbors = new HashSet<>();
List<CommonRange> commonRanges = new ArrayList<>();
- //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
- //calculation multiple times
- // we arbitrarily limit ourselves to only full replicas, in lieu of ensuring it is safe to coordinate from a transient replica
- Iterable<Range<Token>> keyspaceLocalRanges = storageService
- .getLocalReplicas(keyspace)
- .filter(Replica::isFull)
- .ranges();
-
try
{
+ //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
+ //calculation multiple times
+ Iterable<Range<Token>> keyspaceLocalRanges = storageService.getLocalReplicas(keyspace).ranges();
+
for (Range<Token> range : options.getRanges())
{
EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
- options.getDataCenters(),
- options.getHosts());
+ options.getDataCenters(),
+ options.getHosts());
addRangeToNeighbors(commonRanges, range, neighbors);
allNeighbors.addAll(neighbors.endpoints());
@@ -647,17 +643,16 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
ImmutableList.of(failureMessage, completionMessage));
}
- private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
+ private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
{
Set<InetAddressAndPort> endpoints = neighbors.endpoints();
Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints();
- for (int i = 0; i < neighborRangeList.size(); i++)
- {
- CommonRange cr = neighborRangeList.get(i);
- if (cr.endpoints.containsAll(endpoints) && cr.transEndpoints.containsAll(transEndpoints))
+ for (CommonRange commonRange : neighborRangeList)
+ {
+ if (commonRange.matchesEndpoints(endpoints, transEndpoints))
{
- cr.ranges.add(range);
+ commonRange.ranges.add(range);
return;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 2ff60ec..4dc563a 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -54,9 +54,8 @@ import org.apache.cassandra.utils.Pair;
* ({@link org.apache.cassandra.repair.ValidationTask}) and waits until all trees are received (in
* validationComplete()).
* </li>
- * <li>Synchronization phase: once all trees are received, the job compares each tree with
- * all the other using a so-called {@link SymmetricSyncTask}. If there is difference between 2 trees, the
- * concerned SymmetricSyncTask will start a streaming of the difference between the 2 endpoint concerned.
+ * <li>Synchronization phase: once all trees are received, the job compares each tree with all the others. If there are
+ * differences between 2 trees, they are streamed between the 2 endpoints by a {@link SyncTask}.
* </li>
* </ol>
* The job is done once all its SyncTasks are done (i.e. have either computed no differences
@@ -103,7 +102,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<>();
// Remote syncing jobs wait response in syncingTasks map
- private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
// Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
@@ -195,12 +194,11 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
validating.put(key, task);
}
- public void waitForSync(Pair<RepairJobDesc, NodePair> key, CompletableRemoteSyncTask task)
+ public void waitForSync(Pair<RepairJobDesc, SyncNodePair> key, CompletableRemoteSyncTask task)
{
syncingTasks.put(key, task);
}
-
/**
* Receive merkle tree response or failed response from {@code endpoint} for current repair job.
*
@@ -224,13 +222,13 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
}
/**
- * Notify this session that sync completed/failed with given {@code NodePair}.
+ * Notify this session that sync completed/failed with given {@code SyncNodePair}.
*
* @param desc synced repair job
* @param nodes nodes that completed sync
* @param success true if sync succeeded
*/
- public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries)
+ public void syncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
{
CompletableRemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
if (task == null)
@@ -240,7 +238,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
}
if (logger.isDebugEnabled())
- logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.endpoint1, nodes.endpoint2, desc.columnFamily);
+ logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.coordinator, nodes.peer, desc.columnFamily);
task.syncComplete(success, summaries);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java
deleted file mode 100644
index 7eedab7..0000000
--- a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamEvent;
-import org.apache.cassandra.streaming.StreamEventHandler;
-import org.apache.cassandra.streaming.StreamOperation;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * SymmetricLocalSyncTask performs streaming between local(coordinator) node and remote replica.
- */
-public class SymmetricLocalSyncTask extends SymmetricSyncTask implements StreamEventHandler
-{
- private final TraceState state = Tracing.instance.get();
-
- private static final Logger logger = LoggerFactory.getLogger(SymmetricLocalSyncTask.class);
-
- private final boolean remoteIsTransient;
- private final UUID pendingRepair;
- private final boolean pullRepair;
-
- public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, boolean remoteIsTransient, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind)
- {
- super(desc, r1, r2, previewKind);
- this.remoteIsTransient = remoteIsTransient;
- this.pendingRepair = pendingRepair;
- this.pullRepair = pullRepair;
- }
-
- @VisibleForTesting
- StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences)
- {
- StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
- .listeners(this)
- .flushBeforeTransfer(pendingRepair == null)
- // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
- .requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences),
- RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node
-
- if (!pullRepair && !remoteIsTransient)
- {
- // send ranges to the remote node if we are not performing a pull repair
- // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
- plan.transferRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
- }
-
- return plan;
- }
-
- /**
- * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
- * that will be called out of band once the streams complete.
- */
- @Override
- protected void startSync(List<Range<Token>> differences)
- {
- InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
- InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
-
- String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
- logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
- Tracing.traceRepair(message);
-
- createStreamPlan(dst, differences).execute();
- }
-
- public void handleStreamEvent(StreamEvent event)
- {
- if (state == null)
- return;
- switch (event.eventType)
- {
- case STREAM_PREPARED:
- StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
- state.trace("Streaming session with {} prepared", spe.session.peer);
- break;
- case STREAM_COMPLETE:
- StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
- state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
- break;
- case FILE_PROGRESS:
- ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
- state.trace("{}/{} ({}%) {} idx:{}{}",
- new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
- FBUtilities.prettyPrintMemory(pi.totalBytes),
- pi.currentBytes * 100 / pi.totalBytes,
- pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
- pi.sessionIndex,
- pi.peer });
- }
- }
-
- public void onSuccess(StreamState result)
- {
- String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
- logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
- Tracing.traceRepair(message);
- set(stat.withSummaries(result.createSummaries()));
- finished();
- }
-
- public void onFailure(Throwable t)
- {
- setException(t);
- finished();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
index 1f2740f..4e44c15 100644
--- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -18,8 +18,8 @@
package org.apache.cassandra.repair;
import java.util.List;
-import java.util.function.Predicate;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,13 +29,13 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
/**
* SymmetricRemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node
@@ -43,13 +43,20 @@ import org.apache.cassandra.utils.FBUtilities;
*
* When SymmetricRemoteSyncTask receives SyncComplete from remote node, task completes.
*/
-public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements CompletableRemoteSyncTask
+public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask
{
private static final Logger logger = LoggerFactory.getLogger(SymmetricRemoteSyncTask.class);
public SymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind)
{
- super(desc, r1, r2, previewKind);
+ super(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), previewKind);
+ }
+
+ @VisibleForTesting
+ SymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort e1, InetAddressAndPort e2,
+ List<Range<Token>> differences, PreviewKind previewKind)
+ {
+ super(desc, e1, e2, differences, previewKind);
}
void sendRequest(RepairMessage request, InetAddressAndPort to)
@@ -58,11 +65,12 @@ public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements Comple
}
@Override
- protected void startSync(List<Range<Token>> differences)
+ protected void startSync()
{
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind);
+ SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
+ Preconditions.checkArgument(nodePair.coordinator.equals(request.src));
String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
Tracing.traceRepair(message);
@@ -77,7 +85,7 @@ public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements Comple
}
else
{
- setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint)));
+ setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", nodePair.coordinator, nodePair.peer)));
}
finished();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java
deleted file mode 100644
index 3da2293..0000000
--- a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.MerkleTrees;
-
-/**
- * SymmetricSyncTask will calculate the difference of MerkleTree between two nodes
- * and perform necessary operation to repair replica.
- */
-public abstract class SymmetricSyncTask extends AbstractSyncTask
-{
- private static Logger logger = LoggerFactory.getLogger(SymmetricSyncTask.class);
-
- protected final RepairJobDesc desc;
- protected final TreeResponse r1;
- protected final TreeResponse r2;
- protected final PreviewKind previewKind;
-
- protected volatile SyncStat stat;
- protected long startTime = Long.MIN_VALUE;
-
- public SymmetricSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind)
- {
- this.desc = desc;
- this.r1 = r1;
- this.r2 = r2;
- this.previewKind = previewKind;
- }
-
- /**
- * Compares trees, and triggers repairs for any ranges that mismatch.
- */
- public void run()
- {
- startTime = System.currentTimeMillis();
- // compare trees, and collect differences
- List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees);
-
- stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size());
-
- // choose a repair method based on the significance of the difference
- String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily);
- if (differences.isEmpty())
- {
- logger.info(String.format(format, "are consistent"));
- Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily);
- set(stat);
- return;
- }
-
- // non-0 difference: perform streaming repair
- logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
- Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily);
- startSync(differences);
- }
-
- public SyncStat getCurrentStat()
- {
- return stat;
- }
-
- protected void finished()
- {
- if (startTime != Long.MIN_VALUE)
- Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SyncNodePair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncNodePair.java b/src/java/org/apache/cassandra/repair/SyncNodePair.java
new file mode 100644
index 0000000..b353eb3
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SyncNodePair.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.io.IOException;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+
+/**
+ * Identifies the two nodes taking part in a repair sync; carried in repair message bodies.
+ * The pair is ordered: equality and hashing compare coordinator with coordinator and peer with peer.
+ * @since 2.0
+ */
+public class SyncNodePair
+{
+ public static final IVersionedSerializer<SyncNodePair> serializer = new NodePairSerializer();
+
+ public final InetAddressAndPort coordinator;
+ public final InetAddressAndPort peer;
+
+ public SyncNodePair(InetAddressAndPort coordinator, InetAddressAndPort peer)
+ {
+ this.coordinator = coordinator;
+ this.peer = peer;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ SyncNodePair nodePair = (SyncNodePair) o;
+ return coordinator.equals(nodePair.coordinator) && peer.equals(nodePair.peer);
+ }
+
+ @Override
+ public String toString()
+ {
+ return coordinator.toString() + " - " + peer.toString();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(coordinator, peer);
+ }
+
+ public static class NodePairSerializer implements IVersionedSerializer<SyncNodePair>
+ {
+ public void serialize(SyncNodePair nodePair, DataOutputPlus out, int version) throws IOException
+ {
+ CompactEndpointSerializationHelper.instance.serialize(nodePair.coordinator, out, version);
+ CompactEndpointSerializationHelper.instance.serialize(nodePair.peer, out, version);
+ }
+
+ public SyncNodePair deserialize(DataInputPlus in, int version) throws IOException
+ {
+ InetAddressAndPort coordinator = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+ InetAddressAndPort peer = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+ return new SyncNodePair(coordinator, peer);
+ }
+
+ public long serializedSize(SyncNodePair nodePair, int version)
+ {
+ return CompactEndpointSerializationHelper.instance.serializedSize(nodePair.coordinator, version)
+ + CompactEndpointSerializationHelper.instance.serializedSize(nodePair.peer, version);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SyncStat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncStat.java b/src/java/org/apache/cassandra/repair/SyncStat.java
index dab5659..7bb503f 100644
--- a/src/java/org/apache/cassandra/repair/SyncStat.java
+++ b/src/java/org/apache/cassandra/repair/SyncStat.java
@@ -26,16 +26,16 @@ import org.apache.cassandra.streaming.SessionSummary;
*/
public class SyncStat
{
- public final NodePair nodes;
+ public final SyncNodePair nodes;
public final long numberOfDifferences; // TODO: revert to Range<Token>
public final List<SessionSummary> summaries;
- public SyncStat(NodePair nodes, long numberOfDifferences)
+ public SyncStat(SyncNodePair nodes, long numberOfDifferences)
{
this(nodes, numberOfDifferences, null);
}
- public SyncStat(NodePair nodes, long numberOfDifferences, List<SessionSummary> summaries)
+ public SyncStat(SyncNodePair nodes, long numberOfDifferences, List<SessionSummary> summaries)
{
this.nodes = nodes;
this.numberOfDifferences = numberOfDifferences;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java
new file mode 100644
index 0000000..ccbd26c
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.tracing.Tracing;
+
+/**
+ * Base class for one repair sync between a pair of endpoints. The ranges that differ between
+ * {@code nodePair.coordinator} and {@code nodePair.peer} are supplied up front; {@link #run()} logs and
+ * traces the outcome and, when any range differs, delegates the actual sync to {@link #startSync()}.
+ * This object is also the future through which the resulting {@link SyncStat} is reported.
+ */
+public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable
+{
+    // final: the logger is a per-class constant and must not be reassignable
+    private static final Logger logger = LoggerFactory.getLogger(SyncTask.class);
+
+    protected final RepairJobDesc desc;
+    /** Ranges found to differ between the two endpoints; empty means they are already consistent. */
+    protected final List<Range<Token>> rangesToSync;
+    protected final PreviewKind previewKind;
+    protected final SyncNodePair nodePair;
+
+    /** Wall-clock millis at which {@link #run()} started, or {@code Long.MIN_VALUE} if it has not run. */
+    protected volatile long startTime = Long.MIN_VALUE;
+    /** Result for this pair: the node pair and the number of ranges out of sync. */
+    protected final SyncStat stat;
+
+    /**
+     * @param desc            the repair job this sync belongs to
+     * @param primaryEndpoint first endpoint of the pair (exposed as {@code nodePair.coordinator})
+     * @param peer            second endpoint of the pair; must differ from {@code primaryEndpoint}
+     * @param rangesToSync    ranges that differ between the two endpoints
+     * @param previewKind     preview mode of the repair (used here for the log prefix)
+     * @throws IllegalArgumentException if {@code peer} equals {@code primaryEndpoint}
+     */
+    protected SyncTask(RepairJobDesc desc, InetAddressAndPort primaryEndpoint, InetAddressAndPort peer, List<Range<Token>> rangesToSync, PreviewKind previewKind)
+    {
+        Preconditions.checkArgument(!peer.equals(primaryEndpoint), "Sending and receiving node are the same: %s", peer);
+        this.desc = desc;
+        this.rangesToSync = rangesToSync;
+        this.nodePair = new SyncNodePair(primaryEndpoint, peer);
+        this.previewKind = previewKind;
+        this.stat = new SyncStat(nodePair, rangesToSync.size());
+    }
+
+    /**
+     * Performs the sync of {@link #rangesToSync}. Only invoked from {@link #run()} when at least one
+     * range is out of sync. NOTE(review): implementations are presumably responsible for completing
+     * this future (and calling {@link #finished()}) — confirm against subclasses.
+     */
+    protected abstract void startSync();
+
+    /** @return the pair of endpoints this task syncs */
+    public SyncNodePair nodePair()
+    {
+        return nodePair;
+    }
+
+    /**
+     * Records the start time, then logs and traces whether the two endpoints are consistent. If
+     * {@link #rangesToSync} is empty the future is completed immediately with {@link #stat}; otherwise
+     * {@link #startSync()} is invoked. Final so every subclass goes through the same bookkeeping.
+     */
+    public final void run()
+    {
+        startTime = System.currentTimeMillis();
+
+        // Use logger placeholders instead of pre-building a format string and running it through
+        // String.format twice: endpoint strings may contain '%' (e.g. an IPv6 scope id such as
+        // fe80::1%2), which String.format would reject, aborting run() before the future is completed.
+        String prefix = previewKind.logPrefix(desc.sessionId);
+        if (rangesToSync.isEmpty())
+        {
+            logger.info("{} Endpoints {} and {} are consistent for {}", prefix, nodePair.coordinator, nodePair.peer, desc.columnFamily);
+            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", nodePair.coordinator, nodePair.peer, desc.columnFamily);
+            set(stat);
+            return;
+        }
+
+        // non-empty difference: hand off to the subclass to perform streaming repair
+        logger.info("{} Endpoints {} and {} have {} range(s) out of sync for {}", prefix, nodePair.coordinator, nodePair.peer, rangesToSync.size(), desc.columnFamily);
+        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", nodePair.coordinator, rangesToSync.size(), nodePair.peer, desc.columnFamily);
+        startSync();
+    }
+
+    /**
+     * Adds the time elapsed since {@link #run()} started to the table's {@code syncTime} metric, in
+     * milliseconds. Does nothing if {@link #run()} has not been called.
+     */
+    protected void finished()
+    {
+        if (startTime != Long.MIN_VALUE)
+            Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index 1f1344d..c51d1fd 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.repair.NodePair;
+import org.apache.cassandra.repair.SyncNodePair;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.streaming.SessionSummary;
@@ -39,13 +39,13 @@ public class SyncComplete extends RepairMessage
public static final MessageSerializer serializer = new SyncCompleteSerializer();
/** nodes that involved in this sync */
- public final NodePair nodes;
+ public final SyncNodePair nodes;
/** true if sync success, false otherwise */
public final boolean success;
public final List<SessionSummary> summaries;
- public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries)
+ public SyncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
{
super(Type.SYNC_COMPLETE, desc);
this.nodes = nodes;
@@ -57,7 +57,7 @@ public class SyncComplete extends RepairMessage
{
super(Type.SYNC_COMPLETE, desc);
this.summaries = summaries;
- this.nodes = new NodePair(endpoint1, endpoint2);
+ this.nodes = new SyncNodePair(endpoint1, endpoint2);
this.success = success;
}
@@ -85,7 +85,7 @@ public class SyncComplete extends RepairMessage
public void serialize(SyncComplete message, DataOutputPlus out, int version) throws IOException
{
RepairJobDesc.serializer.serialize(message.desc, out, version);
- NodePair.serializer.serialize(message.nodes, out, version);
+ SyncNodePair.serializer.serialize(message.nodes, out, version);
out.writeBoolean(message.success);
out.writeInt(message.summaries.size());
@@ -98,7 +98,7 @@ public class SyncComplete extends RepairMessage
public SyncComplete deserialize(DataInputPlus in, int version) throws IOException
{
RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
- NodePair nodes = NodePair.serializer.deserialize(in, version);
+ SyncNodePair nodes = SyncNodePair.serializer.deserialize(in, version);
boolean success = in.readBoolean();
int numSummaries = in.readInt();
@@ -114,7 +114,7 @@ public class SyncComplete extends RepairMessage
public long serializedSize(SyncComplete message, int version)
{
long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
- size += NodePair.serializer.serializedSize(message.nodes, version);
+ size += SyncNodePair.serializer.serializedSize(message.nodes, version);
size += TypeSizes.sizeof(message.success);
size += TypeSizes.sizeof(message.summaries.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 9f37095..8ffca6a 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -33,7 +33,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -62,14 +61,12 @@ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.CommonRange;
-import org.apache.cassandra.repair.RepairRunnable;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 031326e..5543fcc 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -113,7 +113,7 @@ public abstract class AbstractReadExecutor
+ /**
+ * Issues requests for {@code command} to every replica in {@code replicas} (presumably full data
+ * requests, going by the name). Callers must pass only full replicas.
+ */
protected void makeFullDataRequests(ReplicaCollection<?> replicas)
{
assert all(replicas, Replica::isFull);
+ // NOTE(review): the runtime Replica::isFull filter is replaced by the assert above, which only runs
+ // with -ea; with assertions disabled a transient replica passed here would receive this request too,
+ // so callers must guarantee every replica is full.
- makeRequests(command, replicas.filter(Replica::isFull));
+ makeRequests(command, replicas);
}
protected void makeTransientDataRequests(ReplicaCollection<?> replicas)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org