You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/29 20:26:52 UTC
[2/3] git commit: Allow specifying datacenters to participate in a repair patch by Jimmy Mårdell; reviewed by Lyuben Todorov for CASSANDRA-6218
Allow specifying datacenters to participate in a repair
patch by Jimmy Mårdell; reviewed by Lyuben Todorov for CASSANDRA-6218
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f065cbfe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f065cbfe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f065cbfe
Branch: refs/heads/trunk
Commit: f065cbfe058a2b0bb58ed53602afe0f12942d525
Parents: d41a746
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Nov 29 13:26:16 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Nov 29 13:26:16 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/repair/RepairSession.java | 10 ++--
.../cassandra/service/ActiveRepairService.java | 27 +++++++---
.../cassandra/service/StorageService.java | 54 ++++++++++++++++++--
.../cassandra/service/StorageServiceMBean.java | 17 ++++++
.../org/apache/cassandra/tools/NodeCmd.java | 12 ++++-
.../org/apache/cassandra/tools/NodeProbe.java | 16 +++---
.../apache/cassandra/tools/NodeToolHelp.yaml | 4 ++
.../service/AntiEntropyServiceTestAbstract.java | 8 +--
9 files changed, 119 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 644c6b3..12469f2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.4
+ * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
* Fix divide-by-zero in PCI (CASSANDRA-6403)
* Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
* Add sub-ms precision formats to the timestamp parser (CASSANDRA-6395)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 18688f9..ebcd3f4 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -102,15 +102,15 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
* @param range range to repair
* @param keyspace name of keyspace
* @param isSequential true if performing repair on snapshots sequentially
- * @param isLocal true if you want to perform repair only inside the data center
+ * @param dataCenters the data centers that should be part of the repair; null for all DCs
* @param cfnames names of columnfamilies
*/
- public RepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
+ public RepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
{
- this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, isLocal, cfnames);
+ this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, cfnames);
}
- public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String[] cfnames)
+ public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String[] cfnames)
{
this.id = id;
this.isSequential = isSequential;
@@ -118,7 +118,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
this.cfnames = cfnames;
assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
this.range = range;
- this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, isLocal);
+ this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters);
}
public UUID getId()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 2f16b31..b77f216 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -21,6 +21,7 @@ import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
@@ -90,9 +91,9 @@ public class ActiveRepairService
*
* @return Future for asynchronous call or null if there is no need to repair
*/
- public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
+ public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
{
- RepairSession session = new RepairSession(range, keyspace, isSequential, isLocal, cfnames);
+ RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters, cfnames);
if (session.endpoints.isEmpty())
return null;
RepairFuture futureTask = new RepairFuture(session);
@@ -126,7 +127,7 @@ public class ActiveRepairService
// add it to the sessions (avoid NPE in tests)
RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
{
- RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, false, new String[]{desc.columnFamily});
+ RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, null, new String[]{desc.columnFamily});
sessions.put(session.getId(), session);
RepairFuture futureTask = new RepairFuture(session);
executor.execute(futureTask);
@@ -138,12 +139,15 @@ public class ActiveRepairService
*
* @param keyspaceName keyspace to repair
* @param toRepair token to repair
- * @param isLocal need to use only nodes from local datacenter
+ * @param dataCenters the data centers to involve in the repair
*
* @return neighbors with whom we share the provided range
*/
- public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, boolean isLocal)
+ public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters)
{
+ if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+ throw new IllegalArgumentException("The local data center must be part of the repair");
+
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
Range<Token> rangeSuperSet = null;
@@ -165,11 +169,18 @@ public class ActiveRepairService
Set<InetAddress> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet));
neighbors.remove(FBUtilities.getBroadcastAddress());
- if (isLocal)
+ if (dataCenters != null)
{
TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
- Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
- return Sets.intersection(neighbors, localEndpoints);
+ Set<InetAddress> dcEndpoints = Sets.newHashSet();
+ Multimap<String,InetAddress> dcEndpointsMap = topology.getDatacenterEndpoints();
+ for (String dc : dataCenters)
+ {
+ Collection<InetAddress> c = dcEndpointsMap.get(dc);
+ if (c != null)
+ dcEndpoints.addAll(c);
+ }
+ return Sets.intersection(neighbors, dcEndpoints);
}
return neighbors;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 418a496..5b3a3e1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2251,6 +2251,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
jmxNotification.setUserData(userObject);
sendNotification(jmxNotification);
}
+
+ public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final boolean primaryRange, final String... columnFamilies)
+ {
+ final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+ return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, columnFamilies);
+ }
+
+ public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<Range<Token>> ranges, final String... columnFamilies)
+ {
+ if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
+ return 0;
+
+ final int cmd = nextRepairCommand.incrementAndGet();
+ if (ranges.size() > 0)
+ {
+ new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies)).start();
+ }
+ return cmd;
+ }
+
public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
{
final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
@@ -2270,6 +2290,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return cmd;
}
+ public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies)
+ {
+ Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+ Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+
+ logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
+ parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
+ return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+ }
+
public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
{
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
@@ -2316,6 +2346,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
{
+ Set<String> dataCenters = null;
+ if (isLocal)
+ {
+ dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+ }
+ return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies);
+ }
+
+ private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final String... columnFamilies)
+ {
return new FutureTask<Object>(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
@@ -2324,13 +2364,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.info(message);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
+ if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+ {
+ message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd);
+ logger.error(message);
+ sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+ return;
+ }
+
List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size());
for (Range<Token> range : ranges)
{
RepairFuture future;
try
{
- future = forceKeyspaceRepair(range, keyspace, isSequential, isLocal, columnFamilies);
+ future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, columnFamilies);
}
catch (IllegalArgumentException e)
{
@@ -2380,7 +2428,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}, null);
}
- public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies) throws IOException
{
ArrayList<String> names = new ArrayList<String>();
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
@@ -2394,7 +2442,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return null;
}
- return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, isLocal, names.toArray(new String[names.size()]));
+ return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, dataCenters, names.toArray(new String[names.size()]));
}
public void forceTerminateAllRepairSessions() {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 2dd8b00..be1b0aa 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -256,6 +256,23 @@ public interface StorageServiceMBean extends NotificationEmitter
* userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
*
* @return Repair command number, or 0 if nothing to repair
+ */
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies);
+
+ /**
+ * Same as forceRepairAsync, but handles a specified range
+ */
+ public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies);
+
+
+ /**
+ * Invoke repair asynchronously.
+ * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
+ * Notification format is:
+ * type: "repair"
+ * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
+ *
+ * @return Repair command number, or 0 if nothing to repair
* @see #forceKeyspaceRepair(String, boolean, boolean, String...)
*/
public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 022f9c0..5c071b6 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -67,6 +67,7 @@ public class NodeCmd
private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range");
private static final Pair<String, String> PARALLEL_REPAIR_OPT = Pair.create("par", "parallel");
private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc");
+ private static final Pair<String, String> DC_REPAIR_OPT = Pair.create("dc", "in-dc");
private static final Pair<String, String> START_TOKEN_OPT = Pair.create("st", "start-token");
private static final Pair<String, String> END_TOKEN_OPT = Pair.create("et", "end-token");
private static final Pair<String, String> UPGRADE_ALL_SSTABLE_OPT = Pair.create("a", "include-all-sstables");
@@ -92,6 +93,7 @@ public class NodeCmd
options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
options.addOption(PARALLEL_REPAIR_OPT, false, "repair nodes in parallel.");
options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter");
+ options.addOption(DC_REPAIR_OPT, true, "only repair against nodes in the specified datacenters (comma separated)");
options.addOption(START_TOKEN_OPT, true, "token at which repair range starts");
options.addOption(END_TOKEN_OPT, true, "token at which repair range ends");
options.addOption(UPGRADE_ALL_SSTABLE_OPT, false, "includes sstables that are already on the most recent version during upgradesstables");
@@ -1480,11 +1482,17 @@ public class NodeCmd
case REPAIR :
boolean sequential = !cmd.hasOption(PARALLEL_REPAIR_OPT.left);
boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
+ boolean specificDC = cmd.hasOption(DC_REPAIR_OPT.left);
boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
+ Collection<String> dataCenters = null;
+ if (specificDC)
+ dataCenters = Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(","));
+ else if (localDC)
+ dataCenters = Arrays.asList(probe.getDataCenter());
if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
- probe.forceRepairRangeAsync(System.out, keyspace, sequential, localDC, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+ probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
else
- probe.forceRepairAsync(System.out, keyspace, sequential, localDC, primaryRange, columnFamilies);
+ probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters, primaryRange, columnFamilies);
break;
case FLUSH :
try { probe.forceKeyspaceFlush(keyspace, columnFamilies); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 0008325..d784f29 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -215,14 +215,14 @@ public class NodeProbe
ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
}
- public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies) throws IOException
+ public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies) throws IOException
{
RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
try
{
jmxc.addConnectionNotificationListener(runner, null, null);
ssProxy.addNotificationListener(runner, null, null);
- if (!runner.repairAndWait(ssProxy, isSequential, isLocal, primaryRange))
+ if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange))
failed = true;
}
catch (Exception e)
@@ -240,14 +240,14 @@ public class NodeProbe
}
}
- public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, boolean isLocal, final String startToken, final String endToken, String... columnFamilies) throws IOException
+ public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String startToken, final String endToken, String... columnFamilies) throws IOException
{
RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
try
{
jmxc.addConnectionNotificationListener(runner, null, null);
ssProxy.addNotificationListener(runner, null, null);
- if (!runner.repairRangeAndWait(ssProxy, isSequential, isLocal, startToken, endToken))
+ if (!runner.repairRangeAndWait(ssProxy, isSequential, dataCenters, startToken, endToken))
failed = true;
}
catch (Exception e)
@@ -1009,16 +1009,16 @@ class RepairRunner implements NotificationListener
this.columnFamilies = columnFamilies;
}
- public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws Exception
+ public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, boolean primaryRangeOnly) throws Exception
{
- cmd = ssProxy.forceRepairAsync(keyspace, isSequential, isLocal, primaryRangeOnly, columnFamilies);
+ cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly, columnFamilies);
waitForRepair();
return success;
}
- public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, String startToken, String endToken) throws Exception
+ public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, String startToken, String endToken) throws Exception
{
- cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, isLocal, columnFamilies);
+ cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, columnFamilies);
waitForRepair();
return success;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
index 632d7e1..54d0884 100644
--- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
+++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
@@ -148,8 +148,12 @@ commands:
- name: repair [keyspace] [cfnames]
help: |
Repair one or more column families
+ Use -dc to repair specific datacenters (csv list).
+ Use -et to specify a token at which repair range ends.
+ Use -local to only repair against nodes in the same datacenter.
Use -pr to repair only the first range returned by the partitioner.
Use -par to carry out a parallel repair.
+ Use -st to specify a token at which the repair range starts.
- name: cleanup [keyspace] [cfnames]
help: |
Run cleanup on one or more column families
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 4023910..1123fc0 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -124,7 +124,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, false));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
}
assertEquals(expected, neighbors);
}
@@ -147,7 +147,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, false));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
}
assertEquals(expected, neighbors);
}
@@ -169,7 +169,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, true));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
}
assertEquals(expected, neighbors);
}
@@ -197,7 +197,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
Set<InetAddress> neighbors = new HashSet<InetAddress>();
for (Range<Token> range : ranges)
{
- neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, true));
+ neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
}
assertEquals(expected, neighbors);
}