You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/02/18 18:37:00 UTC
[3/8] git commit: Allow repairing between specific replicas; patch by
Sankalp Kohli; reviewed by Lyuben Todorov for CASSANDRA-6440
Allow repairing between specific replicas
patch by Sankalp Kohli; reviewed by Lyuben Todorov for CASSANDRA-6440
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f30b7720
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f30b7720
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f30b7720
Branch: refs/heads/trunk
Commit: f30b772006e43e0c2905638e1e271854f2a71f69
Parents: 500c62d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Feb 18 11:35:53 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Feb 18 11:35:53 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/repair/RepairSession.java | 8 ++--
.../cassandra/service/ActiveRepairService.java | 40 ++++++++++++++++++--
.../cassandra/service/StorageService.java | 22 +++++------
.../cassandra/service/StorageServiceMBean.java | 4 +-
.../org/apache/cassandra/tools/NodeCmd.java | 11 +++++-
.../org/apache/cassandra/tools/NodeProbe.java | 16 ++++----
.../service/AntiEntropyServiceTestAbstract.java | 37 ++++++++++++++++--
8 files changed, 104 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f0c116f..fd1062e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.6
+ * Allow repairing between specific replicas (CASSANDRA-6440)
* Allow per-dc enabling of hints (CASSANDRA-6157)
* Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
* Fix EstimatedHistogram races (CASSANDRA-6682)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index ebcd3f4..36b7226 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -105,12 +105,12 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
* @param dataCenters the data centers that should be part of the repair; null for all DCs
* @param cfnames names of columnfamilies
*/
- public RepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
+ public RepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
{
- this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, cfnames);
+ this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, hosts, cfnames);
}
- public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String[] cfnames)
+ public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, String[] cfnames)
{
this.id = id;
this.isSequential = isSequential;
@@ -118,7 +118,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
this.cfnames = cfnames;
assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
this.range = range;
- this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters);
+ this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
}
public UUID getId()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b77f216..00e43ea 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.service;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
@@ -91,9 +92,9 @@ public class ActiveRepairService
*
* @return Future for asynchronous call or null if there is no need to repair
*/
- public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
+ public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
{
- RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters, cfnames);
+ RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters, hosts, cfnames);
if (session.endpoints.isEmpty())
return null;
RepairFuture futureTask = new RepairFuture(session);
@@ -127,7 +128,7 @@ public class ActiveRepairService
// add it to the sessions (avoid NPE in tests)
RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
- RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, null, new String[]{desc.columnFamily});
+ RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, null, null, new String[]{desc.columnFamily});
sessions.put(session.getId(), session);
RepairFuture futureTask = new RepairFuture(session);
executor.execute(futureTask);
@@ -143,7 +144,7 @@ public class ActiveRepairService
*
* @return neighbors with whom we share the provided range
*/
- public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters)
+ public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
{
if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
throw new IllegalArgumentException("The local data center must be part of the repair");
@@ -182,6 +183,37 @@ public class ActiveRepairService
}
return Sets.intersection(neighbors, dcEndpoints);
}
+ else if (hosts != null)
+ {
+ Set<InetAddress> specifiedHost = new HashSet<>();
+ for (final String host : hosts)
+ {
+ try
+ {
+ final InetAddress endpoint = InetAddress.getByName(host.trim());
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()) || neighbors.contains(endpoint))
+ specifiedHost.add(endpoint);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new IllegalArgumentException("Unknown host specified " + host, e);
+ }
+ }
+
+ if (!specifiedHost.contains(FBUtilities.getBroadcastAddress()))
+ throw new IllegalArgumentException("The current host must be part of the repair");
+
+ if (specifiedHost.size() <= 1)
+ {
+ String msg = "Repair requires at least two endpoints that are neighbours before it can continue, the endpoint used for this repair is %s, " +
+ "other available neighbours are %s but these neighbours were not part of the supplied list of hosts to use during the repair (%s).";
+ throw new IllegalArgumentException(String.format(msg, specifiedHost, neighbors, hosts));
+ }
+
+ specifiedHost.remove(FBUtilities.getBroadcastAddress());
+ return specifiedHost;
+
+ }
return neighbors;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c323a19..4be95b2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2418,13 +2418,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
sendNotification(jmxNotification);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final boolean primaryRange, final String... columnFamilies)
+ public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
{
final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
- return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, columnFamilies);
+ return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, columnFamilies);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<Range<Token>> ranges, final String... columnFamilies)
+ public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final Collection<Range<Token>> ranges, final String... columnFamilies)
{
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
@@ -2432,7 +2432,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
final int cmd = nextRepairCommand.incrementAndGet();
if (ranges.size() > 0)
{
- new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies)).start();
+ new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, hosts, columnFamilies)).start();
}
return cmd;
}
@@ -2456,14 +2456,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return cmd;
}
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies)
+ public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
{
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+ return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
}
public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
@@ -2517,10 +2517,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
}
- return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies);
+ return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, columnFamilies);
}
- private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final String... columnFamilies)
+ private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
{
return new FutureTask<Object>(new WrappedRunnable()
{
@@ -2544,7 +2544,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
RepairFuture future;
try
{
- future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, columnFamilies);
+ future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, hosts, columnFamilies);
}
catch (IllegalArgumentException e)
{
@@ -2594,7 +2594,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}, null);
}
- public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies) throws IOException
+ public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, final String... columnFamilies) throws IOException
{
ArrayList<String> names = new ArrayList<String>();
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
@@ -2608,7 +2608,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return null;
}
- return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, dataCenters, names.toArray(new String[names.size()]));
+ return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, dataCenters, hosts, names.toArray(new String[names.size()]));
}
public void forceTerminateAllRepairSessions() {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index f949dcc..ed260b8 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -257,12 +257,12 @@ public interface StorageServiceMBean extends NotificationEmitter
*
* @return Repair command number, or 0 if nothing to repair
*/
- public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies);
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies);
/**
* Same as forceRepairAsync, but handles a specified range
*/
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies);
+ public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies);
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index fb29342..bc81615 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -67,6 +67,7 @@ public class NodeCmd
private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range");
private static final Pair<String, String> PARALLEL_REPAIR_OPT = Pair.create("par", "parallel");
private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc");
+ private static final Pair<String, String> HOST_REPAIR_OPT = Pair.create("hosts", "in-host");
private static final Pair<String, String> DC_REPAIR_OPT = Pair.create("dc", "in-dc");
private static final Pair<String, String> START_TOKEN_OPT = Pair.create("st", "start-token");
private static final Pair<String, String> END_TOKEN_OPT = Pair.create("et", "end-token");
@@ -97,6 +98,7 @@ public class NodeCmd
options.addOption(PARALLEL_REPAIR_OPT, false, "repair nodes in parallel.");
options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter");
options.addOption(DC_REPAIR_OPT, true, "only repair against nodes in the specified datacenters (comma separated)");
+ options.addOption(HOST_REPAIR_OPT, true, "only repair against specified nodes (comma separated)");
options.addOption(START_TOKEN_OPT, true, "token at which repair range starts");
options.addOption(END_TOKEN_OPT, true, "token at which repair range ends");
options.addOption(UPGRADE_ALL_SSTABLE_OPT, false, "includes sstables that are already on the most recent version during upgradesstables");
@@ -1626,16 +1628,21 @@ public class NodeCmd
boolean sequential = !cmd.hasOption(PARALLEL_REPAIR_OPT.left);
boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
boolean specificDC = cmd.hasOption(DC_REPAIR_OPT.left);
+ boolean specificHosts = cmd.hasOption(HOST_REPAIR_OPT.left);
boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
Collection<String> dataCenters = null;
+ Collection<String> hosts = null;
+
if (specificDC)
dataCenters = Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(","));
else if (localDC)
dataCenters = Arrays.asList(probe.getDataCenter());
+ else if(specificHosts)
+ hosts = Arrays.asList(cmd.getOptionValue(HOST_REPAIR_OPT.left).split(","));
if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
- probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+ probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters, hosts, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
else
- probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters, primaryRange, columnFamilies);
+ probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters, hosts, primaryRange, columnFamilies);
break;
case FLUSH :
try { probe.forceKeyspaceFlush(keyspace, columnFamilies); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 28cafb7..594b41b 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -215,14 +215,14 @@ public class NodeProbe
ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
}
- public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies) throws IOException
+ public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies) throws IOException
{
RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
try
{
jmxc.addConnectionNotificationListener(runner, null, null);
ssProxy.addNotificationListener(runner, null, null);
- if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange))
+ if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, hosts, primaryRange))
failed = true;
}
catch (Exception e)
@@ -240,14 +240,14 @@ public class NodeProbe
}
}
- public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String startToken, final String endToken, String... columnFamilies) throws IOException
+ public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String startToken, final String endToken, String... columnFamilies) throws IOException
{
RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
try
{
jmxc.addConnectionNotificationListener(runner, null, null);
ssProxy.addNotificationListener(runner, null, null);
- if (!runner.repairRangeAndWait(ssProxy, isSequential, dataCenters, startToken, endToken))
+ if (!runner.repairRangeAndWait(ssProxy, isSequential, dataCenters, hosts, startToken, endToken))
failed = true;
}
catch (Exception e)
@@ -1045,16 +1045,16 @@ class RepairRunner implements NotificationListener
this.columnFamilies = columnFamilies;
}
- public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, boolean primaryRangeOnly) throws Exception
+ public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRangeOnly) throws Exception
{
- cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly, columnFamilies);
+ cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, hosts, primaryRangeOnly, columnFamilies);
waitForRepair();
return success;
}
- public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, String startToken, String endToken) throws Exception
+ public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, String startToken, String endToken) throws Exception
{
- cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, columnFamilies);
+ cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, hosts, columnFamilies);
waitForRepair();
return success;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 1123fc0..eeb297a 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -124,7 +124,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -147,7 +147,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null, null));
}
assertEquals(expected, neighbors);
}
@@ -169,7 +169,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
@@ -197,11 +197,40 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
}
assertEquals(expected, neighbors);
}
+ @Test
+ public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
+ {
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+ // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned
+ addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
+ AbstractReplicationStrategy ars = Keyspace.open(keyspaceName).getReplicationStrategy();
+ List<InetAddress> expected = new ArrayList<>();
+ for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+ {
+ expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+ }
+
+ expected.remove(FBUtilities.getBroadcastAddress());
+ Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
+
+ assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts).iterator().next());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable
+ {
+ addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
+ // Don't give the local endpoint
+ Collection<String> hosts = Arrays.asList("127.0.0.3");
+ ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(), null, hosts);
+ }
+
Set<InetAddress> addTokens(int max) throws Throwable
{
TokenMetadata tmd = StorageService.instance.getTokenMetadata();