You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/12/07 12:58:28 UTC
[2/2] cassandra git commit: Add option to optimize Merkle tree
comparison across replicas
Add option to optimize Merkle tree comparison across replicas
Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-3200
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cb56d9fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cb56d9fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cb56d9fc
Branch: refs/heads/trunk
Commit: cb56d9fc3c773abbefa2044ce41ddbfb7717e0cb
Parents: a6f3983
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Dec 7 13:55:44 2017 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Dec 7 13:55:56 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 4 +
.../repair/AsymmetricLocalSyncTask.java | 105 +++++
.../repair/AsymmetricRemoteSyncTask.java | 60 +++
.../cassandra/repair/AsymmetricSyncTask.java | 85 ++++
.../repair/CompletableRemoteSyncTask.java | 28 ++
.../apache/cassandra/repair/LocalSyncTask.java | 1 -
.../apache/cassandra/repair/RemoteSyncTask.java | 2 +-
.../org/apache/cassandra/repair/RepairJob.java | 137 ++++--
.../repair/RepairMessageVerbHandler.java | 24 +-
.../apache/cassandra/repair/RepairRunnable.java | 1 +
.../apache/cassandra/repair/RepairSession.java | 12 +-
.../cassandra/repair/StreamingRepairTask.java | 36 +-
.../repair/asymmetric/DifferenceHolder.java | 98 +++++
.../repair/asymmetric/HostDifferences.java | 83 ++++
.../asymmetric/IncomingRepairStreamTracker.java | 81 ++++
.../repair/asymmetric/PreferedNodeFilter.java | 27 ++
.../repair/asymmetric/RangeDenormalizer.java | 125 ++++++
.../repair/asymmetric/ReduceHelper.java | 137 ++++++
.../repair/asymmetric/StreamFromOptions.java | 109 +++++
.../repair/messages/AsymmetricSyncRequest.java | 132 ++++++
.../repair/messages/RepairMessage.java | 3 +-
.../cassandra/repair/messages/RepairOption.java | 24 +-
.../cassandra/service/ActiveRepairService.java | 4 +-
.../apache/cassandra/tools/nodetool/Repair.java | 6 +-
.../cassandra/repair/LocalSyncTaskTest.java | 2 +-
.../cassandra/repair/RepairSessionTest.java | 2 +-
.../repair/StreamingRepairTaskTest.java | 5 +-
.../repair/asymmetric/DifferenceHolderTest.java | 106 +++++
.../asymmetric/RangeDenormalizerTest.java | 86 ++++
.../repair/asymmetric/ReduceHelperTest.java | 425 +++++++++++++++++++
.../asymmetric/StreamFromOptionsTest.java | 124 ++++++
.../apache/cassandra/utils/MerkleTreesTest.java | 2 +-
33 files changed, 2013 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 34af97d..ef414b9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200)
* Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081)
* Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
* Support a means of logging all queries as they were invoked (CASSANDRA-13983)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index a14f7ba..43f57f2 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,10 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - An experimental option to compare all Merkle trees together has been added. For example, in
+ a 3-node cluster with 2 identical replicas and 1 out-of-date replica, enabling this option makes
+ the out-of-date replica stream a single copy from one up-to-date replica instead of one copy from
+ each. Enable it by adding "-os" to nodetool repair. See CASSANDRA-3200.
- The currentTimestamp, currentDate, currentTime and currentTimeUUID functions have been added.
See CASSANDRA-13132
- Support for arithmetic operations between `timestamp`/`date` and `duration` has been added.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
new file mode 100644
index 0000000..5464520
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * One-directional sync task in which the local node is the fetching node: it streams
+ * {@code rangesToFetch} from the single replica {@code fetchFrom} into this node. The stream
+ * plan only requests ranges, so nothing is sent from this node to {@code fetchFrom}.
+ */
+public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements StreamEventHandler
+{
+    private final UUID pendingRepair;
+    // Trace state captured at construction; may be null (presumably when the repair is not
+    // traced), in which case stream events are not traced.
+    private final TraceState state = Tracing.instance.get();
+
+    /**
+     * @param desc          repair job this task belongs to
+     * @param fetchFrom     replica to request the out-of-sync ranges from
+     * @param rangesToFetch ranges to request from {@code fetchFrom}
+     * @param pendingRepair pending repair session id for incremental repair, or null otherwise
+     * @param previewKind   preview kind of the repair
+     */
+    public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind)
+    {
+        super(desc, FBUtilities.getBroadcastAddress(), fetchFrom, rangesToFetch, previewKind);
+        this.pendingRepair = pendingRepair;
+    }
+
+    /**
+     * Requests {@code rangesToFetch} from {@code fetchFrom}. This task registers itself as the
+     * plan's listener, so progress is traced in {@link #handleStreamEvent} and the outcome is
+     * reported through {@link #onSuccess} / {@link #onFailure}.
+     */
+    @Override
+    public void startSync(List<Range<Token>> rangesToFetch)
+    {
+        InetAddress preferred = SystemKeyspace.getPreferredIP(fetchFrom);
+        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR,
+                                         1, false,
+                                         false,
+                                         pendingRepair,
+                                         previewKind)
+                          .listeners(this)
+                          // flush before transfer only when there is no pending (incremental) repair session
+                          .flushBeforeTransfer(pendingRepair == null)
+                          // request ranges from the remote node
+                          .requestRanges(fetchFrom, preferred, desc.keyspace, rangesToFetch, desc.columnFamily);
+        plan.execute();
+    }
+
+    /**
+     * Traces stream session preparation, completion and per-file progress. Does nothing when
+     * no trace state was captured at construction.
+     */
+    @Override
+    public void handleStreamEvent(StreamEvent event)
+    {
+        if (state == null)
+            return;
+        switch (event.eventType)
+        {
+            case STREAM_PREPARED:
+                StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
+                state.trace("Streaming session with {} prepared", spe.session.peer);
+                break;
+            case STREAM_COMPLETE:
+                StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
+                state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
+                break;
+            case FILE_PROGRESS:
+                ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
+                // A zero-byte transfer would otherwise throw ArithmeticException (division by zero);
+                // report it as complete instead.
+                long percent = pi.totalBytes == 0 ? 100 : pi.currentBytes * 100 / pi.totalBytes;
+                state.trace("{}/{} ({}%) {} idx:{}{}",
+                            new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
+                                           FBUtilities.prettyPrintMemory(pi.totalBytes),
+                                           percent,
+                                           pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
+                                           pi.sessionIndex,
+                                           pi.peer });
+                break;
+        }
+    }
+
+    /** Traces completion, completes this task with its sync statistics and records the sync time. */
+    @Override
+    public void onSuccess(StreamState result)
+    {
+        String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, fetchingNode, fetchFrom, desc.columnFamily);
+        Tracing.traceRepair(message);
+        set(stat);
+        finished();
+    }
+
+    /** Fails this task with the streaming error and records the sync time. */
+    @Override
+    public void onFailure(Throwable t)
+    {
+        setException(t);
+        finished();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
new file mode 100644
index 0000000..d70975d
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionSummary;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * One-directional sync task between two nodes, where the fetching node is not the local node.
+ * It asks {@code fetchingNode} to stream {@code rangesToFetch} from {@code fetchFrom} and
+ * completes when that node's result is delivered through {@link #syncComplete}.
+ */
+public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask
+{
+    public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddress fetchNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+    {
+        super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind);
+    }
+
+    /**
+     * Sends an {@link AsymmetricSyncRequest} to the fetching node, which performs the streaming.
+     * The outcome arrives later via {@link #syncComplete}.
+     */
+    @Override
+    public void startSync(List<Range<Token>> rangesToFetch)
+    {
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, fetchingNode, fetchFrom, rangesToFetch, previewKind);
+        String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
+        Tracing.traceRepair(message);
+        MessagingService.instance().sendOneWay(request.createMessage(), request.fetchingNode);
+    }
+
+    /**
+     * Completes this task from the fetching node's report.
+     *
+     * @param success   whether the remote streaming succeeded
+     * @param summaries stream session summaries, attached to the sync statistics on success
+     */
+    @Override
+    public void syncComplete(boolean success, List<SessionSummary> summaries)
+    {
+        if (success)
+        {
+            set(stat.withSummaries(summaries));
+        }
+        else
+        {
+            setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", fetchingNode, fetchFrom)));
+        }
+        // Record the sync time on both outcomes, the same way AsymmetricLocalSyncTask does;
+        // without this the syncTime metric never covers remote asymmetric syncs.
+        finished();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
new file mode 100644
index 0000000..fe00058
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.tracing.Tracing;
+
+/**
+ * Base class for one-directional sync tasks: {@code fetchingNode} fetches {@code rangesToFetch}
+ * from {@code fetchFrom}. The future completes with this task's {@link SyncStat}; subclasses
+ * start the transfer in {@link #startSync} and call {@link #finished()} when it ends.
+ */
+public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implements Runnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class);
+    protected final RepairJobDesc desc;
+    protected final InetAddress fetchFrom;
+    protected final List<Range<Token>> rangesToFetch;
+    protected final InetAddress fetchingNode;
+    protected final PreviewKind previewKind;
+    // Set when run() starts; Long.MIN_VALUE means the task never ran, so finished() records nothing.
+    private long startTime = Long.MIN_VALUE;
+    protected volatile SyncStat stat;
+
+    /**
+     * @param desc          repair job this task belongs to
+     * @param fetchingNode  node that receives the data
+     * @param fetchFrom     node the data is fetched from
+     * @param rangesToFetch ranges that differ between the two nodes
+     * @param previewKind   preview kind of the repair
+     */
+    public AsymmetricSyncTask(RepairJobDesc desc, InetAddress fetchingNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+    {
+        this.desc = desc;
+        this.fetchFrom = fetchFrom;
+        this.fetchingNode = fetchingNode;
+        this.rangesToFetch = rangesToFetch;
+        // todo: make an AsymmetricSyncStat?
+        stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size());
+        this.previewKind = previewKind;
+    }
+
+    /**
+     * Logs and traces whether the endpoints are consistent. If there is nothing to fetch, the task
+     * completes immediately with its stat; otherwise the transfer is handed to {@link #startSync}.
+     */
+    @Override
+    public void run()
+    {
+        startTime = System.currentTimeMillis();
+        // "%%s" survives this format call as "%s" and is filled with the consistency verdict below
+        String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), fetchingNode, fetchFrom, desc.columnFamily);
+        if (rangesToFetch.isEmpty())
+        {
+            logger.info(String.format(format, "are consistent"));
+            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", fetchingNode, fetchFrom, desc.columnFamily);
+            set(stat);
+            return;
+        }
+
+        // non-0 difference: perform streaming repair
+        logger.info(String.format(format, "have " + rangesToFetch.size() + " range(s) out of sync"));
+        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", fetchingNode, rangesToFetch.size(), fetchFrom, desc.columnFamily);
+        startSync(rangesToFetch);
+    }
+
+    /** Records the elapsed time since run() started in the table's syncTime metric, if run() was called. */
+    protected void finished()
+    {
+        if (startTime != Long.MIN_VALUE)
+            Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Starts transferring {@code rangesToFetch}. The implementation is responsible for eventually
+     * completing this future, since RepairJob waits on it together with the other sync tasks.
+     */
+    public abstract void startSync(List<Range<Token>> rangesToFetch);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java
new file mode 100644
index 0000000..c4fe6c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.util.List;
+
+import org.apache.cassandra.streaming.SessionSummary;
+
+/**
+ * A sync task whose streaming is carried out by another node, so its result has to be delivered
+ * back to it. RepairSession keeps such tasks in its syncingTasks map (registered via waitForSync)
+ * keyed by job descriptor and node pair. Implemented by RemoteSyncTask and AsymmetricRemoteSyncTask.
+ */
+public interface CompletableRemoteSyncTask
+{
+ /**
+ * Delivers the outcome of the remotely executed sync.
+ *
+ * @param success   whether the remote sync succeeded
+ * @param summaries stream session summaries reported for the sync
+ */
+ void syncComplete(boolean success, List<SessionSummary> summaries);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 343950b..8545b22 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -61,7 +61,6 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
this.pullRepair = pullRepair;
}
-
@VisibleForTesting
StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, List<Range<Token>> differences)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
index 6cc786e..93feb72 100644
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.utils.FBUtilities;
*
* When RemoteSyncTask receives SyncComplete from remote node, task completes.
*/
-public class RemoteSyncTask extends SyncTask
+public class RemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask
{
private static final Logger logger = LoggerFactory.getLogger(RemoteSyncTask.class);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index d0654bd..7b8eb92 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.repair;
import java.net.InetAddress;
import java.util.*;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,7 +29,13 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.repair.asymmetric.DifferenceHolder;
+import org.apache.cassandra.repair.asymmetric.HostDifferences;
+import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
+import org.apache.cassandra.repair.asymmetric.ReduceHelper;
import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -45,6 +53,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
private final ListeningExecutorService taskExecutor;
private final boolean isIncremental;
private final PreviewKind previewKind;
+ private final boolean optimiseStreams;
/**
* Create repair job to run on specific columnfamily
@@ -52,7 +61,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
* @param session RepairSession that this RepairJob belongs
* @param columnFamily name of the ColumnFamily to repair
*/
- public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind)
+ public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind, boolean optimiseStreams)
{
this.session = session;
this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges());
@@ -60,6 +69,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
this.parallelismDegree = session.parallelismDegree;
this.isIncremental = isIncremental;
this.previewKind = previewKind;
+ this.optimiseStreams = optimiseStreams;
}
/**
@@ -118,39 +128,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
}
// When all validations complete, submit sync tasks
- ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, new AsyncFunction<List<TreeResponse>, List<SyncStat>>()
- {
- public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> trees)
- {
- InetAddress local = FBUtilities.getLocalAddress();
-
- List<SyncTask> syncTasks = new ArrayList<>();
- // We need to difference all trees one against another
- for (int i = 0; i < trees.size() - 1; ++i)
- {
- TreeResponse r1 = trees.get(i);
- for (int j = i + 1; j < trees.size(); ++j)
- {
- TreeResponse r2 = trees.get(j);
- SyncTask task;
- if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
- {
- task = new LocalSyncTask(desc, r1, r2, isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
- }
- else
- {
- task = new RemoteSyncTask(desc, r1, r2, session.previewKind);
- // RemoteSyncTask expects SyncComplete message sent back.
- // Register task to RepairSession to receive response.
- session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
- }
- syncTasks.add(task);
- taskExecutor.submit(task);
- }
- }
- return Futures.allAsList(syncTasks);
- }
- }, taskExecutor);
+ ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, optimiseStreams && !session.pullRepair ? optimisedSyncing() : standardSyncing(), taskExecutor);
// When all sync complete, set the final result
Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>()
@@ -182,6 +160,97 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
}, taskExecutor);
}
+ /**
+ * Returns the sync step used when stream optimisation is not in effect (see run()).
+ * Every pair of replica trees is compared. A pair that includes this node becomes a
+ * LocalSyncTask. Any other pair becomes a RemoteSyncTask, which is registered with the
+ * session before being submitted so that the remote SyncComplete reply can complete it.
+ * All tasks run on taskExecutor, and their results are combined with Futures.allAsList.
+ */
+ private AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing()
+ {
+ return trees ->
+ {
+ InetAddress local = FBUtilities.getLocalAddress();
+
+ List<SyncTask> syncTasks = new ArrayList<>();
+ // We need to difference all trees one against another
+ for (int i = 0; i < trees.size() - 1; ++i)
+ {
+ TreeResponse r1 = trees.get(i);
+ // j starts at i + 1 so each unordered pair of replicas is handled exactly once
+ for (int j = i + 1; j < trees.size(); ++j)
+ {
+ TreeResponse r2 = trees.get(j);
+ SyncTask task;
+ if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
+ {
+ task = new LocalSyncTask(desc, r1, r2, isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
+ }
+ else
+ {
+ task = new RemoteSyncTask(desc, r1, r2, session.previewKind);
+ // RemoteSyncTask expects SyncComplete message sent back.
+ // Register task to RepairSession to receive response.
+ session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
+ }
+ syncTasks.add(task);
+ taskExecutor.submit(task);
+ }
+ }
+ return Futures.allAsList(syncTasks);
+ };
+ }
+
+ /**
+ * Returns the sync step used when stream optimisation is requested and the repair is not a
+ * pull repair (see run()). The process has three steps:
+ * - All trees are diffed together in a DifferenceHolder.
+ * - ReduceHelper.reduce decides, per node, which ranges that node fetches from which other node.
+ *   It is given a filter that keeps only candidates in the streaming node's datacenter; how
+ *   reduce uses that filter is not visible here.
+ * - One one-directional fetch task is created per (fetching node, source) pair. This is an
+ *   AsymmetricLocalSyncTask when the fetching node is this node; otherwise it is an
+ *   AsymmetricRemoteSyncTask registered with the session to await the remote completion.
+ * All tasks run on taskExecutor, and their results are combined with Futures.allAsList.
+ */
+ private AsyncFunction<List<TreeResponse>, List<SyncStat>> optimisedSyncing()
+ {
+ return trees ->
+ {
+ // NOTE(review): compared against getLocalAddress(), while AsymmetricLocalSyncTask uses
+ // getBroadcastAddress() as its fetching node - confirm these agree when they differ.
+ InetAddress local = FBUtilities.getLocalAddress();
+
+ List<AsymmetricSyncTask> syncTasks = new ArrayList<>();
+ // We need to difference all trees one against another
+ DifferenceHolder diffHolder = new DifferenceHolder(trees);
+
+ logger.debug("diffs = {}", diffHolder);
+ // keeps only candidate sources in the same datacenter as the streaming node
+ PreferedNodeFilter preferSameDCFilter = (streaming, candidates) ->
+ candidates.stream()
+ .filter(node -> getDC(streaming)
+ .equals(getDC(node)))
+ .collect(Collectors.toSet());
+ ImmutableMap<InetAddress, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
+
+ for (int i = 0; i < trees.size(); i++)
+ {
+ InetAddress address = trees.get(i).endpoint;
+ HostDifferences streamsFor = reducedDifferences.get(address);
+ // a node absent from the reduced map has nothing to fetch
+ if (streamsFor != null)
+ {
+ assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves";
+ for (InetAddress fetchFrom : streamsFor.hosts())
+ {
+ List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
+ logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
+ AsymmetricSyncTask task;
+ if (address.equals(local))
+ {
+ task = new AsymmetricLocalSyncTask(desc, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, previewKind);
+ }
+ else
+ {
+ task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind);
+ // register before submitting so the completion reply can find the task; keyed by
+ // (fetching node, source) - NOTE(review): must match the NodePair the remote side reports back with
+ session.waitForSync(Pair.create(desc, new NodePair(address, fetchFrom)),(AsymmetricRemoteSyncTask)task);
+ }
+ syncTasks.add(task);
+ taskExecutor.submit(task);
+ }
+ }
+ else
+ {
+ logger.debug("Node {} has nothing to stream", address);
+ }
+ }
+ return Futures.allAsList(syncTasks);
+ };
+ }
+
+ /** Returns the datacenter of {@code address} as reported by the configured endpoint snitch. */
+ private String getDC(InetAddress address)
+ {
+ return DatabaseDescriptor.getEndpointSnitch().getDatacenter(address);
+ }
+
/**
* Creates {@link ValidationTask} and submit them to task executor in parallel.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 3c7f890..c26d4d1 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -144,10 +144,32 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
// forwarded sync request
SyncRequest request = (SyncRequest) message.payload;
logger.debug("Syncing {}", request);
- StreamingRepairTask task = new StreamingRepairTask(desc, request, isIncremental(desc.parentSessionId) ? desc.parentSessionId : null, request.previewKind);
+ StreamingRepairTask task = new StreamingRepairTask(desc,
+ request.initiator,
+ request.src,
+ request.dst,
+ request.ranges,
+ isIncremental(desc.parentSessionId) ? desc.parentSessionId : null,
+ request.previewKind,
+ false);
task.run();
break;
+ case ASYMMETRIC_SYNC_REQUEST:
+ // forwarded sync request
+ AsymmetricSyncRequest asymmetricSyncRequest = (AsymmetricSyncRequest) message.payload;
+ logger.debug("Syncing {}", asymmetricSyncRequest);
+ StreamingRepairTask asymmetricTask = new StreamingRepairTask(desc,
+ asymmetricSyncRequest.initiator,
+ asymmetricSyncRequest.fetchingNode,
+ asymmetricSyncRequest.fetchFrom,
+ asymmetricSyncRequest.ranges,
+ isIncremental(desc.parentSessionId) ? desc.parentSessionId : null,
+ asymmetricSyncRequest.previewKind,
+ true);
+ asymmetricTask.run();
+ break;
+
case CLEANUP:
logger.debug("cleaning up repair");
CleanupMessage cleanup = (CleanupMessage) message.payload;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 2b67a3c..1c9778b 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -515,6 +515,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
options.isPullRepair(),
force,
options.getPreviewKind(),
+ options.optimiseStreams(),
executor,
cfnames);
if (session == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 5dbd050..609ec56 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -104,10 +104,11 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, ValidationTask> validating = new ConcurrentHashMap<>();
// Remote syncing jobs wait response in syncingTasks map
- private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
// Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
+ private final boolean optimiseStreams;
private volatile boolean terminated = false;
@@ -134,6 +135,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
boolean pullRepair,
boolean force,
PreviewKind previewKind,
+ boolean optimiseStreams,
String... cfnames)
{
assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@ -174,6 +176,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.previewKind = previewKind;
this.pullRepair = pullRepair;
this.skippedReplicas = forceSkippedReplicas;
+ this.optimiseStreams = optimiseStreams;
}
public UUID getId()
@@ -191,11 +194,12 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
validating.put(key, task);
}
- public void waitForSync(Pair<RepairJobDesc, NodePair> key, RemoteSyncTask task)
+ public void waitForSync(Pair<RepairJobDesc, NodePair> key, CompletableRemoteSyncTask task)
{
syncingTasks.put(key, task);
}
+
/**
* Receive merkle tree response or failed response from {@code endpoint} for current repair job.
*
@@ -227,7 +231,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
*/
public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries)
{
- RemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
+ CompletableRemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
if (task == null)
{
assert terminated;
@@ -301,7 +305,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length);
for (String cfname : cfnames)
{
- RepairJob job = new RepairJob(this, cfname, isIncremental, previewKind);
+ RepairJob job = new RepairJob(this, cfname, isIncremental, previewKind, optimiseStreams);
executor.execute(job);
jobs.add(job);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index f43010b..a1b7459 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -20,15 +20,17 @@ package org.apache.cassandra.repair;
import java.net.InetAddress;
import java.util.UUID;
import java.util.Collections;
+import java.util.Collection;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.SyncComplete;
-import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
@@ -45,34 +47,44 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);
private final RepairJobDesc desc;
- private final SyncRequest request;
+ private final boolean asymmetric;
+ private final InetAddress initiator;
+ private final InetAddress src;
+ private final InetAddress dst;
+ private final Collection<Range<Token>> ranges;
private final UUID pendingRepair;
private final PreviewKind previewKind;
- public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, UUID pendingRepair, PreviewKind previewKind)
+ public StreamingRepairTask(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind, boolean asymmetric)
{
this.desc = desc;
- this.request = request;
+ this.initiator = initiator;
+ this.src = src;
+ this.dst = dst;
+ this.ranges = ranges;
+ this.asymmetric = asymmetric;
this.pendingRepair = pendingRepair;
this.previewKind = previewKind;
}
public void run()
{
- InetAddress dest = request.dst;
+ InetAddress dest = dst;
InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
- logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, request.ranges.size(), request.dst);
+ logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, ranges.size(), dst);
createStreamPlan(dest, preferred).execute();
}
@VisibleForTesting
StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred)
{
- return new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
+ StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
.listeners(this)
.flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary
- .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // request ranges from the remote node
- .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily); // send ranges to the remote node
+ .requestRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node
+ if (!asymmetric)
+ sp.transferRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // send ranges to the remote node
+ return sp;
}
public void handleStreamEvent(StreamEvent event)
@@ -86,8 +98,8 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
*/
public void onSuccess(StreamState state)
{
- logger.info("{} streaming task succeed, returning response to {}", previewKind.logPrefix(desc.sessionId), request.initiator);
- MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, true, state.createSummaries()).createMessage(), request.initiator);
+ logger.info("{} streaming task succeed, returning response to {}", previewKind.logPrefix(desc.sessionId), initiator); // keep previewKind's log prefix, as the removed line did
+ MessagingService.instance().sendOneWay(new SyncComplete(desc, src, dst, true, state.createSummaries()).createMessage(), initiator);
}
/**
@@ -95,6 +107,6 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
*/
public void onFailure(Throwable t)
{
- MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, false, Collections.emptyList()).createMessage(), request.initiator);
+ MessagingService.instance().sendOneWay(new SyncComplete(desc, src, dst, false, Collections.emptyList()).createMessage(), initiator);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
new file mode 100644
index 0000000..eb99977
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.TreeResponse;
+import org.apache.cassandra.utils.MerkleTrees;
+
+/**
+ * Just holds all differences between the hosts involved in a repair
+ */
+public class DifferenceHolder
+{
+ private final ImmutableMap<InetAddress, HostDifferences> differences;
+
+ public DifferenceHolder(List<TreeResponse> trees)
+ {
+ ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder();
+ for (int i = 0; i < trees.size() - 1; ++i)
+ {
+ TreeResponse r1 = trees.get(i);
+ // create the differences between r1 and all other hosts:
+ HostDifferences hd = new HostDifferences();
+ for (int j = i + 1; j < trees.size(); ++j)
+ {
+ TreeResponse r2 = trees.get(j);
+ hd.add(r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees));
+ }
+ // and add them to the diff map
+ diffBuilder.put(r1.endpoint, hd);
+ }
+ differences = diffBuilder.build();
+ }
+
+ @VisibleForTesting
+ DifferenceHolder(Map<InetAddress, HostDifferences> differences)
+ {
+ ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder();
+ diffBuilder.putAll(differences);
+ this.differences = diffBuilder.build();
+ }
+
+ /**
+ * differences only holds one 'side' of the difference - if A and B mismatch, only A will be a key in the map
+ */
+ public Set<InetAddress> keyHosts()
+ {
+ return differences.keySet();
+ }
+
+ public HostDifferences get(InetAddress hostWithDifference)
+ {
+ return differences.get(hostWithDifference);
+ }
+
+ public String toString()
+ {
+ return "DifferenceHolder{" +
+ "differences=" + differences +
+ '}';
+ }
+
+ public boolean hasDifferenceBetween(InetAddress node1, InetAddress node2, Range<Token> range)
+ {
+ HostDifferences diffsNode1 = differences.get(node1);
+ if (diffsNode1 != null && diffsNode1.hasDifferencesFor(node2, range))
+ return true;
+ HostDifferences diffsNode2 = differences.get(node2);
+ if (diffsNode2 != null && diffsNode2.hasDifferencesFor(node1, range))
+ return true;
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
new file mode 100644
index 0000000..6cbe987
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Tracks the differences for a single host: for every other host, the token ranges on which the two disagree.
+ */
+public class HostDifferences
+{
+    /** other host -> ranges where that host and the tracked host differ */
+    private final Map<InetAddress, List<Range<Token>>> perHostDifferences = new HashMap<>();
+
+    /**
+     * Records the differences between the host this instance tracks and {@code endpoint}. This replaces
+     * anything previously recorded for {@code endpoint}. The list is stored as-is, not copied.
+     */
+    public void add(InetAddress endpoint, List<Range<Token>> difference)
+    {
+        perHostDifferences.put(endpoint, difference);
+    }
+
+    /**
+     * Appends {@code rangeToFetch} to the ranges recorded for {@code remoteNode}, creating the list if needed.
+     * NOTE(review): if the list for {@code remoteNode} was supplied through add(), that caller-owned list is
+     * mutated here (and this throws if it is unmodifiable).
+     */
+    public void addSingleRange(InetAddress remoteNode, Range<Token> rangeToFetch)
+    {
+        List<Range<Token>> ranges = perHostDifferences.computeIfAbsent(remoteNode, ignored -> new ArrayList<>());
+        ranges.add(rangeToFetch);
+    }
+
+    /**
+     * Does this instance have differences for {@code range} with {@code node2}?
+     *
+     * @return true if any range recorded for {@code node2} equals or intersects {@code range}
+     */
+    public boolean hasDifferencesFor(InetAddress node2, Range<Token> range)
+    {
+        // any recorded diff overlapping this range means the two nodes are not equal on it
+        return get(node2).stream().anyMatch(diff -> range.equals(diff) || range.intersects(diff));
+    }
+
+    /**
+     * @return the hosts with recorded differences; a live view of the internal map's key set
+     */
+    public Set<InetAddress> hosts()
+    {
+        return perHostDifferences.keySet();
+    }
+
+    /**
+     * @return the ranges recorded for {@code differingHost}, or an immutable empty list if none are recorded
+     */
+    public List<Range<Token>> get(InetAddress differingHost)
+    {
+        return perHostDifferences.getOrDefault(differingHost, Collections.emptyList());
+    }
+
+    @Override
+    public String toString()
+    {
+        return "HostDifferences{" +
+               "perHostDifferences=" + perHostDifferences +
+               '}';
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
new file mode 100644
index 0000000..b41ddd8
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Tracks incoming streams for a single host. It records which ranges the host has to receive and, for
+ * each range, the groups of hosts it could stream that range from.
+ */
+public class IncomingRepairStreamTracker
+{
+    private static final Logger logger = LoggerFactory.getLogger(IncomingRepairStreamTracker.class);
+    private final DifferenceHolder differences;
+    /** incoming range -> where it can be streamed from; RangeDenormalizer keeps these keys non-overlapping */
+    private final Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+
+    public IncomingRepairStreamTracker(DifferenceHolder differences)
+    {
+        this.differences = differences;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "IncomingStreamTracker{" +
+               "incoming=" + incoming +
+               '}';
+    }
+
+    /**
+     * Adds a range to be streamed from {@code streamFromNode}.
+     *
+     * The currently tracked ranges are first denormalized against {@code range} so that no tracked ranges
+     * overlap. Then {@code streamFromNode} is added to the StreamFromOptions of every piece of {@code range},
+     * creating options for pieces that are not tracked yet.
+     *
+     * @param range          the range we need to stream from streamFromNode
+     * @param streamFromNode the node we should stream from
+     */
+    public void addIncomingRangeFrom(Range<Token> range, InetAddress streamFromNode)
+    {
+        logger.trace("adding incoming range {} from {}", range, streamFromNode);
+        Set<Range<Token>> pieces = RangeDenormalizer.denormalize(range, incoming);
+        for (Range<Token> piece : pieces)
+        {
+            StreamFromOptions options = incoming.computeIfAbsent(piece, key -> new StreamFromOptions(differences, key));
+            options.add(streamFromNode);
+        }
+    }
+
+    /**
+     * @return an immutable copy of the tracked ranges (the StreamFromOptions values are shared, not copied)
+     */
+    public ImmutableMap<Range<Token>, StreamFromOptions> getIncoming()
+    {
+        return ImmutableMap.copyOf(incoming);
+    }
+}
+
+
+
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
new file mode 100644
index 0000000..90788dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.Set;
+
+/**
+ * Narrows down the hosts a node may stream a range from to the ones it prefers.
+ */
+@FunctionalInterface
+public interface PreferedNodeFilter
+{
+    /**
+     * @param streamingNode the node that will receive the stream
+     * @param toStream      candidate source hosts for one range
+     * @return the preferred subset of {@code toStream}; ReduceHelper falls back to all of {@code toStream}
+     *         when this yields no candidate
+     */
+    Set<InetAddress> apply(InetAddress streamingNode, Set<InetAddress> toStream);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
new file mode 100644
index 0000000..a04d6d5
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Splits ("denormalizes") token ranges so that the ranges tracked for incoming repair streams never overlap.
+ */
+public class RangeDenormalizer
+{
+    // log under this class (the logger was previously created for IncomingRepairStreamTracker, mislabelling these messages)
+    private static final Logger logger = LoggerFactory.getLogger(RangeDenormalizer.class);
+
+    /**
+     * "Denormalizes" (roughly the opposite of what {@code Range.normalize} does) the keys of {@code incoming}
+     * against {@code range}.
+     *
+     * Every key of {@code incoming} that intersects {@code range} is removed and replaced by its pieces: the
+     * parts of it outside {@code range} and its intersection with {@code range}. Each piece is mapped to a copy
+     * of the StreamFromOptions of the key it came from. Afterwards, every intersection between {@code range}
+     * and a previously tracked range is a key of {@code incoming}, and no two keys overlap.
+     *
+     * @param range    the range that is about to be tracked
+     * @param incoming the currently tracked ranges; modified in place
+     * @return the pieces of {@code range}: the parts not covered by any tracked range plus the intersections
+     *         with tracked ranges
+     * @throws AssertionError if the resulting ranges overlap
+     */
+    public static Set<Range<Token>> denormalize(Range<Token> range, Map<Range<Token>, StreamFromOptions> incoming)
+    {
+        logger.trace("Denormalizing range={} incoming={}", range, incoming);
+        // iterate over a snapshot of the keys since overlapping entries are removed from incoming inside the loop
+        Set<Range<Token>> existingRanges = new HashSet<>(incoming.keySet());
+        Map<Range<Token>, StreamFromOptions> existingOverlappingRanges = new HashMap<>();
+        // remove all overlapping ranges from the incoming map
+        for (Range<Token> existingRange : existingRanges)
+        {
+            if (range.intersects(existingRange))
+                existingOverlappingRanges.put(existingRange, incoming.remove(existingRange));
+        }
+
+        Set<Range<Token>> intersections = intersection(existingRanges, range);
+        // the overlapping tracked ranges, cut at the boundaries of range
+        Set<Range<Token>> newExisting = Sets.union(subtractFromAllRanges(existingOverlappingRanges.keySet(), range), intersections);
+        // range, cut at the boundaries of the overlapping tracked ranges
+        Set<Range<Token>> newInput = Sets.union(range.subtractAll(existingOverlappingRanges.keySet()), intersections);
+        assertNonOverLapping(newExisting);
+        assertNonOverLapping(newInput);
+        // put the pieces back, each with a copy of the options of the tracked range it was cut from
+        for (Range<Token> r : newExisting)
+        {
+            for (Map.Entry<Range<Token>, StreamFromOptions> entry : existingOverlappingRanges.entrySet())
+            {
+                if (r.intersects(entry.getKey()))
+                    incoming.put(r, entry.getValue().copy(r));
+            }
+        }
+        logger.trace("denormalized {} to {}", range, newInput);
+        logger.trace("denormalized incoming to {}", incoming);
+        assertNonOverLapping(incoming.keySet());
+        return newInput;
+    }
+
+    /**
+     * Subtract the given range from all the input ranges.
+     *
+     * for example:
+     * ranges = [(0, 10], (20, 30]]
+     * and range = (8, 22]
+     *
+     * the result should be [(0, 8], (22, 30]]
+     */
+    @VisibleForTesting
+    static Set<Range<Token>> subtractFromAllRanges(Collection<Range<Token>> ranges, Range<Token> range)
+    {
+        Set<Range<Token>> result = new HashSet<>();
+        for (Range<Token> r : ranges)
+            result.addAll(r.subtract(range)); // subtract can return two ranges if we remove the middle part
+        return result;
+    }
+
+    /**
+     * Makes sure none of the input ranges overlap, checking them in {@code Range.sort} order.
+     *
+     * Unlike a Java {@code assert} statement, this check runs whether or not assertions are enabled.
+     *
+     * @throws AssertionError if a range starts before the previous sorted range ends
+     */
+    private static void assertNonOverLapping(Set<Range<Token>> ranges)
+    {
+        List<Range<Token>> sortedRanges = Range.sort(ranges);
+        Token lastToken = null;
+        for (Range<Token> range : sortedRanges)
+        {
+            if (lastToken != null && lastToken.compareTo(range.left) > 0)
+            {
+                throw new AssertionError("Ranges are overlapping: "+ranges);
+            }
+            lastToken = range.right;
+        }
+    }
+
+    /**
+     * Returns all intersections between the ranges in {@code ranges} and the given range.
+     */
+    private static Set<Range<Token>> intersection(Collection<Range<Token>> ranges, Range<Token> range)
+    {
+        Set<Range<Token>> result = new HashSet<>();
+        for (Range<Token> r : ranges)
+            result.addAll(range.intersectionWith(r));
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
new file mode 100644
index 0000000..ce05e93
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Reduces the amount of streaming a repair does by tracking incoming ranges, instead of blindly exchanging every
+ * mismatching range between each pair of nodes.
+ *
+ * Say node X has tracked that it will stream range r1 from node Y. Now we find a differing range r1 between
+ * node X and node Z. When adding r1 from Z as an incoming range to X, we check whether Y and Z are equal on r1
+ * (ie, there is no difference between them). If they are equal, X can stream from either Y or Z and the end
+ * result will be the same.
+ *
+ * The ranges won't match perfectly since we don't iterate over leaves, so we always split based on the
+ * smallest range (either the new difference or the existing one).
+ */
+public class ReduceHelper
+{
+    /**
+     * Greedily reduces the differences provided by the merkle trees to a smaller set of streams.
+     *
+     * For every receiving host and every range it needs, one source is picked from each group of hosts that are
+     * identical on that range. Hosts accepted by {@code filter} are preferred; among those, the host with the
+     * fewest outgoing streams picked so far wins.
+     *
+     * @param differences the pairwise differences between all hosts in the repair
+     * @param filter      narrows down the candidate sources for a receiving host
+     * @return receiving host -> the ranges it should fetch from each source host
+     */
+    public static ImmutableMap<InetAddress, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter)
+    {
+        Map<InetAddress, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences);
+        // shared by all receiving hosts, so every greedy pick accounts for streams already assigned elsewhere
+        Map<InetAddress, Integer> outgoingStreamCounts = new HashMap<>();
+        ImmutableMap.Builder<InetAddress, HostDifferences> mapBuilder = ImmutableMap.builder();
+        for (Map.Entry<InetAddress, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet())
+        {
+            IncomingRepairStreamTracker tracker = trackerEntry.getValue();
+            HostDifferences rangesToFetch = new HostDifferences();
+            for (Map.Entry<Range<Token>, StreamFromOptions> entry : tracker.getIncoming().entrySet())
+            {
+                Range<Token> rangeToFetch = entry.getKey();
+                for (InetAddress remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter))
+                    rangesToFetch.addSingleRange(remoteNode, rangeToFetch);
+            }
+            mapBuilder.put(trackerEntry.getKey(), rangesToFetch);
+        }
+
+        return mapBuilder.build();
+    }
+
+    /**
+     * Builds one tracker per host appearing in {@code differences}. Every differing range between two hosts is
+     * added as an incoming range to both of them.
+     */
+    @VisibleForTesting
+    static Map<InetAddress, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences)
+    {
+        Map<InetAddress, IncomingRepairStreamTracker> trackers = new HashMap<>();
+
+        for (InetAddress hostWithDifference : differences.keyHosts())
+        {
+            HostDifferences hostDifferences = differences.get(hostWithDifference);
+            for (InetAddress differingHost : hostDifferences.hosts())
+            {
+                List<Range<Token>> differingRanges = hostDifferences.get(differingHost);
+                // hostWithDifference has mismatching ranges differingRanges with differingHost:
+                for (Range<Token> range : differingRanges)
+                {
+                    // a difference means that we need to sync that range between two nodes - add the diffing range to both
+                    // hosts:
+                    getTracker(differences, trackers, hostWithDifference).addIncomingRangeFrom(range, differingHost);
+                    getTracker(differences, trackers, differingHost).addIncomingRangeFrom(range, hostWithDifference);
+                }
+            }
+        }
+        return trackers;
+    }
+
+    private static IncomingRepairStreamTracker getTracker(DifferenceHolder differences,
+                                                          Map<InetAddress, IncomingRepairStreamTracker> trackers,
+                                                          InetAddress host)
+    {
+        return trackers.computeIfAbsent(host, (h) -> new IncomingRepairStreamTracker(differences));
+    }
+
+    /**
+     * Greedily picks one source per group of candidate hosts in {@code toStreamFrom}. Hosts accepted by
+     * {@code filter} are preferred; if none of them qualify, the whole group is considered. Every picked
+     * host gets its entry in {@code outgoingStreamCounts} incremented.
+     */
+    private static Collection<InetAddress> pickLeastStreaming(InetAddress streamingNode,
+                                                              StreamFromOptions toStreamFrom,
+                                                              Map<InetAddress, Integer> outgoingStreamCounts,
+                                                              PreferedNodeFilter filter)
+    {
+        Set<InetAddress> retSet = new HashSet<>();
+        for (Set<InetAddress> toStream : toStreamFrom.allStreams())
+        {
+            InetAddress candidate = leastStreaming(filter.apply(streamingNode, toStream), outgoingStreamCounts);
+            // ok, found no prefered hosts, try all of them
+            if (candidate == null)
+                candidate = leastStreaming(toStream, outgoingStreamCounts);
+            assert candidate != null;
+            outgoingStreamCounts.merge(candidate, 1, Integer::sum);
+            retSet.add(candidate);
+        }
+        return retSet;
+    }
+
+    /**
+     * Returns the node in {@code nodes} with the fewest outgoing streams so far (the first one seen on ties),
+     * or null if {@code nodes} is empty.
+     */
+    private static InetAddress leastStreaming(Collection<InetAddress> nodes, Map<InetAddress, Integer> outgoingStreamCounts)
+    {
+        InetAddress candidate = null;
+        for (InetAddress node : nodes)
+        {
+            if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
+                candidate = node;
+        }
+        return candidate;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
new file mode 100644
index 0000000..4516f23
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Keeps track of where a node needs to stream a given range from.
+ *
+ * If the remote range is identical on several remote nodes, this class keeps them together in one group,
+ * any member of which can be streamed from.
+ *
+ * These stream from options get 'split' during denormalization. For example, if we track range
+ * (100, 200] and we find a new differing range (180, 200], the denormalization creates two new
+ * StreamFromOptions (see copy below) with the same streamOptions: one with range (100, 180] and one with
+ * (180, 200]. It then adds the new incoming difference to the StreamFromOptions for the new range (180, 200].
+ */
+public class StreamFromOptions
+{
+    /**
+     * all differences - used to figure out if two nodes are equal on the range
+     */
+    private final DifferenceHolder differences;
+    /**
+     * The range to stream
+     */
+    @VisibleForTesting
+    final Range<Token> range;
+    /**
+     * Contains the hosts to stream from - if two nodes are in the same inner set, they are identical for the range we are handling
+     */
+    private final Set<Set<InetAddress>> streamOptions = new HashSet<>();
+
+    public StreamFromOptions(DifferenceHolder differences, Range<Token> range)
+    {
+        this(differences, range, Collections.emptySet());
+    }
+
+    /** Copies every group in {@code existing} so this instance never shares a mutable group with another one. */
+    private StreamFromOptions(DifferenceHolder differences, Range<Token> range, Set<Set<InetAddress>> existing)
+    {
+        this.differences = differences;
+        this.range = range;
+        for (Set<InetAddress> addresses : existing)
+            this.streamOptions.add(Sets.newHashSet(addresses));
+    }
+
+    /**
+     * Add new node to the stream options
+     *
+     * If there is no difference between the new node and a currently tracked one, they match over the range
+     * we are tracking, so the new node is added to the set with the identical remote nodes. Otherwise a new
+     * group containing only the new node is created.
+     */
+    public void add(InetAddress streamFromNode)
+    {
+        for (Set<InetAddress> options : streamOptions)
+        {
+            // groups are never empty and their members are treated as identical, so one member is compared
+            InetAddress first = options.iterator().next();
+            if (!differences.hasDifferenceBetween(first, streamFromNode, range))
+            {
+                // NOTE(review): this mutates a set that is an element of the HashSet streamOptions, changing its
+                // hash code; fine for the iteration/add used here, but contains/remove on streamOptions would misbehave
+                options.add(streamFromNode);
+                return;
+            }
+        }
+        streamOptions.add(Sets.newHashSet(streamFromNode));
+    }
+
+    /**
+     * @return a copy of these options for {@code withRange}; groups are copied, so later add() calls on either
+     *         instance do not affect the other
+     */
+    public StreamFromOptions copy(Range<Token> withRange)
+    {
+        return new StreamFromOptions(differences, withRange, streamOptions);
+    }
+
+    /**
+     * @return the groups of candidate hosts; this is the internal set itself, so callers must not modify it
+     */
+    public Iterable<Set<InetAddress>> allStreams()
+    {
+        return streamOptions;
+    }
+
+    @Override
+    public String toString()
+    {
+        // previously started with ", range=", producing "StreamFromOptions{, range=..."
+        return "StreamFromOptions{" +
+               "range=" + range +
+               ", streamOptions=" + streamOptions +
+               '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
new file mode 100644
index 0000000..b75ad7f
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.streaming.PreviewKind;
+
+/**
+ * Repair message describing an asymmetric (one-directional) sync in which {@code fetchingNode} is to fetch
+ * {@code ranges} from {@code fetchFrom}. {@code initiator} presumably identifies the node coordinating the
+ * repair; confirm against the verb handler.
+ *
+ * Every field, including {@code previewKind}, is written by {@link SyncRequestSerializer}, and every field
+ * takes part in {@link #equals(Object)} and {@link #hashCode()}.
+ */
+public class AsymmetricSyncRequest extends RepairMessage
+{
+    // Raw type on purpose: RepairMessage.Type stores its serializers as MessageSerializer<RepairMessage>.
+    public static MessageSerializer serializer = new SyncRequestSerializer();
+
+    public final InetAddress initiator;
+    public final InetAddress fetchingNode;
+    public final InetAddress fetchFrom;
+    public final Collection<Range<Token>> ranges;
+    public final PreviewKind previewKind;
+
+    /**
+     * @param desc         the repair job this sync belongs to
+     * @param initiator    the node that initiated the sync
+     * @param fetchingNode the node that fetches the data
+     * @param fetchFrom    the node the data is fetched from
+     * @param ranges       the token ranges to sync; stored as given, not copied
+     * @param previewKind  the preview kind of the repair
+     */
+    public AsymmetricSyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress fetchingNode, InetAddress fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
+    {
+        super(Type.ASYMMETRIC_SYNC_REQUEST, desc);
+        this.initiator = initiator;
+        this.fetchingNode = fetchingNode;
+        this.fetchFrom = fetchFrom;
+        this.ranges = ranges;
+        this.previewKind = previewKind;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+        if (!(o instanceof AsymmetricSyncRequest))
+            return false;
+        AsymmetricSyncRequest req = (AsymmetricSyncRequest) o;
+        // previewKind is serialized, so two requests differing only in it are different messages.
+        return messageType == req.messageType &&
+               desc.equals(req.desc) &&
+               initiator.equals(req.initiator) &&
+               fetchingNode.equals(req.fetchingNode) &&
+               fetchFrom.equals(req.fetchFrom) &&
+               ranges.equals(req.ranges) &&
+               Objects.equals(previewKind, req.previewKind);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(messageType, desc, initiator, fetchingNode, fetchFrom, ranges, previewKind);
+    }
+
+    /**
+     * Wire format: desc, initiator, fetchingNode, fetchFrom, range count (int), each range, previewKind (int).
+     */
+    public static class SyncRequestSerializer implements MessageSerializer<AsymmetricSyncRequest>
+    {
+        public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+            CompactEndpointSerializationHelper.serialize(message.initiator, out);
+            CompactEndpointSerializationHelper.serialize(message.fetchingNode, out);
+            CompactEndpointSerializationHelper.serialize(message.fetchFrom, out);
+            out.writeInt(message.ranges.size());
+            for (Range<Token> range : message.ranges)
+            {
+                MessagingService.validatePartitioner(range);
+                AbstractBounds.tokenSerializer.serialize(range, out, version);
+            }
+            out.writeInt(message.previewKind.getSerializationVal());
+        }
+
+        public AsymmetricSyncRequest deserialize(DataInputPlus in, int version) throws IOException
+        {
+            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
+            // Read back in the same order serialize() writes them.
+            InetAddress initiator = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress fetchingNode = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddress fetchFrom = CompactEndpointSerializationHelper.deserialize(in);
+            int rangesCount = in.readInt();
+            List<Range<Token>> ranges = new ArrayList<>(rangesCount);
+            for (int i = 0; i < rangesCount; ++i)
+                ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version));
+            PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
+            return new AsymmetricSyncRequest(desc, initiator, fetchingNode, fetchFrom, ranges, previewKind);
+        }
+
+        public long serializedSize(AsymmetricSyncRequest message, int version)
+        {
+            long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
+            // Size each endpoint on its own: the helper takes the address as input, so the encoded size can
+            // differ between addresses (presumably IPv4 vs IPv6). Multiplying one endpoint's size by three
+            // is only correct when all three happen to match.
+            size += CompactEndpointSerializationHelper.serializedSize(message.initiator);
+            size += CompactEndpointSerializationHelper.serializedSize(message.fetchingNode);
+            size += CompactEndpointSerializationHelper.serializedSize(message.fetchFrom);
+            size += TypeSizes.sizeof(message.ranges.size());
+            for (Range<Token> range : message.ranges)
+                size += AbstractBounds.tokenSerializer.serializedSize(range, version);
+            size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
+            return size;
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "AsymmetricSyncRequest{" +
+               "initiator=" + initiator +
+               ", fetchingNode=" + fetchingNode +
+               ", fetchFrom=" + fetchFrom +
+               ", ranges=" + ranges +
+               ", previewKind=" + previewKind +
+               ", desc=" + desc +
+               '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index b72f139..09c6060 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -60,7 +60,8 @@ public abstract class RepairMessage
FINALIZE_COMMIT(12, FinalizeCommit.serializer),
FAILED_SESSION(13, FailSession.serializer),
STATUS_REQUEST(14, StatusRequest.serializer),
- STATUS_RESPONSE(15, StatusResponse.serializer);
+ STATUS_RESPONSE(15, StatusResponse.serializer),
+ ASYMMETRIC_SYNC_REQUEST(16, AsymmetricSyncRequest.serializer);
private final byte type;
private final MessageSerializer<RepairMessage> serializer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 971bf5d..adcd776 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -50,6 +50,7 @@ public class RepairOption
public static final String PULL_REPAIR_KEY = "pullRepair";
public static final String FORCE_REPAIR_KEY = "forceRepair";
public static final String PREVIEW = "previewKind";
+ public static final String OPTIMISE_STREAMS_KEY = "optimiseStreams";
// we don't want to push nodes too much for repair
public static final int MAX_JOB_THREADS = 4;
@@ -131,6 +132,12 @@ public class RepairOption
* <td>"true" if the repair should continue, even if one of the replicas involved is down.
* <td>false</td>
* </tr>
+ * <tr>
+ * <td>optimiseStreams</td>
+ * <td>"true" if we should try to optimise the syncing to avoid transferring identical
+ * ranges to the same host multiple times</td>
+ * <td>false</td>
+ * </tr>
* </tbody>
* </table>
*
@@ -180,8 +187,9 @@ public class RepairOption
ranges.add(new Range<>(parsedBeginToken, parsedEndToken));
}
}
+ boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY));
- RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind);
+ RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing);
// data centers
String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -259,13 +267,14 @@ public class RepairOption
private final boolean pullRepair;
private final boolean forceRepair;
private final PreviewKind previewKind;
+ private final boolean optimiseStreams;
private final Collection<String> columnFamilies = new HashSet<>();
private final Collection<String> dataCenters = new HashSet<>();
private final Collection<String> hosts = new HashSet<>();
private final Collection<Range<Token>> ranges = new HashSet<>();
- public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind)
+ public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams)
{
if (FBUtilities.isWindows &&
(DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
@@ -286,6 +295,7 @@ public class RepairOption
this.pullRepair = pullRepair;
this.forceRepair = forceRepair;
this.previewKind = previewKind;
+ this.optimiseStreams = optimiseStreams;
}
public RepairParallelism getParallelism()
@@ -363,10 +373,16 @@ public class RepairOption
return previewKind.isPreview();
}
- public boolean isInLocalDCOnly() {
+ /**
+ * @return true if exactly one data center was specified for this repair and it is the local data center
+ */
+ public boolean isInLocalDCOnly()
+ {
return dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter());
}
+    /**
+     * @return whether the "optimiseStreams" option was enabled, i.e. whether syncing should try to avoid
+     *         transferring identical ranges to the same host multiple times
+     */
+    public boolean optimiseStreams()
+    {
+        return this.optimiseStreams;
+    }
+
@Override
public String toString()
{
@@ -382,6 +398,7 @@ public class RepairOption
", # of ranges: " + ranges.size() +
", pull repair: " + pullRepair +
", force repair: " + forceRepair +
+ ", optimise streams: "+ optimiseStreams +
')';
}
@@ -401,6 +418,7 @@ public class RepairOption
options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair));
options.put(FORCE_REPAIR_KEY, Boolean.toString(forceRepair));
options.put(PREVIEW, previewKind.toString());
+ options.put(OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams));
return options;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index ef3ffeb..0276238 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -213,6 +213,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
boolean pullRepair,
boolean force,
PreviewKind previewKind,
+ boolean optimiseStreams,
ListeningExecutorService executor,
String... cfnames)
{
@@ -222,7 +223,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
if (cfnames.length == 0)
return null;
- final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, cfnames);
+
+ final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames);
sessions.put(session.getId(), session);
// register listeners
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 86c29d4..8347afc 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -94,6 +94,10 @@ public class Repair extends NodeToolCmd
@Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.")
private boolean pullRepair = false;
+ @Option(title = "optimise_streams", name = {"-os", "--optimise-streams"}, description = "Use --optimise-streams to try to reduce the number of streams we do (EXPERIMENTAL, see CASSANDRA-3200).")
+ private boolean optimiseStreams = false;
+
+
private PreviewKind getPreviewKind()
{
if (validate)
@@ -144,7 +148,7 @@ public class Repair extends NodeToolCmd
options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair));
options.put(RepairOption.FORCE_REPAIR_KEY, Boolean.toString(force));
options.put(RepairOption.PREVIEW, getPreviewKind().toString());
-
+ options.put(RepairOption.OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams));
if (!startToken.isEmpty() || !endToken.isEmpty())
{
options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index f5e9d6b..7f3dbff 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -46,7 +47,6 @@ import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.UUIDGen;
-import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
import static org.junit.Assert.assertEquals;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org