You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2016/06/15 09:48:50 UTC
[04/10] cassandra git commit: Cache local ranges when calculating repair neighbors
Cache local ranges when calculating repair neighbors
patch by Mahdi Mohammadi; reviewed by Paulo Motta for CASSANDRA-11933
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f74831f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f74831f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f74831f
Branch: refs/heads/trunk
Commit: 2f74831f4218142f6e118678a3c74c79c1f7e1ed
Parents: 1d2d074
Author: Mahdi Mohammadi <ma...@gmail.com>
Authored: Wed Jun 15 11:43:27 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 15 11:43:27 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/service/ActiveRepairService.java | 8 +++++---
.../org/apache/cassandra/service/StorageService.java | 6 +++++-
.../service/AntiEntropyServiceTestAbstract.java | 14 ++++++++------
4 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d70902..ec2b48e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.15
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
* Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
* Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
* cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index f8975f9..4c83c48 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -164,7 +164,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
Set<InetAddress> neighbours = new HashSet<>();
- neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null));
+ Collection<Range<Token>> keyspaceLocalRanges = StorageService.instance.getLocalRanges(desc.keyspace);
+ neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, keyspaceLocalRanges, desc.range, null, null));
RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, neighbours, new String[]{desc.columnFamily});
sessions.put(session.getId(), session);
RepairFuture futureTask = new RepairFuture(session);
@@ -176,17 +177,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
* Return all of the neighbors with whom we share the provided range.
*
* @param keyspaceName keyspace to repair
+ * @param keyspaceLocalRanges local ranges for the given keyspaceName
* @param toRepair token range to repair
* @param dataCenters the data centers to involve in the repair
*
* @return neighbors with whom we share the provided range
*/
- public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
+ public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
{
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
Range<Token> rangeSuperSet = null;
- for (Range<Token> range : ss.getLocalRanges(keyspaceName))
+ for (Range<Token> range : keyspaceLocalRanges)
{
if (range.contains(toRepair))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index eea4556..27939f9 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2978,13 +2978,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return;
}
+ // Pre-calculate the output of getLocalRanges once and pass it to getNeighbors, so that it is not
+ // recalculated for every range in the loop below.
+ Collection<Range<Token>> keyspaceLocalRanges = getLocalRanges(keyspace);
+
Set<InetAddress> allNeighbors = new HashSet<>();
Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
for (Range<Token> range : ranges)
{
try
{
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
+ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, dataCenters, hosts);
rangeToNeighbors.put(range, neighbors);
allNeighbors.addAll(neighbors);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f74831f/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index ac39de6..21eb492 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -123,7 +123,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -146,7 +146,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -168,7 +168,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -196,7 +196,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -218,7 +218,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
expected.remove(FBUtilities.getBroadcastAddress());
Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
- assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts).iterator().next());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts).iterator().next());
}
@Test(expected = IllegalArgumentException.class)
@@ -227,7 +228,8 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
//Don't give local endpoint
Collection<String> hosts = Arrays.asList("127.0.0.3");
- ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts);
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspaceName);
+ ActiveRepairService.getNeighbors(keyspaceName, ranges, ranges.iterator().next(), null, hosts);
}
Set<InetAddress> addTokens(int max) throws Throwable