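You are viewing a plain-text version of this content. The canonical version is the original message in the commits@cassandra.apache.org mailing-list archive; its hyperlink was lost in this plain-text rendering.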
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/12/01 11:42:17 UTC
[cassandra] branch trunk updated: Optimised repair streaming improvements
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22abff7 Optimised repair streaming improvements
22abff7 is described below
commit 22abff779df097e0ef38180442e9c680b3d41187
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Fri Nov 13 09:47:38 2020 +0100
Optimised repair streaming improvements
Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-16274
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 5 +
.../cassandra/config/DatabaseDescriptor.java | 38 +++
src/java/org/apache/cassandra/net/Verb.java | 2 -
.../cassandra/repair/AsymmetricRemoteSyncTask.java | 12 +-
.../org/apache/cassandra/repair/RepairJob.java | 3 +-
.../cassandra/repair/RepairMessageVerbHandler.java | 17 +-
.../cassandra/repair/StreamingRepairTask.java | 2 +-
.../cassandra/repair/SymmetricRemoteSyncTask.java | 2 +-
.../repair/asymmetric/HostDifferences.java | 45 +++-
.../asymmetric/IncomingRepairStreamTracker.java | 6 +-
.../repair/asymmetric/RangeDenormalizer.java | 74 +++---
.../cassandra/repair/asymmetric/RangeMap.java | 269 +++++++++++++++++++++
.../cassandra/repair/asymmetric/ReduceHelper.java | 67 ++---
.../repair/messages/AsymmetricSyncRequest.java | 133 ----------
.../cassandra/repair/messages/RepairOption.java | 19 +-
.../cassandra/repair/messages/SyncRequest.java | 19 +-
.../apache/cassandra/service/StorageService.java | 30 +++
.../cassandra/service/StorageServiceMBean.java | 8 +
.../data/serialization/4.0/service.SyncRequest.bin | Bin 110 -> 111 bytes
.../test/OptimiseStreamsRepairTest.java | 195 +++++++++++++++
.../org/apache/cassandra/repair/RepairJobTest.java | 55 +++--
.../cassandra/repair/StreamingRepairTaskTest.java | 4 +-
.../repair/asymmetric/RangeDenormalizerTest.java | 8 +-
.../cassandra/repair/asymmetric/RangeMapTest.java | 106 ++++++++
.../repair/asymmetric/ReduceHelperTest.java | 44 +++-
.../messages/RepairMessageSerializationsTest.java | 2 +-
.../cassandra/service/SerializationsTest.java | 3 +-
28 files changed, 872 insertions(+), 297 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 56843d7..8e19522 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta4
+ * Optimised repair streaming improvements (CASSANDRA-16274)
* Update jctools dependency to 3.1.0 (CASSANDRA-16255)
* 'SSLEngine closed already' exception on failed outbound connection (CASSANDRA-16277)
* Drain and/or shutdown might throw because of slow messaging service shutdown (CASSANDRA-16276)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 464f8ad..fbc7f98 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -533,6 +533,11 @@ public class Config
public boolean autocompaction_on_startup_enabled = Boolean.parseBoolean(System.getProperty("cassandra.autocompaction_on_startup_enabled", "true"));
+ // see CASSANDRA-3200 / CASSANDRA-16274
+ public volatile boolean auto_optimise_inc_repair_streams = false;
+ public volatile boolean auto_optimise_full_repair_streams = false;
+ public volatile boolean auto_optimise_preview_repair_streams = false;
+
/**
* Client mode means that the process is a pure client, that uses C* code base but does
* not read or write local C* database files.
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 30a30e5..d559d63 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3222,4 +3222,42 @@ public class DatabaseDescriptor
{
return conf.autocompaction_on_startup_enabled;
}
+
+ public static boolean autoOptimiseIncRepairStreams()
+ {
+ return conf.auto_optimise_inc_repair_streams;
+ }
+
+ public static void setAutoOptimiseIncRepairStreams(boolean enabled)
+ {
+ if (enabled != conf.auto_optimise_inc_repair_streams)
+ logger.info("Changing auto_optimise_inc_repair_streams from {} to {}", conf.auto_optimise_inc_repair_streams, enabled);
+ conf.auto_optimise_inc_repair_streams = enabled;
+ }
+
+ public static boolean autoOptimiseFullRepairStreams()
+ {
+ return conf.auto_optimise_full_repair_streams;
+ }
+
+ public static void setAutoOptimiseFullRepairStreams(boolean enabled)
+ {
+ if (enabled != conf.auto_optimise_full_repair_streams)
+ logger.info("Changing auto_optimise_full_repair_streams from {} to {}", conf.auto_optimise_full_repair_streams, enabled);
+ conf.auto_optimise_full_repair_streams = enabled;
+ }
+
+ public static boolean autoOptimisePreviewRepairStreams()
+ {
+ return conf.auto_optimise_preview_repair_streams;
+ }
+
+ public static void setAutoOptimisePreviewRepairStreams(boolean enabled)
+ {
+ if (enabled != conf.auto_optimise_preview_repair_streams)
+ logger.info("Changing auto_optimise_preview_repair_streams from {} to {}", conf.auto_optimise_preview_repair_streams, enabled);
+ conf.auto_optimise_preview_repair_streams = enabled;
+ }
+
+
}
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 2ef981d..fad2fbf 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -56,7 +56,6 @@ import org.apache.cassandra.hints.HintMessage;
import org.apache.cassandra.hints.HintVerbHandler;
import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
import org.apache.cassandra.repair.RepairMessageVerbHandler;
-import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
import org.apache.cassandra.repair.messages.CleanupMessage;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
@@ -164,7 +163,6 @@ public enum Verb
FAILED_SESSION_MSG (113, P1, rpcTimeout, ANTI_ENTROPY, () -> FailSession.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
STATUS_RSP (115, P1, rpcTimeout, ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
STATUS_REQ (114, P1, rpcTimeout, ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
- ASYMMETRIC_SYNC_REQ (116, P1, rpcTimeout, ANTI_ENTROPY, () -> AsymmetricSyncRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
REPLICATION_DONE_RSP (82, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
REPLICATION_DONE_REQ (22, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index cf6d84b..40a1f51 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -26,16 +26,16 @@ import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
+import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.net.Verb.ASYMMETRIC_SYNC_REQ;
+import static org.apache.cassandra.net.Verb.SYNC_REQ;
/**
- * AsymmetricRemoteSyncTask sends {@link AsymmetricSyncRequest} to target node to repair(stream)
+ * AsymmetricRemoteSyncTask sends {@link SyncRequest} to target node to repair(stream)
* data with other target replica.
*
* When AsymmetricRemoteSyncTask receives SyncComplete from the target, task completes.
@@ -50,10 +50,10 @@ public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRem
public void startSync()
{
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
- String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
+ SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, true);
+ String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
Tracing.traceRepair(message);
- MessagingService.instance().send(Message.out(ASYMMETRIC_SYNC_REQ, request), request.fetchingNode);
+ MessagingService.instance().send(Message.out(SYNC_REQ, request), request.src);
}
public void syncComplete(boolean success, List<SessionSummary> summaries)
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 341b0fd..825d659 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -322,7 +322,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
Preconditions.checkArgument(streamsFor.get(address).isEmpty(), "We should not fetch ranges from ourselves");
for (InetAddressAndPort fetchFrom : streamsFor.hosts())
{
- List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
+ List<Range<Token>> toFetch = new ArrayList<>(streamsFor.get(fetchFrom));
assert !toFetch.isEmpty();
logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
@@ -347,6 +347,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
}
logger.info("Created {} optimised sync tasks based on {} merkle tree responses for {} (took: {}ms)",
syncTasks.size(), trees.size(), desc.parentSessionId, System.currentTimeMillis() - startedAt);
+ logger.debug("Optimised sync tasks for {}: {}", desc.parentSessionId, syncTasks);
return syncTasks;
}
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 51518cb..2bf6d84 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -152,25 +152,10 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
request.ranges,
isIncremental(desc.parentSessionId) ? desc.parentSessionId : null,
request.previewKind,
- false);
+ request.asymmetric);
task.run();
break;
- case ASYMMETRIC_SYNC_REQ:
- // forwarded sync request
- AsymmetricSyncRequest asymmetricSyncRequest = (AsymmetricSyncRequest) message.payload;
- logger.debug("Syncing {}", asymmetricSyncRequest);
- StreamingRepairTask asymmetricTask = new StreamingRepairTask(desc,
- asymmetricSyncRequest.initiator,
- asymmetricSyncRequest.fetchingNode,
- asymmetricSyncRequest.fetchFrom,
- asymmetricSyncRequest.ranges,
- isIncremental(desc.parentSessionId) ? desc.parentSessionId : null,
- asymmetricSyncRequest.previewKind,
- true);
- asymmetricTask.run();
- break;
-
case CLEANUP_MSG:
logger.debug("cleaning up repair");
CleanupMessage cleanup = (CleanupMessage) message.payload;
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 827dce3..fbfbac8 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -72,7 +72,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
public void run()
{
- logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, ranges.size(), dst);
+ logger.info("[streaming task #{}] Performing {}streaming repair of {} ranges with {}", desc.sessionId, asymmetric ? "asymmetric " : "", ranges.size(), dst);
createStreamPlan(dst).execute();
}
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
index 181554a..b4e2d9c 100644
--- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -63,7 +63,7 @@ public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemo
{
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
+ SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, false);
Preconditions.checkArgument(nodePair.coordinator.equals(request.src));
String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
index ab294b9..8b123a7 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
@@ -18,12 +18,15 @@
package org.apache.cassandra.repair.asymmetric;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
+import java.util.TreeSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -34,19 +37,22 @@ import org.apache.cassandra.locator.InetAddressAndPort;
*/
public class HostDifferences
{
- private final Map<InetAddressAndPort, List<Range<Token>>> perHostDifferences = new HashMap<>();
+ private final Map<InetAddressAndPort, NavigableSet<Range<Token>>> perHostDifferences = new HashMap<>();
+ private static final Comparator<Range<Token>> comparator = Comparator.comparing((Range<Token> o) -> o.left);
/**
* Adds a set of differences between the node this instance is tracking and endpoint
*/
- public void add(InetAddressAndPort endpoint, List<Range<Token>> difference)
+ public void add(InetAddressAndPort endpoint, Collection<Range<Token>> difference)
{
- perHostDifferences.put(endpoint, difference);
+ TreeSet<Range<Token>> sortedDiffs = new TreeSet<>(comparator);
+ sortedDiffs.addAll(difference);
+ perHostDifferences.put(endpoint, sortedDiffs);
}
public void addSingleRange(InetAddressAndPort remoteNode, Range<Token> rangeToFetch)
{
- perHostDifferences.computeIfAbsent(remoteNode, (x) -> new ArrayList<>()).add(rangeToFetch);
+ perHostDifferences.computeIfAbsent(remoteNode, (x) -> new TreeSet<>(comparator)).add(rangeToFetch);
}
/**
@@ -54,12 +60,25 @@ public class HostDifferences
*/
public boolean hasDifferencesFor(InetAddressAndPort node2, Range<Token> range)
{
- List<Range<Token>> differences = get(node2);
- for (Range<Token> diff : differences)
+ NavigableSet<Range<Token>> differences = get(node2);
+
+ if (differences.size() > 0 && differences.last().isWrapAround() && differences.last().intersects(range))
+ return true;
+
+ for (Range<Token> unwrappedRange : range.unwrap())
{
- // if the other node has a diff for this range, we know they are not equal.
- if (range.equals(diff) || range.intersects(diff))
- return true;
+ Range<Token> startKey = differences.floor(unwrappedRange);
+ Iterator<Range<Token>> iter = startKey == null ? differences.iterator() : differences.tailSet(startKey, true).iterator();
+
+ while (iter.hasNext())
+ {
+ Range<Token> diff = iter.next();
+ // if the other node has a diff for this range, we know they are not equal.
+ if (unwrappedRange.equals(diff) || unwrappedRange.intersects(diff))
+ return true;
+ if (unwrappedRange.right.compareTo(diff.left) < 0 && !unwrappedRange.isWrapAround())
+ break;
+ }
}
return false;
}
@@ -69,9 +88,9 @@ public class HostDifferences
return perHostDifferences.keySet();
}
- public List<Range<Token>> get(InetAddressAndPort differingHost)
+ public NavigableSet<Range<Token>> get(InetAddressAndPort differingHost)
{
- return perHostDifferences.getOrDefault(differingHost, Collections.emptyList());
+ return perHostDifferences.getOrDefault(differingHost, Collections.emptyNavigableSet());
}
public String toString()
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
index 450336f..5b4de8c 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.repair.asymmetric;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableMap;
@@ -37,7 +35,7 @@ public class IncomingRepairStreamTracker
{
private static final Logger logger = LoggerFactory.getLogger(IncomingRepairStreamTracker.class);
private final DifferenceHolder differences;
- private final Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+ private final RangeMap<StreamFromOptions> incoming = new RangeMap<>();
public IncomingRepairStreamTracker(DifferenceHolder differences)
{
@@ -65,9 +63,7 @@ public class IncomingRepairStreamTracker
logger.trace("adding incoming range {} from {}", range, streamFromNode);
Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range, incoming);
for (Range<Token> input : newInput)
- {
incoming.computeIfAbsent(input, (newRange) -> new StreamFromOptions(differences, newRange)).add(streamFromNode);
- }
}
public ImmutableMap<Range<Token>, StreamFromOptions> getIncoming()
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
index f692dd6..2a29871 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java
@@ -18,11 +18,9 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -33,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
public class RangeDenormalizer
@@ -45,26 +44,17 @@ public class RangeDenormalizer
* It makes sure that if there is an intersection between {{range}} and some of the ranges in {{incoming.keySet()}}
* we know that all intersections are keys in the updated {{incoming}}
*/
- public static Set<Range<Token>> denormalize(Range<Token> range, Map<Range<Token>, StreamFromOptions> incoming)
+ public static Set<Range<Token>> denormalize(Range<Token> range, RangeMap<StreamFromOptions> incoming)
{
logger.trace("Denormalizing range={} incoming={}", range, incoming);
- Set<Range<Token>> existingRanges = new HashSet<>(incoming.keySet());
- Map<Range<Token>, StreamFromOptions> existingOverlappingRanges = new HashMap<>();
- // remove all overlapping ranges from the incoming map
- for (Range<Token> existingRange : existingRanges)
- {
- if (range.intersects(existingRange))
- existingOverlappingRanges.put(existingRange, incoming.remove(existingRange));
- }
+ Set<Map.Entry<Range<Token>, StreamFromOptions>> existingOverlappingRanges = incoming.removeIntersecting(range);
- Set<Range<Token>> intersections = intersection(existingRanges, range);
- Set<Range<Token>> newExisting = Sets.union(subtractFromAllRanges(existingOverlappingRanges.keySet(), range), intersections);
- Set<Range<Token>> newInput = Sets.union(range.subtractAll(existingOverlappingRanges.keySet()), intersections);
- assertNonOverLapping(newExisting);
- assertNonOverLapping(newInput);
+ Set<Range<Token>> intersections = intersection(existingOverlappingRanges, range);
+ Set<Range<Token>> newExisting = Sets.union(subtractFromAllRanges(existingOverlappingRanges, range), intersections);
+ Set<Range<Token>> newInput = Sets.union(subtractAll(existingOverlappingRanges, range), intersections);
for (Range<Token> r : newExisting)
{
- for (Map.Entry<Range<Token>, StreamFromOptions> entry : existingOverlappingRanges.entrySet())
+ for (Map.Entry<Range<Token>, StreamFromOptions> entry : existingOverlappingRanges)
{
if (r.intersects(entry.getKey()))
incoming.put(r, entry.getValue().copy(r));
@@ -72,7 +62,6 @@ public class RangeDenormalizer
}
logger.trace("denormalized {} to {}", range, newInput);
logger.trace("denormalized incoming to {}", incoming);
- assertNonOverLapping(incoming.keySet());
return newInput;
}
@@ -87,39 +76,48 @@ public class RangeDenormalizer
*
*/
@VisibleForTesting
- static Set<Range<Token>> subtractFromAllRanges(Collection<Range<Token>> ranges, Range<Token> range)
+ static Set<Range<Token>> subtractFromAllRanges(Collection<Map.Entry<Range<Token>, StreamFromOptions>> ranges, Range<Token> range)
{
Set<Range<Token>> result = new HashSet<>();
- for (Range<Token> r : ranges)
- result.addAll(r.subtract(range)); // subtract can return two ranges if we remove the middle part
+ for (Map.Entry<Range<Token>, ?> r : ranges)
+ result.addAll(r.getKey().subtract(range)); // subtract can return two ranges if we remove the middle part
return result;
}
/**
- * Makes sure non of the input ranges are overlapping
+ * Returns all intersections between the ranges in ranges and the given range
*/
- private static void assertNonOverLapping(Set<Range<Token>> ranges)
+ private static Set<Range<Token>> intersection(Set<Map.Entry<Range<Token>, StreamFromOptions>> ranges, Range<Token> range)
{
- List<Range<Token>> sortedRanges = Range.sort(ranges);
- Token lastToken = null;
- for (Range<Token> range : sortedRanges)
- {
- if (lastToken != null && lastToken.compareTo(range.left) > 0)
- {
- throw new AssertionError("Ranges are overlapping: "+ranges);
- }
- lastToken = range.right;
- }
+ Set<Range<Token>> result = new HashSet<>();
+ for (Map.Entry<Range<Token>, StreamFromOptions> r : ranges)
+ result.addAll(range.intersectionWith(r.getKey()));
+ return result;
}
/**
- * Returns all intersections between the ranges in ranges and the given range
+ * copied from Range - need to iterate over the map entries
*/
- private static Set<Range<Token>> intersection(Collection<Range<Token>> ranges, Range<Token> range)
+ public static Set<Range<Token>> subtractAll(Collection<Map.Entry<Range<Token>, StreamFromOptions>> ranges, Range<Token> toSubtract)
{
Set<Range<Token>> result = new HashSet<>();
- for (Range<Token> r : ranges)
- result.addAll(range.intersectionWith(r));
+ result.add(toSubtract);
+ for(Map.Entry<Range<Token>, StreamFromOptions> range : ranges)
+ {
+ result = substractAllFromToken(result, range.getKey());
+ }
+
+ return result;
+ }
+
+ private static <T extends RingPosition<T>> Set<Range<T>> substractAllFromToken(Set<Range<T>> ranges, Range<T> subtract)
+ {
+ Set<Range<T>> result = new HashSet<>();
+ for(Range<T> range : ranges)
+ {
+ result.addAll(range.subtract(subtract));
+ }
+
return result;
}
}
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/RangeMap.java b/src/java/org/apache/cassandra/repair/asymmetric/RangeMap.java
new file mode 100644
index 0000000..f957b1b
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/asymmetric/RangeMap.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+public class RangeMap<T> implements Map<Range<Token>, T>
+{
+    private static final Comparator<Range<Token>> comparator = Comparator.comparing((Range<Token> o) -> o.left);
+    // keyed by left token only (see comparator), so get/containsKey/remove match any range with the same left token
+    private final NavigableMap<Range<Token>, T> byStart;
+
+    public RangeMap()
+    {
+        byStart = new TreeMap<>(comparator);
+    }
+
+    public int size()
+    {
+        return byStart.size();
+    }
+
+    public boolean isEmpty()
+    {
+        return byStart.isEmpty();
+    }
+
+    public boolean containsKey(Object key)
+    {
+        return byStart.containsKey(key);
+    }
+
+    public boolean containsValue(Object value)
+    {
+        return byStart.containsValue(value);
+    }
+
+    public T get(Object key)
+    {
+        return byStart.get(key);
+    }
+
+    public T put(Range<Token> key, T value)
+    {
+        assertNonIntersecting(key);
+        return byStart.put(key, value);
+    }
+
+    private void assertNonIntersecting(Range<Token> range)
+    {
+        // TODO: wraparound - only the neighbours by left token are checked, not a wrapping last entry
+        Range<Token> before = byStart.floorKey(range);
+        Range<Token> after = byStart.ceilingKey(range);
+        assert before == null || !before.intersects(range);
+        assert after == null || !after.intersects(range);
+    }
+
+    public T remove(Object key)
+    {
+        return byStart.remove(key);
+    }
+
+    public void putAll(Map<? extends Range<Token>, ? extends T> m)
+    {
+        byStart.putAll(m); // NB: bypasses assertNonIntersecting(), unlike put()
+    }
+
+    public void clear()
+    {
+        byStart.clear();
+    }
+
+    public Set<Range<Token>> keySet()
+    {
+        return byStart.keySet();
+    }
+
+    public Collection<T> values()
+    {
+        return byStart.values();
+    }
+
+    public Set<Map.Entry<Range<Token>, T>> entrySet()
+    {
+        return byStart.entrySet();
+    }
+
+    /**
+     * Iterates the entries whose ranges intersect {@code range}; may return duplicate entries if range.isWrapAround().
+     *
+     * The order of the returned entries is unspecified - don't depend on it.
+     */
+    @VisibleForTesting
+    Iterator<Map.Entry<Range<Token>, T>> intersectingEntryIterator(Range<Token> range)
+    {
+        return range.isWrapAround() ? new WrappingIntersectingIterator(range) : new IntersectingIterator(range);
+    }
+
+    public Set<Map.Entry<Range<Token>, T>> removeIntersecting(Range<Token> range)
+    {
+        Iterator<Map.Entry<Range<Token>, T>> iter = intersectingEntryIterator(range);
+        Set<Map.Entry<Range<Token>, T>> intersecting = new HashSet<>(); // a Set: wrapping ranges may yield duplicates
+        while (iter.hasNext())
+        {
+            Map.Entry<Range<Token>, T> entry = iter.next();
+            intersecting.add(entry);
+        }
+        for (Map.Entry<Range<Token>, T> entry : intersecting)
+            byStart.remove(entry.getKey());
+        return intersecting;
+    }
+
+    private class WrappingIntersectingIterator extends AbstractIterator<Map.Entry<Range<Token>, T>>
+    {
+        private final Iterator<Iterator<Map.Entry<Range<Token>, T>>> iterators;
+        private Iterator<Map.Entry<Range<Token>, T>> currentIter;
+
+        public WrappingIntersectingIterator(Range<Token> range)
+        {
+            List<Iterator<Map.Entry<Range<Token>, T>>> iters = new ArrayList<>(2);
+            for (Range<Token> unwrapped : range.unwrap())
+                iters.add((new IntersectingIterator(unwrapped)));
+            iterators = iters.iterator();
+        }
+        protected Map.Entry<Range<Token>, T> computeNext()
+        {
+            while ((currentIter == null || !currentIter.hasNext()) && iterators.hasNext()) // skip unwrapped pieces with no matches
+                currentIter = iterators.next();
+            if (currentIter != null && currentIter.hasNext())
+                return currentIter.next();
+            return endOfData();
+        }
+    }
+
+    private class IntersectingIterator extends AbstractIterator<Map.Entry<Range<Token>, T>>
+    {
+        private final Iterator<Map.Entry<Range<Token>, T>> tailIterator;
+        private final Range<Token> range;
+        // ranges in byStart are kept non-overlapping and ordered by left token, so only the last entry can be a wrapping range
+        private boolean shouldReturnLast = false;
+
+        public IntersectingIterator(Range<Token> range)
+        {
+            Range<Token> startKey = byStart.floorKey(range);
+            tailIterator = startKey == null ? byStart.entrySet().iterator() :
+                                              byStart.tailMap(startKey, true).entrySet().iterator();
+            Range<Token> last = byStart.isEmpty() ? null : byStart.lastKey();
+            if (last != null && last.isWrapAround() && last.intersects(range))
+                shouldReturnLast = true;
+            this.range = range;
+        }
+
+        protected Map.Entry<Range<Token>, T> computeNext()
+        {
+            if (shouldReturnLast)
+            {
+                shouldReturnLast = false;
+                return new Entry<>(byStart.lastEntry());
+            }
+            while (tailIterator.hasNext())
+            {
+                Entry<Range<Token>, T> candidateNext = new Entry<>(tailIterator.next());
+                Range<Token> candidateRange = candidateNext.getKey();
+
+                if (candidateRange.isWrapAround()) // a wrapping entry was already handled via shouldReturnLast
+                    continue;
+
+                if (candidateRange.left.compareTo(range.right) >= 0 && (!range.isWrapAround())) // no later entry can intersect; a range with right == min token still counts as wrapping, so keep scanning for it
+                    return endOfData();
+
+                if (range.left.compareTo(candidateRange.right) >= 0) // candidate ends at or before range.left
+                    continue;
+
+                return candidateNext;
+            }
+            return endOfData();
+        }
+    }
+
+    public String toString()
+    {
+        return byStart.toString();
+    }
+
+    static class Entry<K, V> implements Map.Entry<K, V>
+    {
+        private final V v;
+        private final K k;
+
+        Entry(K key, V val)
+        {
+            this.k = key;
+            this.v = val;
+        }
+
+        Entry(Map.Entry<K, V> toClone)
+        {
+            this(toClone.getKey(), toClone.getValue());
+        }
+        public K getKey()
+        {
+            return k;
+        }
+
+        public V getValue()
+        {
+            return v;
+        }
+
+        public V setValue(V value)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof Map.Entry)) return false;
+            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) o;
+            return Objects.equals(k, entry.getKey()) &&
+                   Objects.equals(v, entry.getValue());
+        }
+
+        public int hashCode()
+        {
+            return Objects.hashCode(k) ^ Objects.hashCode(v); // as specified by Map.Entry.hashCode()
+        }
+
+        public String toString()
+        {
+            return "Entry{" +
+                   "v=" + v +
+                   ", k=" + k +
+                   '}';
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
index c7d45bf..1c4bca7 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
@@ -18,7 +18,10 @@
package org.apache.cassandra.repair.asymmetric;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -50,7 +53,7 @@ public class ReduceHelper
public static ImmutableMap<InetAddressAndPort, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter)
{
Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences);
- Map<InetAddressAndPort, Integer> outgoingStreamCounts = new HashMap<>();
+
ImmutableMap.Builder<InetAddressAndPort, HostDifferences> mapBuilder = ImmutableMap.builder();
for (Map.Entry<InetAddressAndPort, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet())
{
@@ -59,7 +62,10 @@ public class ReduceHelper
for (Map.Entry<Range<Token>, StreamFromOptions> entry : tracker.getIncoming().entrySet())
{
Range<Token> rangeToFetch = entry.getKey();
- for (InetAddressAndPort remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter))
+ // StreamFromOptions contains a Set<Set<InetAddress>> with endpoints we need to stream
+ // rangeToFetch from - if the inner set size > 1 means those endpoints are identical
+ // for the range. pickConsistent picks a single endpoint from each of these sets.
+ for (InetAddressAndPort remoteNode : pickConsistent(trackerEntry.getKey(), entry.getValue(), filter))
rangesToFetch.addSingleRange(remoteNode, rangeToFetch);
}
mapBuilder.put(trackerEntry.getKey(), rangesToFetch);
@@ -78,7 +84,7 @@ public class ReduceHelper
HostDifferences hostDifferences = differences.get(hostWithDifference);
for (InetAddressAndPort differingHost : hostDifferences.hosts())
{
- List<Range<Token>> differingRanges = hostDifferences.get(differingHost);
+ Iterable<Range<Token>> differingRanges = hostDifferences.get(differingHost);
// hostWithDifference has mismatching ranges differingRanges with differingHost:
for (Range<Token> range : differingRanges)
{
@@ -99,38 +105,37 @@ public class ReduceHelper
return trackers.computeIfAbsent(host, (h) -> new IncomingRepairStreamTracker(differences));
}
- // greedily pick the nodes doing the least amount of streaming
- private static Collection<InetAddressAndPort> pickLeastStreaming(InetAddressAndPort streamingNode,
- StreamFromOptions toStreamFrom,
- Map<InetAddressAndPort, Integer> outgoingStreamCounts,
- PreferedNodeFilter filter)
+ // deterministic ordering of endpoints by their host address and port string
+ private static final Comparator<InetAddressAndPort> comparator = Comparator.comparing(InetAddressAndPort::getHostAddressAndPort);
+ /**
+ * Consistently picks, from each candidate set in {@code toStreamFrom}, the first endpoint that sorts after
+ * {@code streamingNode} according to {@link #comparator}, wrapping around to the first endpoint when none
+ * sorts after it. Candidates are restricted to those accepted by {@code filter}; if the filter accepts
+ * none of a set, the whole set is searched instead.
+ *
+ * this is done to reduce the number of sstables created on the receiving node
+ *
+ * todo: note that this can be improved - if we have a case like:
+ * addr3 will stream range1 from addr1 or addr2
+ * range2 from addr1
+ * in a perfect world we would stream both range1 and range2 from addr1 - but in this case we might stream
+ * range1 from addr2 depending on how the addresses sort
+ *
+ * @param streamingNode the node that will fetch the ranges
+ * @param toStreamFrom the candidate endpoint sets, one pick is made per set
+ * @param filter narrows each candidate set to preferred endpoints for {@code streamingNode}
+ * @return the picked endpoints (a Set, so picks shared by several candidate sets appear once)
+ */
+ private static Collection<InetAddressAndPort> pickConsistent(InetAddressAndPort streamingNode,
+ StreamFromOptions toStreamFrom,
+ PreferedNodeFilter filter)
{
Set<InetAddressAndPort> retSet = new HashSet<>();
for (Set<InetAddressAndPort> toStream : toStreamFrom.allStreams())
{
- InetAddressAndPort candidate = null;
- Set<InetAddressAndPort> prefered = filter.apply(streamingNode, toStream);
- for (InetAddressAndPort node : prefered)
- {
- if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
- {
- candidate = node;
- }
- }
- // ok, found no prefered hosts, try all of them
- if (candidate == null)
- {
- for (InetAddressAndPort node : toStream)
- {
- if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
- {
- candidate = node;
- }
- }
- }
- assert candidate != null;
- outgoingStreamCounts.put(candidate, outgoingStreamCounts.getOrDefault(candidate, 0) + 1);
- retSet.add(candidate);
+ // prefer the filtered endpoints; fall back to every endpoint in the set when none pass the filter
+ List<InetAddressAndPort> toSearch = new ArrayList<>(filter.apply(streamingNode, toStream));
+ if (toSearch.isEmpty())
+ toSearch = new ArrayList<>(toStream);
+
+ toSearch.sort(comparator);
+ // streamingNode is expected not to be a candidate, so binarySearch returns -(insertionPoint) - 1
+ int pos = Collections.binarySearch(toSearch, streamingNode, comparator);
+ assert pos < 0;
+ pos = -pos - 1;
+ // insertion point == size: streamingNode sorts after every candidate, so wrap around to the first one.
+ // NOTE(review): an empty toStream would make get(0) throw IndexOutOfBoundsException - presumably
+ // allStreams() never yields an empty set; confirm.
+ if (pos == toSearch.size())
+ retSet.add(toSearch.get(0));
+ else
+ retSet.add(toSearch.get(pos));
}
return retSet;
}
diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
deleted file mode 100644
index eacc285..0000000
--- a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair.messages;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.repair.RepairJobDesc;
-import org.apache.cassandra.streaming.PreviewKind;
-
-import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
-
-public class AsymmetricSyncRequest extends RepairMessage
-{
- public final InetAddressAndPort initiator;
- public final InetAddressAndPort fetchingNode;
- public final InetAddressAndPort fetchFrom;
- public final Collection<Range<Token>> ranges;
- public final PreviewKind previewKind;
-
- public AsymmetricSyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
- {
- super(desc);
- this.initiator = initiator;
- this.fetchingNode = fetchingNode;
- this.fetchFrom = fetchFrom;
- this.ranges = ranges;
- this.previewKind = previewKind;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (!(o instanceof AsymmetricSyncRequest))
- return false;
- AsymmetricSyncRequest req = (AsymmetricSyncRequest)o;
- return desc.equals(req.desc) &&
- initiator.equals(req.initiator) &&
- fetchingNode.equals(req.fetchingNode) &&
- fetchFrom.equals(req.fetchFrom) &&
- ranges.equals(req.ranges);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(desc, initiator, fetchingNode, fetchFrom, ranges);
- }
-
- public static final IVersionedSerializer<AsymmetricSyncRequest> serializer = new IVersionedSerializer<AsymmetricSyncRequest>()
- {
- public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException
- {
- RepairJobDesc.serializer.serialize(message.desc, out, version);
- inetAddressAndPortSerializer.serialize(message.initiator, out, version);
- inetAddressAndPortSerializer.serialize(message.fetchingNode, out, version);
- inetAddressAndPortSerializer.serialize(message.fetchFrom, out, version);
- out.writeInt(message.ranges.size());
- for (Range<Token> range : message.ranges)
- {
- IPartitioner.validate(range);
- AbstractBounds.tokenSerializer.serialize(range, out, version);
- }
- out.writeInt(message.previewKind.getSerializationVal());
- }
-
- public AsymmetricSyncRequest deserialize(DataInputPlus in, int version) throws IOException
- {
- RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
- InetAddressAndPort owner = inetAddressAndPortSerializer.deserialize(in, version);
- InetAddressAndPort src = inetAddressAndPortSerializer.deserialize(in, version);
- InetAddressAndPort dst = inetAddressAndPortSerializer.deserialize(in, version);
- int rangesCount = in.readInt();
- List<Range<Token>> ranges = new ArrayList<>(rangesCount);
- for (int i = 0; i < rangesCount; ++i)
- ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, IPartitioner.global(), version));
- PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
- return new AsymmetricSyncRequest(desc, owner, src, dst, ranges, previewKind);
- }
-
- public long serializedSize(AsymmetricSyncRequest message, int version)
- {
- long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
- size += inetAddressAndPortSerializer.serializedSize(message.initiator, version);
- size += inetAddressAndPortSerializer.serializedSize(message.fetchingNode, version);
- size += inetAddressAndPortSerializer.serializedSize(message.fetchFrom, version);
- size += TypeSizes.sizeof(message.ranges.size());
- for (Range<Token> range : message.ranges)
- size += AbstractBounds.tokenSerializer.serializedSize(range, version);
- size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
- return size;
- }
- };
-
- public String toString()
- {
- return "AsymmetricSyncRequest{" +
- "initiator=" + initiator +
- ", fetchingNode=" + fetchingNode +
- ", fetchFrom=" + fetchFrom +
- ", ranges=" + ranges +
- ", previewKind=" + previewKind +
- ", desc="+desc+
- '}';
- }
-}
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index a76e6c0..c2e2c18 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -390,7 +390,22 @@ public class RepairOption
public boolean optimiseStreams()
{
- return optimiseStreams;
+ // an explicit request to optimise streams always wins
+ if (optimiseStreams)
+ return true;
+
+ // pull and forced repairs are never optimised automatically
+ if (isPullRepair() || isForcedRepair())
+ return false;
+
+ // otherwise consult the per-repair-type auto-optimise settings
+ boolean incremental = isIncremental();
+ return (incremental && DatabaseDescriptor.autoOptimiseIncRepairStreams())
+ || (isPreview() && DatabaseDescriptor.autoOptimisePreviewRepairStreams())
+ || (!incremental && DatabaseDescriptor.autoOptimiseFullRepairStreams());
}
public boolean ignoreUnreplicatedKeyspaces()
@@ -413,7 +428,7 @@ public class RepairOption
", # of ranges: " + ranges.size() +
", pull repair: " + pullRepair +
", force repair: " + forceRepair +
- ", optimise streams: "+ optimiseStreams +
+ ", optimise streams: "+ optimiseStreams() +
", ignore unreplicated keyspaces: "+ ignoreUnreplicatedKeyspaces +
')';
}
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 341455f..9886abc 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -50,8 +50,15 @@ public class SyncRequest extends RepairMessage
public final InetAddressAndPort dst;
public final Collection<Range<Token>> ranges;
public final PreviewKind previewKind;
+ public final boolean asymmetric;
- public SyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
+ public SyncRequest(RepairJobDesc desc,
+ InetAddressAndPort initiator,
+ InetAddressAndPort src,
+ InetAddressAndPort dst,
+ Collection<Range<Token>> ranges,
+ PreviewKind previewKind,
+ boolean asymmetric)
{
super(desc);
this.initiator = initiator;
@@ -59,6 +66,7 @@ public class SyncRequest extends RepairMessage
this.dst = dst;
this.ranges = ranges;
this.previewKind = previewKind;
+ this.asymmetric = asymmetric;
}
@Override
@@ -72,7 +80,8 @@ public class SyncRequest extends RepairMessage
src.equals(req.src) &&
dst.equals(req.dst) &&
ranges.equals(req.ranges) &&
- previewKind == req.previewKind;
+ previewKind == req.previewKind &&
+ asymmetric == req.asymmetric;
}
@Override
@@ -96,6 +105,7 @@ public class SyncRequest extends RepairMessage
AbstractBounds.tokenSerializer.serialize(range, out, version);
}
out.writeInt(message.previewKind.getSerializationVal());
+ out.writeBoolean(message.asymmetric);
}
public SyncRequest deserialize(DataInputPlus in, int version) throws IOException
@@ -109,7 +119,8 @@ public class SyncRequest extends RepairMessage
for (int i = 0; i < rangesCount; ++i)
ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, IPartitioner.global(), version));
PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
- return new SyncRequest(desc, owner, src, dst, ranges, previewKind);
+ boolean asymmetric = in.readBoolean();
+ return new SyncRequest(desc, owner, src, dst, ranges, previewKind, asymmetric);
}
public long serializedSize(SyncRequest message, int version)
@@ -120,6 +131,7 @@ public class SyncRequest extends RepairMessage
for (Range<Token> range : message.ranges)
size += AbstractBounds.tokenSerializer.serializedSize(range, version);
size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
+ size += TypeSizes.sizeof(message.asymmetric);
return size;
}
};
@@ -133,6 +145,7 @@ public class SyncRequest extends RepairMessage
", dst=" + dst +
", ranges=" + ranges +
", previewKind=" + previewKind +
+ ", asymmetric=" + asymmetric +
"} " + super.toString();
}
}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 9c6499a..8baf7fe 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5767,4 +5767,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return outstanding.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(),
e -> e.getValue().stream().map(InetAddressAndPort::toString).collect(Collectors.toSet())));
}
+
+ /** JMX getter: returns {@link DatabaseDescriptor#autoOptimiseIncRepairStreams()}. */
+ public boolean autoOptimiseIncRepairStreams()
+ {
+ return DatabaseDescriptor.autoOptimiseIncRepairStreams();
+ }
+
+ /** JMX setter: updates the incremental-repair auto-optimise setting held by DatabaseDescriptor. */
+ public void setAutoOptimiseIncRepairStreams(boolean enabled)
+ {
+ DatabaseDescriptor.setAutoOptimiseIncRepairStreams(enabled);
+ }
+
+ /** JMX getter: returns {@link DatabaseDescriptor#autoOptimiseFullRepairStreams()}. */
+ public boolean autoOptimiseFullRepairStreams()
+ {
+ return DatabaseDescriptor.autoOptimiseFullRepairStreams();
+ }
+
+ /** JMX setter: updates the full-repair auto-optimise setting held by DatabaseDescriptor. */
+ public void setAutoOptimiseFullRepairStreams(boolean enabled)
+ {
+ DatabaseDescriptor.setAutoOptimiseFullRepairStreams(enabled);
+ }
+
+ /** JMX getter: returns {@link DatabaseDescriptor#autoOptimisePreviewRepairStreams()}. */
+ public boolean autoOptimisePreviewRepairStreams()
+ {
+ return DatabaseDescriptor.autoOptimisePreviewRepairStreams();
+ }
+
+ /** JMX setter: updates the preview-repair auto-optimise setting held by DatabaseDescriptor. */
+ public void setAutoOptimisePreviewRepairStreams(boolean enabled)
+ {
+ DatabaseDescriptor.setAutoOptimisePreviewRepairStreams(enabled);
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 0c4b5b0..1df3146 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -826,4 +826,12 @@ public interface StorageServiceMBean extends NotificationEmitter
@Deprecated
public Map<String, Set<InetAddress>> getOutstandingSchemaVersions();
public Map<String, Set<String>> getOutstandingSchemaVersionsWithPort();
+
+ // see CASSANDRA-3200
+ // Automatic optimised-streams settings per repair type. They only apply when streams were not explicitly
+ // requested to be optimised; pull and forced repairs are never optimised automatically.
+ /** @return whether incremental repairs optimise streams automatically */
+ public boolean autoOptimiseIncRepairStreams();
+ /** Enables or disables automatic stream optimisation for incremental repairs. */
+ public void setAutoOptimiseIncRepairStreams(boolean enabled);
+ /** @return whether full (non-incremental) repairs optimise streams automatically */
+ public boolean autoOptimiseFullRepairStreams();
+ /** Enables or disables automatic stream optimisation for full repairs. */
+ public void setAutoOptimiseFullRepairStreams(boolean enabled);
+ /** @return whether preview repairs optimise streams automatically */
+ public boolean autoOptimisePreviewRepairStreams();
+ /** Enables or disables automatic stream optimisation for preview repairs. */
+ public void setAutoOptimisePreviewRepairStreams(boolean enabled);
}
diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin
index b0cc44e..17bb014 100644
Binary files a/test/data/serialization/4.0/service.SyncRequest.bin and b/test/data/serialization/4.0/service.SyncRequest.bin differ
diff --git a/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java
new file mode 100644
index 0000000..8a1c1d7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/OptimiseStreamsRepairTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.AsymmetricRemoteSyncTask;
+import org.apache.cassandra.repair.LocalSyncTask;
+import org.apache.cassandra.repair.RepairJob;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.SyncTask;
+import org.apache.cassandra.repair.TreeResponse;
+import org.apache.cassandra.streaming.PreviewKind;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class OptimiseStreamsRepairTest extends TestBaseImpl
+{
+ /**
+ * Node 2 misses a batch of QUORUM overwrites while it is down (hinted handoff is disabled), then an
+ * optimised-streams repair (-os) is run from node 1. Follow-up -vd and full preview repairs must report the
+ * data in sync, and {@link BBHelper} checks the sync tasks created by the optimised repair.
+ */
+ @Test
+ public void testBasic() throws Exception
+ {
+ try(Cluster cluster = init(Cluster.build(3)
+ .withInstanceInitializer(BBHelper::install)
+ .withConfig(config -> config.set("hinted_handoff_enabled", false)
+ .with(GOSSIP)
+ .with(NETWORK))
+ .start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+ for (int i = 0; i < 100000; i++)
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, t) values (?,?)", ConsistencyLevel.ALL, i, i);
+ cluster.forEach((i) -> i.flush(KEYSPACE));
+
+ cluster.get(2).shutdown().get();
+
+ // node 2 is down and hints are disabled, so it misses these overwrites
+ for (int i = 0; i < 2000; i++)
+ cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, t) values (?,?)", ConsistencyLevel.QUORUM, i, i * 2 + 2);
+
+ cluster.get(2).startup();
+ // NOTE(review): fixed sleep, presumably to let node 2 rejoin before flushing and repairing;
+ // consider awaiting ring/gossip state instead
+ Thread.sleep(10000);
+ cluster.forEach(c -> c.flush(KEYSPACE));
+ cluster.forEach(c -> c.forceCompact(KEYSPACE, "tbl"));
+
+ NodeToolResult res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-os");
+ res.asserts().success();
+
+ res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-vd");
+ res.asserts().success();
+ res.asserts().notificationContains("Repaired data is in sync");
+
+ res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "--preview", "--full");
+ res.asserts().success();
+ res.asserts().notificationContains("Previewed data was in sync");
+ }
+ }
+
+ /**
+ * ByteBuddy interceptor wrapping RepairJob.createOptimisedSyncingSyncTasks so that every list of sync
+ * tasks it creates is checked by {@link #verifySyncTasks} before being returned.
+ */
+ public static class BBHelper
+ {
+ public static void install(ClassLoader cl, int id)
+ {
+ new ByteBuddy().rebase(RepairJob.class)
+ .method(named("createOptimisedSyncingSyncTasks"))
+ .intercept(MethodDelegation.to(BBHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static List<SyncTask> createOptimisedSyncingSyncTasks(RepairJobDesc desc,
+ List<TreeResponse> trees,
+ InetAddressAndPort local,
+ Predicate<InetAddressAndPort> isTransient,
+ Function<InetAddressAndPort, String> getDC,
+ boolean isIncremental,
+ PreviewKind previewKind,
+ @SuperCall Callable<List<SyncTask>> zuperCall)
+ {
+ List<SyncTask> tasks;
+ try
+ {
+ tasks = zuperCall.call();
+ verifySyncTasks(tasks);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ return tasks;
+ }
+
+ private static void verifySyncTasks(List<SyncTask> tasks) throws UnknownHostException
+ {
+ // fetching node -> (node fetched from -> ranges)
+ Map<InetAddressAndPort, Map<InetAddressAndPort, List<Range<Token>>>> fetching = new HashMap<>();
+ for (SyncTask task : tasks)
+ {
+ if (task instanceof LocalSyncTask)
+ {
+ assertFalse(((LocalSyncTask)task).transferRanges);
+ assertTrue(((LocalSyncTask)task).requestRanges);
+ }
+ else
+ assertTrue(task instanceof AsymmetricRemoteSyncTask);
+
+ Map<InetAddressAndPort, List<Range<Token>>> fetch = fetching.computeIfAbsent(task.nodePair().coordinator, k -> new HashMap<>());
+ fetch.computeIfAbsent(task.nodePair().peer, k -> new ArrayList<>()).addAll(task.rangesToSync);
+ }
+ // 127.0.0.2 is the node out of sync - make sure it does not receive multiple copies of the same range from the other nodes;
+ Map<InetAddressAndPort, List<Range<Token>>> node2 = fetching.get(InetAddressAndPort.getByName("127.0.0.2"));
+ assertTrue("no sync tasks fetch into 127.0.0.2", node2 != null);
+ Set<Range<Token>> allRanges = new HashSet<>();
+ node2.values().forEach(ranges -> ranges.forEach(r -> assertTrue(allRanges.add(r))));
+
+ // 127.0.0.2 should stream the same ranges to .1 and .3
+ Set<Range<Token>> node2ToNode1 = new HashSet<>(rangesFetched(fetching, "127.0.0.1", "127.0.0.2"));
+ Set<Range<Token>> node2ToNode3 = new HashSet<>(rangesFetched(fetching, "127.0.0.3", "127.0.0.2"));
+ assertEquals(node2ToNode1, allRanges);
+ assertEquals(node2ToNode3, allRanges);
+ }
+
+ // ranges fetchingNode pulls from fetchFrom; fails with a descriptive assertion instead of an NPE when absent
+ private static List<Range<Token>> rangesFetched(Map<InetAddressAndPort, Map<InetAddressAndPort, List<Range<Token>>>> fetching,
+ String fetchingNode,
+ String fetchFrom) throws UnknownHostException
+ {
+ Map<InetAddressAndPort, List<Range<Token>>> byPeer = fetching.get(InetAddressAndPort.getByName(fetchingNode));
+ assertTrue("no sync tasks fetch into " + fetchingNode, byPeer != null);
+ List<Range<Token>> ranges = byPeer.get(InetAddressAndPort.getByName(fetchFrom));
+ assertTrue(fetchingNode + " fetches nothing from " + fetchFrom, ranges != null);
+ return ranges;
+ }
+ }
+
+ /**
+ * Writes different random rows directly to each node (executeInternal, bypassing replication) and checks
+ * that an optimised-streams repair leaves the replicas in sync. The random seed is reported on failure so
+ * a failing run can be reproduced.
+ */
+ @Test
+ public void randomTest() throws IOException
+ {
+ long seed = System.currentTimeMillis();
+ try(Cluster cluster = init(Cluster.build(3)
+ .withConfig(config -> config.set("hinted_handoff_enabled", false)
+ .with(GOSSIP)
+ .with(NETWORK))
+ .start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+ for (int i = 0; i < 10_000; i++)
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (id, t) values (?,?)", ConsistencyLevel.ALL, i, i);
+ cluster.forEach((i) -> i.flush(KEYSPACE));
+
+ Random r = new Random(seed);
+ for (int i = 0; i < 20000; i++)
+ for (int j = 1; j <= 3; j++)
+ cluster.get(j).executeInternal("INSERT INTO "+KEYSPACE+".tbl (id, t) values (?,?)", r.nextInt(), i * 2 + 2);
+
+ try
+ {
+ NodeToolResult res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-os");
+ res.asserts().success();
+
+ res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "-vd");
+ res.asserts().success();
+ res.asserts().notificationContains("Repaired data is in sync");
+
+ res = cluster.get(1).nodetoolResult("repair", KEYSPACE, "--preview", "--full");
+ res.asserts().success();
+ res.asserts().notificationContains("Previewed data was in sync");
+ }
+ catch (AssertionError e)
+ {
+ throw new AssertionError("randomTest failed with random seed " + seed, e);
+ }
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 9887d38..87863b5 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -73,6 +73,7 @@ import org.apache.cassandra.utils.asserts.SyncTaskListAssert;
import static org.apache.cassandra.utils.asserts.SyncTaskAssert.assertThat;
import static org.apache.cassandra.utils.asserts.SyncTaskListAssert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
public class RepairJobTest
{
@@ -610,7 +611,7 @@ public class RepairJobTest
}
@Test
- public void testOptimizedCreateStandardSyncTasksAllDifferent()
+ public void testOptimisedCreateStandardSyncTasksAllDifferent()
{
List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, RANGE_1, "one", RANGE_2, "one", RANGE_3, "one"),
treeResponse(addr2, RANGE_1, "two", RANGE_2, "two", RANGE_3, "two"),
@@ -636,8 +637,17 @@ public class RepairJobTest
}
@Test
- public void testOptimizedCreateStandardSyncTasks()
+ public void testOptimisedCreateStandardSyncTasks()
{
+ /*
+ addr1 will stream range1 from addr3
+ range2 from addr2 or addr3
+ addr2 will stream range1 from addr3
+ range2 from addr1
+ addr3 will stream range1 from addr1 or addr2
+ range2 from addr1
+ */
+
List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, RANGE_1, "one", RANGE_2, "one"),
treeResponse(addr2, RANGE_1, "one", RANGE_2, "two"),
treeResponse(addr3, RANGE_1, "three", RANGE_2, "two"));
@@ -652,21 +662,23 @@ public class RepairJobTest
assertThat(tasks.values()).areAllInstanceOf(AsymmetricRemoteSyncTask.class);
- assertThat(tasks.get(pair(addr1, addr3)).rangesToSync).containsExactly(RANGE_1);
+ // addr1 streams range1 from addr3:
+ assertThat(tasks.get(pair(addr1, addr3)).rangesToSync).contains(RANGE_1);
// addr1 can get range2 from either addr2 or addr3 but not from both
assertStreamRangeFromEither(tasks, RANGE_2, addr1, addr2, addr3);
- assertThat(tasks.get(pair(addr2, addr3)).rangesToSync).containsExactly(RANGE_1);
- assertThat(tasks.get(pair(addr2, addr1)).rangesToSync).containsExactly(RANGE_2);
-
+ // addr2 streams range1 from addr3
+ assertThat(tasks.get(pair(addr2, addr3)).rangesToSync).contains(RANGE_1);
+ // addr2 streams range2 from addr1
+ assertThat(tasks.get(pair(addr2, addr1)).rangesToSync).contains(RANGE_2);
// addr3 can get range1 from either addr1 or addr2 but not from both
assertStreamRangeFromEither(tasks, RANGE_1, addr3, addr2, addr1);
-
- assertThat(tasks.get(pair(addr3, addr1)).rangesToSync).containsExactly(RANGE_2);
+ // addr3 streams range2 from addr1
+ assertThat(tasks.get(pair(addr3, addr1)).rangesToSync).contains(RANGE_2);
}
@Test
- public void testOptimizedCreateStandardSyncTasksWithTransient()
+ public void testOptimisedCreateStandardSyncTasksWithTransient()
{
List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, RANGE_1, "same", RANGE_2, "same", RANGE_3, "same"),
treeResponse(addr2, RANGE_1, "different", RANGE_2, "same", RANGE_3, "different"),
@@ -681,7 +693,6 @@ public class RepairJobTest
false,
PreviewKind.ALL));
- assertThat(tasks).hasSize(3);
SyncTask task = tasks.get(pair(addr1, addr2));
assertThat(task)
@@ -698,23 +709,21 @@ public class RepairJobTest
+ /**
+ * Asserts that {@code target} fetches {@code range} from one of {@code either} / {@code or} but not from both.
+ * The range must appear in the sync task for (target, either) or (target, or); the other pair's task
+ * (possibly null when no such task exists) is then checked with {@link #assertDoesntStreamRangeFrom}.
+ */
public static void assertStreamRangeFromEither(Map<SyncNodePair, SyncTask> tasks, Range<Token> range,
InetAddressAndPort target, InetAddressAndPort either, InetAddressAndPort or)
{
- InetAddressAndPort streamsFrom;
- InetAddressAndPort doesntStreamFrom;
- if (tasks.containsKey(pair(target, either)) && tasks.get(pair(target, either)).rangesToSync.contains(range))
+ // either task may be absent: optimised sync only creates tasks for pairs that actually stream
+ SyncTask task1 = tasks.get(pair(target, either));
+ SyncTask task2 = tasks.get(pair(target, or));
+
+ boolean foundRange = false;
+ if (task1 != null && task1.rangesToSync.contains(range))
{
- streamsFrom = either;
- doesntStreamFrom = or;
+ foundRange = true;
+ assertDoesntStreamRangeFrom(range, task2);
}
- else
+ else if (task2 != null && task2.rangesToSync.contains(range))
{
- doesntStreamFrom = either;
- streamsFrom = or;
+ foundRange = true;
+ assertDoesntStreamRangeFrom(range, task1);
}
-
- SyncTask task = tasks.get(pair(target, streamsFrom));
- assertThat(task).isInstanceOf(AsymmetricRemoteSyncTask.class);
- assertThat(task.rangesToSync).containsOnly(range);
- assertDoesntStreamRangeFrom(range, tasks.get(pair(target, doesntStreamFrom)));
+ // the range must be fetched from exactly one of the two candidates
+ assertTrue(foundRange);
}
public static void assertDoesntStreamRangeFrom(Range<Token> range, SyncTask task)
diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
index ea5ebbf..1057a67 100644
--- a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
@@ -66,7 +66,7 @@ public class StreamingRepairTaskTest extends AbstractRepairTest
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
- SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
+ SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE, false);
StreamingRepairTask task = new StreamingRepairTask(desc, request.initiator, request.src, request.dst, request.ranges, desc.sessionId, PreviewKind.NONE, false);
StreamPlan plan = task.createStreamPlan(request.dst);
@@ -79,7 +79,7 @@ public class StreamingRepairTaskTest extends AbstractRepairTest
UUID sessionID = registerSession(cfs, false, true);
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
- SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
+ SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE, false);
StreamingRepairTask task = new StreamingRepairTask(desc, request.initiator, request.src, request.dst, request.ranges, null, PreviewKind.NONE, false);
StreamPlan plan = task.createStreamPlan(request.dst);
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/RangeDenormalizerTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/RangeDenormalizerTest.java
index a128f2b..3d66d83 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/RangeDenormalizerTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/RangeDenormalizerTest.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.repair.asymmetric;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import com.google.common.collect.Sets;
@@ -39,7 +37,7 @@ public class RangeDenormalizerTest
{
// test when the new incoming range is fully contained within an existing incoming range
StreamFromOptions dummy = new StreamFromOptions(null, range(0, 100));
- Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+ RangeMap<StreamFromOptions> incoming = new RangeMap<>();
incoming.put(range(0, 100), dummy);
Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range(30, 40), incoming);
assertEquals(3, incoming.size());
@@ -55,7 +53,7 @@ public class RangeDenormalizerTest
{
// test when the new incoming range fully contains an existing incoming range
StreamFromOptions dummy = new StreamFromOptions(null, range(40, 50));
- Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+ RangeMap<StreamFromOptions> incoming = new RangeMap<>();
incoming.put(range(40, 50), dummy);
Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range(0, 100), incoming);
assertEquals(1, incoming.size());
@@ -73,7 +71,7 @@ public class RangeDenormalizerTest
StreamFromOptions dummy = new StreamFromOptions(null, range(0, 100));
StreamFromOptions dummy2 = new StreamFromOptions(null, range(200, 300));
StreamFromOptions dummy3 = new StreamFromOptions(null, range(500, 600));
- Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>();
+ RangeMap<StreamFromOptions> incoming = new RangeMap<>();
incoming.put(range(0, 100), dummy);
incoming.put(range(200, 300), dummy2);
incoming.put(range(500, 600), dummy3);
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/RangeMapTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/RangeMapTest.java
new file mode 100644
index 0000000..0805ea7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/RangeMapTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.asymmetric;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class RangeMapTest
+{
+    /**
+     * Randomised check of {@link RangeMap#removeIntersecting}: for a random query range, the returned entries
+     * must be exactly those whose ranges intersect it, and none of the entries left in the map may intersect it.
+     *
+     * Each iteration draws its own seed from a single base Random, so iterations that run within the same
+     * millisecond still see different data (re-seeding from the clock on every iteration repeated seeds).
+     * Failure messages carry the per-iteration seed, which is enough to replay that iteration.
+     */
+    @Test
+    public void randomTest()
+    {
+        Random seeds = new Random(System.currentTimeMillis());
+        for (int iterCount = 1; iterCount <= 10000; iterCount++)
+        {
+            long seed = seeds.nextLong();
+            Random r = new Random(seed);
+            RangeMap<Integer> rangeMap = new RangeMap<>();
+            int i = 0;
+            for (Range<Token> range : random(2000, r))
+                rangeMap.put(range, i++);
+
+            long a = r.nextLong() % 100000;
+            long b = r.nextLong() % 100000;
+            if (a == b) b++; // avoid a query range whose bounds are equal
+
+            Range<Token> intersectionRange = r(a, b);
+
+            // brute-force expectation: every entry whose range intersects the query range
+            Set<Map.Entry<Range<Token>, Integer>> expected = new HashSet<>();
+            for (Map.Entry<Range<Token>, Integer> entry : rangeMap.entrySet())
+                if (intersectionRange.intersects(entry.getKey()))
+                    expected.add(new RangeMap.Entry<>(entry));
+
+            Set<Map.Entry<Range<Token>, Integer>> intersection = new HashSet<>(rangeMap.removeIntersecting(intersectionRange));
+
+            // no intersecting ranges left in the range map:
+            for (Map.Entry<Range<Token>, Integer> entry : rangeMap.entrySet())
+                assertFalse("seed:" + seed, intersectionRange.intersects(entry.getKey()));
+
+            assertEquals("seed:" + seed, expected, intersection);
+            if (iterCount % 1000 == 0)
+                System.out.println(iterCount);
+        }
+    }
+
+    /**
+     * Creates random ranges for {@link #randomTest}. {@code 2 * cnt} distinct random tokens are generated and
+     * sorted, then paired up into {@code cnt} ranges between consecutive tokens. One more range is added that
+     * runs from the largest token to just below the smallest one.
+     *
+     * @param cnt number of token pairs to build; must be positive
+     * @param r   source of randomness (seeded by the caller so failures are reproducible)
+     */
+    Set<Range<Token>> random(int cnt, Random r)
+    {
+        int tokenCount = cnt * 2;
+        Set<Long> uniqueTokens = new HashSet<>(tokenCount);
+        while (uniqueTokens.size() < tokenCount)
+            uniqueTokens.add(r.nextLong() % 1000000); // duplicates are rejected by the set and simply redrawn
+        List<Long> randomTokens = new ArrayList<>(uniqueTokens);
+        randomTokens.sort(Long::compare);
+
+        Set<Range<Token>> ranges = new HashSet<>(cnt + 1);
+        // pair (0,1), (2,3), ... so that every generated token is used exactly once
+        for (int i = 0; i + 1 < randomTokens.size(); i += 2)
+            ranges.add(r(randomTokens.get(i), randomTokens.get(i + 1)));
+        ranges.add(r(randomTokens.get(randomTokens.size() - 1), randomTokens.get(0) - 1));
+        return ranges;
+    }
+
+    /** Shorthand for a Murmur3 token range between {@code left} and {@code right}. */
+    private Range<Token> r(long left, long right)
+    {
+        return new Range<>(new Murmur3Partitioner.LongToken(left), new Murmur3Partitioner.LongToken(right));
+    }
+
+    /** An empty map yields no intersecting entries for any range. */
+    @Test
+    public void testEmpty()
+    {
+        RangeMap<Integer> rmap = new RangeMap<>();
+        assertFalse(rmap.intersectingEntryIterator(r(1, 10)).hasNext());
+    }
+}
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
index 6c64b1a..01b0cae 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
@@ -22,16 +22,21 @@ package org.apache.cassandra.repair.asymmetric;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
+import java.util.TreeSet;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.keyvalue.AbstractMapEntry;
import org.junit.Test;
import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -263,30 +268,30 @@ public class ReduceHelperTest
HostDifferences n0 = reduced.get(A);
- assertTrue(n0.get(B).equals(list(range(50, 100))));
- assertTrue(n0.get(C).equals(list(range(0, 50))));
+ assertTrue(n0.get(B).equals(set(range(50, 100))));
+ assertTrue(n0.get(C).equals(set(range(0, 50))));
HostDifferences n1 = reduced.get(B);
assertEquals(0, n1.get(B).size());
if (!n1.get(A).isEmpty())
{
- assertTrue(n1.get(C).equals(list(range(0, 50))));
- assertTrue(n1.get(A).equals(list(range(50, 100))));
+ assertTrue(n1.get(C).equals(set(range(0, 50))));
+ assertTrue(n1.get(A).equals(set(range(50, 100))));
}
else
{
- assertTrue(n1.get(C).equals(list(range(0, 50), range(50, 100))));
+ assertTrue(n1.get(C).equals(set(range(0, 50), range(50, 100))));
}
HostDifferences n2 = reduced.get(C);
assertEquals(0, n2.get(C).size());
if (!n2.get(A).isEmpty())
{
- assertTrue(n2.get(A).equals(list(range(0,50))));
- assertTrue(n2.get(B).equals(list(range(50, 100))));
+ assertTrue(n2.get(A).equals(set(range(0,50))));
+ assertTrue(n2.get(B).equals(set(range(50, 100))));
}
else
{
- assertTrue(n2.get(A).equals(list(range(0, 50), range(50, 100))));
+ assertTrue(n2.get(A).equals(set(range(0, 50), range(50, 100))));
}
@@ -389,6 +394,14 @@ public class ReduceHelperTest
return ret;
}
+    /**
+     * Collects the given ranges into a sorted set ordered by each range's left token; ranges sharing a left
+     * token are treated as duplicates by the comparator.
+     */
+    @SafeVarargs
+    private static NavigableSet<Range<Token>> set(Range<Token> ... ranges)
+    {
+        Comparator<Range<Token>> byLeft = Comparator.comparing(range -> range.left);
+        NavigableSet<Range<Token>> sorted = new TreeSet<>(byLeft);
+        Collections.addAll(sorted, ranges);
+        return sorted;
+    }
+
static Murmur3Partitioner.LongToken longtok(long l)
{
return new Murmur3Partitioner.LongToken(l);
@@ -399,19 +412,24 @@ public class ReduceHelperTest
return new Range<>(longtok(t), longtok(t2));
}
+    /**
+     * Builds a map entry keyed by {@code range(t, t2)} with a placeholder {@link StreamFromOptions}
+     * (constructed with null arguments) as its value.
+     */
+    static Map.Entry<Range<Token>, StreamFromOptions> rangeEntry(long t, long t2)
+    {
+        // typed JDK entry with the standard Map.Entry equals/hashCode contract, replacing a raw-typed
+        // anonymous commons-collections AbstractMapEntry (which compiled with an unchecked conversion)
+        return new java.util.AbstractMap.SimpleEntry<>(range(t, t2), new StreamFromOptions(null, null));
+    }
+
+ /**
+  * Checks that RangeDenormalizer.subtractFromAllRanges returns the portions of the entries' ranges
+  * that lie outside the subtracted range (range(0, 100) in every assertion below).
+  */
@Test
public void testSubtractAllRanges()
{
- Set<Range<Token>> ranges = new HashSet<>();
- ranges.add(range(10, 20)); ranges.add(range(40, 60));
+ Set<Map.Entry<Range<Token>, StreamFromOptions>> ranges = new HashSet<>();
+ // both ranges fall inside range(0, 100), so nothing is expected to remain
+ ranges.add(rangeEntry(10, 20)); ranges.add(rangeEntry(40, 60));
assertEquals(0, RangeDenormalizer.subtractFromAllRanges(ranges, range(0, 100)).size());
- ranges.add(range(90, 110));
+ // range(90, 110) extends past 100, so only range(100, 110) is expected to remain
+ ranges.add(rangeEntry(90, 110));
assertEquals(Sets.newHashSet(range(100, 110)), RangeDenormalizer.subtractFromAllRanges(ranges, range(0, 100)));
- ranges.add(range(-10, 10));
+ // range(-10, 10) starts before 0, so range(-10, 0) is expected to remain as well
+ ranges.add(rangeEntry(-10, 10));
assertEquals(Sets.newHashSet(range(-10, 0), range(100, 110)), RangeDenormalizer.subtractFromAllRanges(ranges, range(0, 100)));
}
+ /** Asserts that exactly one of the two collections is non-empty. */
- private void assertStreamFromEither(List<Range<Token>> r1, List<Range<Token>> r2)
+ private void assertStreamFromEither(Collection<Range<Token>> r1, Collection<Range<Token>> r2)
{
+ // '>' binds tighter than '^', so this is (r1 non-empty) XOR (r2 non-empty)
assertTrue(r1.size() > 0 ^ r2.size() > 0);
}
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
index fa037a0..761a77c 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -150,7 +150,7 @@ public class RepairMessageSerializationsTest
InetAddressAndPort src = InetAddressAndPort.getByName("127.0.0.2");
InetAddressAndPort dst = InetAddressAndPort.getByName("127.0.0.3");
- SyncRequest msg = new SyncRequest(buildRepairJobDesc(), initiator, src, dst, buildTokenRanges(), PreviewKind.NONE);
+ SyncRequest msg = new SyncRequest(buildRepairJobDesc(), initiator, src, dst, buildTokenRanges(), PreviewKind.NONE, false);
serializeRoundTrip(msg, SyncRequest.serializer);
}
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 0a5a023..f4b3b1c 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -173,7 +173,7 @@ public class SerializationsTest extends AbstractSerializationsTester
InetAddressAndPort src = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", PORT);
InetAddressAndPort dest = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.3", PORT);
- SyncRequest message = new SyncRequest(DESC, local, src, dest, Collections.singleton(FULL_RANGE), PreviewKind.NONE);
+ SyncRequest message = new SyncRequest(DESC, local, src, dest, Collections.singleton(FULL_RANGE), PreviewKind.NONE, false);
testRepairMessageWrite("service.SyncRequest.bin", SyncRequest.serializer, message);
}
@@ -195,6 +195,7 @@ public class SerializationsTest extends AbstractSerializationsTester
assert src.equals(message.src);
assert dest.equals(message.dst);
assert message.ranges.size() == 1 && message.ranges.contains(FULL_RANGE);
+ assert !message.asymmetric;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org