You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/06/19 18:33:57 UTC

[6/9] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	src/java/org/apache/cassandra/service/StorageService.java
	src/java/org/apache/cassandra/tools/NodeCmd.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/40f8ebae
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/40f8ebae
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/40f8ebae

Branch: refs/heads/trunk
Commit: 40f8ebae6c7cf3024e9212582b5b7caa7825c37f
Parents: 4f381a2 434b5d6
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Jun 19 11:33:18 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Jun 19 11:33:18 2014 -0500

----------------------------------------------------------------------
 .../cassandra/service/ActiveRepairService.java  |  3 -
 .../cassandra/service/StorageService.java       | 64 +++++++++++++++-----
 .../org/apache/cassandra/tools/NodeTool.java    |  3 +
 3 files changed, 52 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/40f8ebae/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/40f8ebae/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 7dc2a6c,13dd3b7..06d28c3
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2551,14 -2486,18 +2551,19 @@@ public class StorageService extends Not
          sendNotification(jmxNotification);
      }
  
 -    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
      {
+         // when repairing only primary range, dataCenter nor hosts can be set
+         if (primaryRange && (dataCenters != null || hosts != null))
+         {
+             throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+         }
 -        final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 -        return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, columnFamilies);
 +        Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 +
 +        return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
      }
  
 -    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts,  final Collection<Range<Token>> ranges, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
      {
          if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
              return 0;
@@@ -2571,13 -2510,18 +2576,18 @@@
          return cmd;
      }
  
 -    public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
      {
+         // when repairing only primary range, you cannot repair only on local DC
+         if (primaryRange && isLocal)
+         {
+             throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+         }
 -        final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 -        return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies);
 +        Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 +        return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
      }
  
 -    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, Collection<Range<Token>> ranges, String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
      {
          if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
              return 0;
@@@ -2592,51 -2536,106 +2602,82 @@@
          return cmd;
      }
  
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
 +    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException
      {
-         Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
-         Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+         Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
  
-         logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
-                     parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
-         return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
+         logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
 -                    repairingRange, keyspaceName, columnFamilies);
 -        return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, columnFamilies);
 -    }
 -
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
 -    {
 -        Set<String> dataCenters = null;
 -        if (isLocal)
 -        {
 -            dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
 -        }
 -        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, columnFamilies);
 -    }
 -
 -    /**
 -     * Trigger proactive repair for a keyspace and column families.
 -     */
 -    public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
 -    {
 -        forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential, isLocal, columnFamilies);
 -    }
 -
 -    public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
 -    {
 -        // primary range repair can only be performed for whole cluster.
 -        // NOTE: we should omit the param but keep API as is for now.
 -        if (isLocal)
 -        {
 -            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
 -        }
 -
 -        forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, false, columnFamilies);
++                           repairingRange, keyspaceName, columnFamilies);
++        return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, fullRepair, columnFamilies);
      }
  
 -    public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
 +    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies)
      {
+         Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
+ 
+         logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
+                            repairingRange, keyspaceName, columnFamilies);
 -        forceKeyspaceRepairRange(keyspaceName, repairingRange, isSequential, isLocal, columnFamilies);
 -    }
 -
 -    public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
 -    {
 -        if (Keyspace.SYSTEM_KS.equalsIgnoreCase(keyspaceName))
 -            return;
 -        createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, isSequential, isLocal, columnFamilies).run();
++        return forceRepairAsync(keyspaceName, isSequential, isLocal, repairingRange, fullRepair, columnFamilies);
+     }
+ 
+     /**
+      * Create collection of ranges that match ring layout from given tokens.
+      *
+      * @param beginToken beginning token of the range
+      * @param endToken end token of the range
+      * @return collection of ranges that match ring layout in TokenMetadata
+      */
+     @SuppressWarnings("unchecked")
+     private Collection<Range<Token>> createRepairRangeFrom(String beginToken, String endToken)
+     {
          Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
          Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
  
-         logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
-                     parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
-         return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
+         Deque<Range<Token>> repairingRange = new ArrayDeque<>();
+         // Break up given range to match ring layout in TokenMetadata
+         Token previous = tokenMetadata.getPredecessor(TokenMetadata.firstToken(tokenMetadata.sortedTokens(), parsedEndToken));
+         while (parsedBeginToken.compareTo(previous) < 0)
+         {
+             repairingRange.addFirst(new Range<>(previous, parsedEndToken));
+ 
+             parsedEndToken = previous;
+             previous = tokenMetadata.getPredecessor(previous);
+         }
+         repairingRange.addFirst(new Range<>(parsedBeginToken, parsedEndToken));
+ 
+         return repairingRange;
      }
  
 -    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
 +    private FutureTask<Object> createRepairTask(int cmd,
 +                                                String keyspace,
 +                                                Collection<Range<Token>> ranges,
 +                                                boolean isSequential,
 +                                                boolean isLocal,
 +                                                boolean fullRepair,
 +                                                String... columnFamilies)
      {
          Set<String> dataCenters = null;
          if (isLocal)
          {
              dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
          }
 -        return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, columnFamilies);
 +        return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, fullRepair, columnFamilies);
      }
  
 -    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
 +    private FutureTask<Object> createRepairTask(final int cmd,
 +                                                final String keyspace,
 +                                                final Collection<Range<Token>> ranges,
 +                                                final boolean isSequential,
 +                                                final Collection<String> dataCenters,
 +                                                final Collection<String> hosts,
 +                                                final boolean fullRepair,
 +                                                final String... columnFamilies)
      {
+         if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+         {
+             throw new IllegalArgumentException("the local data center must be part of the repair");
+         }
+ 
          return new FutureTask<>(new WrappedRunnable()
          {
              protected void runMayThrow() throws Exception
@@@ -2645,30 -2644,13 +2686,23 @@@
                  logger.info(message);
                  sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
  
 -                List<RepairFuture> futures = new ArrayList<>(ranges.size());
 +                if (isSequential && !fullRepair)
 +                {
 +                    message = "It is not possible to mix sequential repair and incremental repairs.";
 +                    logger.error(message);
 +                    sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
 +                    return;
 +                }
-                 if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
-                 {
-                     message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd);
-                     logger.error(message);
-                     sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
-                     return;
-                 }
 +
 +                Set<InetAddress> allNeighbors = new HashSet<>();
 +                Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
                  for (Range<Token> range : ranges)
                  {
 -                    RepairFuture future;
                      try
                      {
 -                        future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, hosts, columnFamilies);
 +                        Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
 +                        rangeToNeighbors.put(range, neighbors);
 +                        allNeighbors.addAll(neighbors);
                      }
                      catch (IllegalArgumentException e)
                      {