You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/06/19 18:33:57 UTC
[6/9] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/tools/NodeCmd.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/40f8ebae
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/40f8ebae
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/40f8ebae
Branch: refs/heads/trunk
Commit: 40f8ebae6c7cf3024e9212582b5b7caa7825c37f
Parents: 4f381a2 434b5d6
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Jun 19 11:33:18 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Jun 19 11:33:18 2014 -0500
----------------------------------------------------------------------
.../cassandra/service/ActiveRepairService.java | 3 -
.../cassandra/service/StorageService.java | 64 +++++++++++++++-----
.../org/apache/cassandra/tools/NodeTool.java | 3 +
3 files changed, 52 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/40f8ebae/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/40f8ebae/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 7dc2a6c,13dd3b7..06d28c3
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2551,14 -2486,18 +2551,19 @@@ public class StorageService extends Not
sendNotification(jmxNotification);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
{
+ // when repairing only primary range, neither dataCenters nor hosts can be set
+ if (primaryRange && (dataCenters != null || hosts != null))
+ {
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ }
- final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
- return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, columnFamilies);
+ Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+
+ return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final Collection<Range<Token>> ranges, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
{
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
@@@ -2571,13 -2510,18 +2576,18 @@@
return cmd;
}
- public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
{
+ // when repairing only primary range, you cannot repair only on local DC
+ if (primaryRange && isLocal)
+ {
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ }
- final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
- return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies);
+ Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+ return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
}
- public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, Collection<Range<Token>> ranges, String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
{
if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
return 0;
@@@ -2592,51 -2536,106 +2602,82 @@@
return cmd;
}
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException
{
- Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
- Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+ Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
- logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
- parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
+ logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
- repairingRange, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, columnFamilies);
- }
-
- public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
- {
- Set<String> dataCenters = null;
- if (isLocal)
- {
- dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
- }
- return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, columnFamilies);
- }
-
- /**
- * Trigger proactive repair for a keyspace and column families.
- */
- public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
- {
- forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential, isLocal, columnFamilies);
- }
-
- public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
- {
- // primary range repair can only be performed for whole cluster.
- // NOTE: we should omit the param but keep API as is for now.
- if (isLocal)
- {
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
- }
-
- forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, false, columnFamilies);
++ repairingRange, keyspaceName, columnFamilies);
++ return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, fullRepair, columnFamilies);
}
- public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies)
{
+ Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
+
+ logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
+ repairingRange, keyspaceName, columnFamilies);
- forceKeyspaceRepairRange(keyspaceName, repairingRange, isSequential, isLocal, columnFamilies);
- }
-
- public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
- {
- if (Keyspace.SYSTEM_KS.equalsIgnoreCase(keyspaceName))
- return;
- createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, isSequential, isLocal, columnFamilies).run();
++ return forceRepairAsync(keyspaceName, isSequential, isLocal, repairingRange, fullRepair, columnFamilies);
+ }
+
+ /**
+ * Create collection of ranges that match ring layout from given tokens.
+ *
+ * @param beginToken beginning token of the range
+ * @param endToken end token of the range
+ * @return collection of ranges that match ring layout in TokenMetadata
+ */
+ @SuppressWarnings("unchecked")
+ private Collection<Range<Token>> createRepairRangeFrom(String beginToken, String endToken)
+ {
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
- logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
- parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
+ Deque<Range<Token>> repairingRange = new ArrayDeque<>();
+ // Break up given range to match ring layout in TokenMetadata
+ Token previous = tokenMetadata.getPredecessor(TokenMetadata.firstToken(tokenMetadata.sortedTokens(), parsedEndToken));
+ while (parsedBeginToken.compareTo(previous) < 0)
+ {
+ repairingRange.addFirst(new Range<>(previous, parsedEndToken));
+
+ parsedEndToken = previous;
+ previous = tokenMetadata.getPredecessor(previous);
+ }
+ repairingRange.addFirst(new Range<>(parsedBeginToken, parsedEndToken));
+
+ return repairingRange;
}
- private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
+ private FutureTask<Object> createRepairTask(int cmd,
+ String keyspace,
+ Collection<Range<Token>> ranges,
+ boolean isSequential,
+ boolean isLocal,
+ boolean fullRepair,
+ String... columnFamilies)
{
Set<String> dataCenters = null;
if (isLocal)
{
dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
}
- return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, columnFamilies);
+ return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, fullRepair, columnFamilies);
}
- private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
+ private FutureTask<Object> createRepairTask(final int cmd,
+ final String keyspace,
+ final Collection<Range<Token>> ranges,
+ final boolean isSequential,
+ final Collection<String> dataCenters,
+ final Collection<String> hosts,
+ final boolean fullRepair,
+ final String... columnFamilies)
{
+ if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+ {
+ throw new IllegalArgumentException("the local data center must be part of the repair");
+ }
+
return new FutureTask<>(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
@@@ -2645,30 -2644,13 +2686,23 @@@
logger.info(message);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
- List<RepairFuture> futures = new ArrayList<>(ranges.size());
+ if (isSequential && !fullRepair)
+ {
+ message = "It is not possible to mix sequential repair and incremental repairs.";
+ logger.error(message);
+ sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+ return;
+ }
- if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
- {
- message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd);
- logger.error(message);
- sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
- return;
- }
+
+ Set<InetAddress> allNeighbors = new HashSet<>();
+ Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
for (Range<Token> range : ranges)
{
- RepairFuture future;
try
{
- future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, hosts, columnFamilies);
+ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
+ rangeToNeighbors.put(range, neighbors);
+ allNeighbors.addAll(neighbors);
}
catch (IllegalArgumentException e)
{