You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/06/19 18:33:53 UTC
[2/9] git commit: Improve validation of sub range repair
Improve validation of sub range repair
Also prevent "-pr" repair from being combined with "-dc/-hosts/-local".
patch by yukim; reviewed by krummas for CASSANDRA-7317
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/434b5d68
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/434b5d68
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/434b5d68
Branch: refs/heads/cassandra-2.1
Commit: 434b5d683ec7520acf1a5a2d421ee5aba2ede0e8
Parents: 303ff22
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Jun 19 10:47:14 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Jun 19 10:47:14 2014 -0500
----------------------------------------------------------------------
.../cassandra/service/ActiveRepairService.java | 3 -
.../cassandra/service/StorageService.java | 98 +++++++++++++-------
.../org/apache/cassandra/tools/NodeCmd.java | 3 +
3 files changed, 70 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/434b5d68/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 00e43ea..aac9f9a 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -146,9 +146,6 @@ public class ActiveRepairService
*/
public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
{
- if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
- throw new IllegalArgumentException("The local data center must be part of the repair");
-
StorageService ss = StorageService.instance;
Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
Range<Token> rangeSuperSet = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/434b5d68/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 05cc8d7..13dd3b7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2488,6 +2488,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
{
+ // when repairing only primary range, neither dataCenters nor hosts can be set
+ if (primaryRange && (dataCenters != null || hosts != null))
+ {
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ }
final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, columnFamilies);
}
@@ -2507,6 +2512,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
{
+ // when repairing only primary range, you cannot repair only on local DC
+ if (primaryRange && isLocal)
+ {
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ }
final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies);
}
@@ -2528,30 +2538,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
{
- Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
- Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+ Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
- logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
- parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+ logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
+ repairingRange, keyspaceName, columnFamilies);
+ return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, columnFamilies);
}
public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
{
- Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
- Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
-
- logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
- parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+ Set<String> dataCenters = null;
+ if (isLocal)
+ {
+ dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+ }
+ return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, columnFamilies);
}
-
/**
* Trigger proactive repair for a keyspace and column families.
- * @param keyspaceName
- * @param columnFamilies
- * @throws IOException
*/
public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
{
@@ -2560,17 +2565,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
{
- forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, isLocal, columnFamilies);
+ // primary range repair can only be performed for whole cluster.
+ // NOTE: we should omit the param but keep API as is for now.
+ if (isLocal)
+ {
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ }
+
+ forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, false, columnFamilies);
}
public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
{
- Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
- Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+ Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
- logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
- parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- forceKeyspaceRepairRange(keyspaceName, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal, columnFamilies);
+ logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
+ repairingRange, keyspaceName, columnFamilies);
+ forceKeyspaceRepairRange(keyspaceName, repairingRange, isSequential, isLocal, columnFamilies);
}
public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
@@ -2580,6 +2591,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, isSequential, isLocal, columnFamilies).run();
}
+ /**
+ * Create collection of ranges that match ring layout from given tokens.
+ *
+ * @param beginToken beginning token of the range
+ * @param endToken end token of the range
+ * @return collection of ranges that match ring layout in TokenMetadata
+ */
+ @SuppressWarnings("unchecked")
+ private Collection<Range<Token>> createRepairRangeFrom(String beginToken, String endToken)
+ {
+ Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+ Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+
+ Deque<Range<Token>> repairingRange = new ArrayDeque<>();
+ // Break up given range to match ring layout in TokenMetadata
+ Token previous = tokenMetadata.getPredecessor(TokenMetadata.firstToken(tokenMetadata.sortedTokens(), parsedEndToken));
+ while (parsedBeginToken.compareTo(previous) < 0)
+ {
+ repairingRange.addFirst(new Range<>(previous, parsedEndToken));
+
+ parsedEndToken = previous;
+ previous = tokenMetadata.getPredecessor(previous);
+ }
+ repairingRange.addFirst(new Range<>(parsedBeginToken, parsedEndToken));
+
+ return repairingRange;
+ }
+
private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
{
Set<String> dataCenters = null;
@@ -2592,7 +2631,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
{
- return new FutureTask<Object>(new WrappedRunnable()
+ if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+ {
+ throw new IllegalArgumentException("the local data center must be part of the repair");
+ }
+
+ return new FutureTask<>(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
@@ -2600,15 +2644,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.info(message);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
- if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
- {
- message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd);
- logger.error(message);
- sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
- return;
- }
-
- List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size());
+ List<RepairFuture> futures = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
{
RepairFuture future;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/434b5d68/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 213e4b4..afa42dd 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -1659,6 +1659,9 @@ public class NodeCmd
Collection<String> dataCenters = null;
Collection<String> hosts = null;
+ if (primaryRange && (localDC || specificDC || specificHosts))
+ throw new RuntimeException("Primary range repair should be performed on all nodes in the cluster.");
+
if (specificDC)
dataCenters = Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(","));
else if (localDC)