You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/06/19 18:33:53 UTC

[2/9] git commit: Improve validation of sub range repair

Improve validation of sub range repair

also prevent "-pr" repair not to work with "-dc/-hosts/-local".
patch by yukim; reviewed by krummas for CASSANDRA-7317


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/434b5d68
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/434b5d68
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/434b5d68

Branch: refs/heads/cassandra-2.1
Commit: 434b5d683ec7520acf1a5a2d421ee5aba2ede0e8
Parents: 303ff22
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Jun 19 10:47:14 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Jun 19 10:47:14 2014 -0500

----------------------------------------------------------------------
 .../cassandra/service/ActiveRepairService.java  |  3 -
 .../cassandra/service/StorageService.java       | 98 +++++++++++++-------
 .../org/apache/cassandra/tools/NodeCmd.java     |  3 +
 3 files changed, 70 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/434b5d68/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 00e43ea..aac9f9a 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -146,9 +146,6 @@ public class ActiveRepairService
      */
     public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
     {
-        if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
-            throw new IllegalArgumentException("The local data center must be part of the repair");
-
         StorageService ss = StorageService.instance;
         Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
         Range<Token> rangeSuperSet = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/434b5d68/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 05cc8d7..13dd3b7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2488,6 +2488,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
     {
+        // when repairing only primary range, dataCenter nor hosts can be set
+        if (primaryRange && (dataCenters != null || hosts != null))
+        {
+            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+        }
         final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
         return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, columnFamilies);
     }
@@ -2507,6 +2512,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
     {
+        // when repairing only primary range, you cannot repair only on local DC
+        if (primaryRange && isLocal)
+        {
+            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+        }
         final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
         return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies);
     }
@@ -2528,30 +2538,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
     {
-        Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
-        Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+        Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
 
-        logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
-                    parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
-        return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+        logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
+                    repairingRange, keyspaceName, columnFamilies);
+        return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, columnFamilies);
     }
 
     public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
     {
-        Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
-        Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
-
-        logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
-                    parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
-        return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+        Set<String> dataCenters = null;
+        if (isLocal)
+        {
+            dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+        }
+        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, columnFamilies);
     }
 
-
     /**
      * Trigger proactive repair for a keyspace and column families.
-     * @param keyspaceName
-     * @param columnFamilies
-     * @throws IOException
      */
     public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
     {
@@ -2560,17 +2565,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
     {
-        forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, isLocal, columnFamilies);
+        // primary range repair can only be performed for whole cluster.
+        // NOTE: we should omit the param but keep API as is for now.
+        if (isLocal)
+        {
+            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+        }
+
+        forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, false, columnFamilies);
     }
 
     public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
     {
-        Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
-        Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+        Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
 
-        logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
-                    parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
-        forceKeyspaceRepairRange(keyspaceName, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal, columnFamilies);
+        logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
+                           repairingRange, keyspaceName, columnFamilies);
+        forceKeyspaceRepairRange(keyspaceName, repairingRange, isSequential, isLocal, columnFamilies);
     }
 
     public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
@@ -2580,6 +2591,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, isSequential, isLocal, columnFamilies).run();
     }
 
+    /**
+     * Create collection of ranges that match ring layout from given tokens.
+     *
+     * @param beginToken beginning token of the range
+     * @param endToken end token of the range
+     * @return collection of ranges that match ring layout in TokenMetadata
+     */
+    @SuppressWarnings("unchecked")
+    private Collection<Range<Token>> createRepairRangeFrom(String beginToken, String endToken)
+    {
+        Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+        Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+
+        Deque<Range<Token>> repairingRange = new ArrayDeque<>();
+        // Break up given range to match ring layout in TokenMetadata
+        Token previous = tokenMetadata.getPredecessor(TokenMetadata.firstToken(tokenMetadata.sortedTokens(), parsedEndToken));
+        while (parsedBeginToken.compareTo(previous) < 0)
+        {
+            repairingRange.addFirst(new Range<>(previous, parsedEndToken));
+
+            parsedEndToken = previous;
+            previous = tokenMetadata.getPredecessor(previous);
+        }
+        repairingRange.addFirst(new Range<>(parsedBeginToken, parsedEndToken));
+
+        return repairingRange;
+    }
+
     private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
     {
         Set<String> dataCenters = null;
@@ -2592,7 +2631,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
     {
-        return new FutureTask<Object>(new WrappedRunnable()
+        if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+        {
+            throw new IllegalArgumentException("the local data center must be part of the repair");
+        }
+
+        return new FutureTask<>(new WrappedRunnable()
         {
             protected void runMayThrow() throws Exception
             {
@@ -2600,15 +2644,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 logger.info(message);
                 sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
 
-                if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
-                {
-                    message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd);
-                    logger.error(message);
-                    sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
-                    return;
-                }
-
-                List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size());
+                List<RepairFuture> futures = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                 {
                     RepairFuture future;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/434b5d68/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 213e4b4..afa42dd 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -1659,6 +1659,9 @@ public class NodeCmd
                     Collection<String> dataCenters = null;
                     Collection<String> hosts = null;
 
+                    if (primaryRange && (localDC || specificDC || specificHosts))
+                        throw new RuntimeException("Primary range repair should be performed on all nodes in the cluster.");
+
                     if (specificDC)
                         dataCenters = Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(","));
                     else if (localDC)