You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/29 20:26:51 UTC

[1/3] git commit: Allow specifying datacenters to participate in a repair patch by Jimmy Mårdell; reviewed by Lyuben Todorov for CASSANDRA-6218

Updated Branches:
  refs/heads/cassandra-2.0 d41a746a6 -> f065cbfe0
  refs/heads/trunk 2ddf2cdc6 -> 2042bc3ca


Allow specifying datacenters to participate in a repair
patch by Jimmy Mårdell; reviewed by Lyuben Todorov for CASSANDRA-6218


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f065cbfe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f065cbfe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f065cbfe

Branch: refs/heads/cassandra-2.0
Commit: f065cbfe058a2b0bb58ed53602afe0f12942d525
Parents: d41a746
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Nov 29 13:26:16 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Nov 29 13:26:16 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/repair/RepairSession.java  | 10 ++--
 .../cassandra/service/ActiveRepairService.java  | 27 +++++++---
 .../cassandra/service/StorageService.java       | 54 ++++++++++++++++++--
 .../cassandra/service/StorageServiceMBean.java  | 17 ++++++
 .../org/apache/cassandra/tools/NodeCmd.java     | 12 ++++-
 .../org/apache/cassandra/tools/NodeProbe.java   | 16 +++---
 .../apache/cassandra/tools/NodeToolHelp.yaml    |  4 ++
 .../service/AntiEntropyServiceTestAbstract.java |  8 +--
 9 files changed, 119 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 644c6b3..12469f2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.4
+ * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
  * Fix divide-by-zero in PCI (CASSANDRA-6403)
  * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
  * Add sub-ms precision formats to the timestamp parser (CASSANDRA-6395)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 18688f9..ebcd3f4 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -102,15 +102,15 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
      * @param range range to repair
      * @param keyspace name of keyspace
      * @param isSequential true if performing repair on snapshots sequentially
-     * @param isLocal true if you want to perform repair only inside the data center
+     * @param dataCenters the data centers that should be part of the repair; null for all DCs
      * @param cfnames names of columnfamilies
      */
-    public RepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
+    public RepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
     {
-        this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, isLocal, cfnames);
+        this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, cfnames);
     }
 
-    public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String[] cfnames)
+    public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String[] cfnames)
     {
         this.id = id;
         this.isSequential = isSequential;
@@ -118,7 +118,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         this.cfnames = cfnames;
         assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
         this.range = range;
-        this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, isLocal);
+        this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters);
     }
 
     public UUID getId()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 2f16b31..b77f216 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -21,6 +21,7 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
 
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
@@ -90,9 +91,9 @@ public class ActiveRepairService
      *
      * @return Future for asynchronous call or null if there is no need to repair
      */
-    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
+    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
     {
-        RepairSession session = new RepairSession(range, keyspace, isSequential, isLocal, cfnames);
+        RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters, cfnames);
         if (session.endpoints.isEmpty())
             return null;
         RepairFuture futureTask = new RepairFuture(session);
@@ -126,7 +127,7 @@ public class ActiveRepairService
     // add it to the sessions (avoid NPE in tests)
     RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
     {
-        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, false, new String[]{desc.columnFamily});
+        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, null, new String[]{desc.columnFamily});
         sessions.put(session.getId(), session);
         RepairFuture futureTask = new RepairFuture(session);
         executor.execute(futureTask);
@@ -138,12 +139,15 @@ public class ActiveRepairService
      *
      * @param keyspaceName keyspace to repair
      * @param toRepair token to repair
-     * @param isLocal need to use only nodes from local datacenter
+     * @param dataCenters the data centers to involve in the repair
      *
      * @return neighbors with whom we share the provided range
      */
-    public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, boolean isLocal)
+    public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters)
     {
+        if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+            throw new IllegalArgumentException("The local data center must be part of the repair");
+
         StorageService ss = StorageService.instance;
         Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
         Range<Token> rangeSuperSet = null;
@@ -165,11 +169,18 @@ public class ActiveRepairService
         Set<InetAddress> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet));
         neighbors.remove(FBUtilities.getBroadcastAddress());
 
-        if (isLocal)
+        if (dataCenters != null)
         {
             TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
-            Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
-            return Sets.intersection(neighbors, localEndpoints);
+            Set<InetAddress> dcEndpoints = Sets.newHashSet();
+            Multimap<String,InetAddress> dcEndpointsMap = topology.getDatacenterEndpoints();
+            for (String dc : dataCenters)
+            {
+                Collection<InetAddress> c = dcEndpointsMap.get(dc);
+                if (c != null)
+                   dcEndpoints.addAll(c);
+            }
+            return Sets.intersection(neighbors, dcEndpoints);
         }
 
         return neighbors;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 418a496..5b3a3e1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2251,6 +2251,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         jmxNotification.setUserData(userObject);
         sendNotification(jmxNotification);
     }
+
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final boolean primaryRange, final String... columnFamilies)
+    {
+        final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+        return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, columnFamilies);
+    }
+
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<Range<Token>> ranges, final String... columnFamilies)
+    {
+        if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
+            return 0;
+
+        final int cmd = nextRepairCommand.incrementAndGet();
+        if (ranges.size() > 0)
+        {
+            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies)).start();
+        }
+        return cmd;
+    }
+
     public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
     {
         final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
@@ -2270,6 +2290,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return cmd;
     }
 
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies)
+    {
+        Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+        Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+
+        logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
+                    parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
+        return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+    }
+
     public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
     {
         Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
@@ -2316,6 +2346,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
     {
+        Set<String> dataCenters = null;
+        if (isLocal)
+        {
+            dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+        }
+        return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies);
+    }
+
+    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final String... columnFamilies)
+    {
         return new FutureTask<Object>(new WrappedRunnable()
         {
             protected void runMayThrow() throws Exception
@@ -2324,13 +2364,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 logger.info(message);
                 sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
 
+                if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+                {
+                    message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd);
+                    logger.error(message);
+                    sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+                    return;
+                }
+
                 List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size());
                 for (Range<Token> range : ranges)
                 {
                     RepairFuture future;
                     try
                     {
-                        future = forceKeyspaceRepair(range, keyspace, isSequential, isLocal, columnFamilies);
+                        future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, columnFamilies);
                     }
                     catch (IllegalArgumentException e)
                     {
@@ -2380,7 +2428,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }, null);
     }
 
-    public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
+    public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
@@ -2394,7 +2442,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return null;
         }
 
-        return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, isLocal, names.toArray(new String[names.size()]));
+        return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, dataCenters, names.toArray(new String[names.size()]));
     }
 
     public void forceTerminateAllRepairSessions() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 2dd8b00..be1b0aa 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -256,6 +256,23 @@ public interface StorageServiceMBean extends NotificationEmitter
      *   userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
      *
      * @return Repair command number, or 0 if nothing to repair
+     */
+    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies);
+
+    /**
+     * Same as forceRepairAsync, but handles a specified range
+     */
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies);
+
+
+    /**
+     * Invoke repair asynchronously.
+     * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
+     * Notification format is:
+     *   type: "repair"
+     *   userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
+     *
+     * @return Repair command number, or 0 if nothing to repair
      * @see #forceKeyspaceRepair(String, boolean, boolean, String...)
      */
     public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 022f9c0..5c071b6 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -67,6 +67,7 @@ public class NodeCmd
     private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range");
     private static final Pair<String, String> PARALLEL_REPAIR_OPT = Pair.create("par", "parallel");
     private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc");
+    private static final Pair<String, String> DC_REPAIR_OPT = Pair.create("dc", "in-dc");
     private static final Pair<String, String> START_TOKEN_OPT = Pair.create("st", "start-token");
     private static final Pair<String, String> END_TOKEN_OPT = Pair.create("et", "end-token");
     private static final Pair<String, String> UPGRADE_ALL_SSTABLE_OPT = Pair.create("a", "include-all-sstables");
@@ -92,6 +93,7 @@ public class NodeCmd
         options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
         options.addOption(PARALLEL_REPAIR_OPT, false, "repair nodes in parallel.");
         options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter");
+        options.addOption(DC_REPAIR_OPT, true, "only repair against nodes in the specified datacenters (comma separated)");
         options.addOption(START_TOKEN_OPT, true, "token at which repair range starts");
         options.addOption(END_TOKEN_OPT, true, "token at which repair range ends");
         options.addOption(UPGRADE_ALL_SSTABLE_OPT, false, "includes sstables that are already on the most recent version during upgradesstables");
@@ -1480,11 +1482,17 @@ public class NodeCmd
                 case REPAIR  :
                     boolean sequential = !cmd.hasOption(PARALLEL_REPAIR_OPT.left);
                     boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
+                    boolean specificDC = cmd.hasOption(DC_REPAIR_OPT.left);
                     boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
+                    Collection<String> dataCenters = null;
+                    if (specificDC)
+                        dataCenters = Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(","));
+                    else if (localDC)
+                        dataCenters = Arrays.asList(probe.getDataCenter());
                     if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
-                        probe.forceRepairRangeAsync(System.out, keyspace, sequential, localDC, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+                        probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
                     else
-                        probe.forceRepairAsync(System.out, keyspace, sequential, localDC, primaryRange, columnFamilies);
+                        probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters, primaryRange, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceKeyspaceFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 0008325..d784f29 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -215,14 +215,14 @@ public class NodeProbe
         ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
     }
 
-    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies) throws IOException
+    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies) throws IOException
     {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairAndWait(ssProxy, isSequential, isLocal, primaryRange))
+            if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange))
                 failed = true;
         }
         catch (Exception e)
@@ -240,14 +240,14 @@ public class NodeProbe
         }
     }
 
-    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, boolean isLocal, final String startToken, final String endToken, String... columnFamilies) throws IOException
+    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String startToken, final String endToken, String... columnFamilies) throws IOException
     {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairRangeAndWait(ssProxy,  isSequential, isLocal, startToken, endToken))
+            if (!runner.repairRangeAndWait(ssProxy,  isSequential, dataCenters, startToken, endToken))
                 failed = true;
         }
         catch (Exception e)
@@ -1009,16 +1009,16 @@ class RepairRunner implements NotificationListener
         this.columnFamilies = columnFamilies;
     }
 
-    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws Exception
+    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, boolean primaryRangeOnly) throws Exception
     {
-        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, isLocal, primaryRangeOnly, columnFamilies);
+        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly, columnFamilies);
         waitForRepair();
         return success;
     }
 
-    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, String startToken, String endToken) throws Exception
+    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, String startToken, String endToken) throws Exception
     {
-        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, isLocal, columnFamilies);
+        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, columnFamilies);
         waitForRepair();
         return success;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
index 632d7e1..54d0884 100644
--- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
+++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
@@ -148,8 +148,12 @@ commands:
   - name: repair [keyspace] [cfnames]
     help: |
       Repair one or more column families
+         Use -dc to repair specific datacenters (csv list).
+         Use -et to specify a token at which repair range ends.
+         Use -local to only repair against nodes in the same datacenter.
          Use -pr to repair only the first range returned by the partitioner.
          Use -par to carry out a parallel repair.
+         Use -st to specify a token at which the repair range starts.
   - name: cleanup [keyspace] [cfnames]
     help: |
       Run cleanup on one or more column families

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 4023910..1123fc0 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -124,7 +124,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, false));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
         }
         assertEquals(expected, neighbors);
     }
@@ -147,7 +147,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, false));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
         }
         assertEquals(expected, neighbors);
     }
@@ -169,7 +169,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, true));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
         }
         assertEquals(expected, neighbors);
     }
@@ -197,7 +197,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, true));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
         }
         assertEquals(expected, neighbors);
     }


[2/3] git commit: Allow specifying datacenters to participate in a repair patch by Jimmy Mårdell; reviewed by Lyuben Todorov for CASSANDRA-6218

Posted by jb...@apache.org.
Allow specifying datacenters to participate in a repair
patch by Jimmy Mårdell; reviewed by Lyuben Todorov for CASSANDRA-6218


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f065cbfe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f065cbfe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f065cbfe

Branch: refs/heads/trunk
Commit: f065cbfe058a2b0bb58ed53602afe0f12942d525
Parents: d41a746
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Nov 29 13:26:16 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Nov 29 13:26:16 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/repair/RepairSession.java  | 10 ++--
 .../cassandra/service/ActiveRepairService.java  | 27 +++++++---
 .../cassandra/service/StorageService.java       | 54 ++++++++++++++++++--
 .../cassandra/service/StorageServiceMBean.java  | 17 ++++++
 .../org/apache/cassandra/tools/NodeCmd.java     | 12 ++++-
 .../org/apache/cassandra/tools/NodeProbe.java   | 16 +++---
 .../apache/cassandra/tools/NodeToolHelp.yaml    |  4 ++
 .../service/AntiEntropyServiceTestAbstract.java |  8 +--
 9 files changed, 119 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 644c6b3..12469f2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.4
+ * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
  * Fix divide-by-zero in PCI (CASSANDRA-6403)
  * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
  * Add sub-ms precision formats to the timestamp parser (CASSANDRA-6395)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 18688f9..ebcd3f4 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -102,15 +102,15 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
      * @param range range to repair
      * @param keyspace name of keyspace
      * @param isSequential true if performing repair on snapshots sequentially
-     * @param isLocal true if you want to perform repair only inside the data center
+     * @param dataCenters the data centers that should be part of the repair; null for all DCs
      * @param cfnames names of columnfamilies
      */
-    public RepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
+    public RepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
     {
-        this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, isLocal, cfnames);
+        this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, cfnames);
     }
 
-    public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String[] cfnames)
+    public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String[] cfnames)
     {
         this.id = id;
         this.isSequential = isSequential;
@@ -118,7 +118,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         this.cfnames = cfnames;
         assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
         this.range = range;
-        this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, isLocal);
+        this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters);
     }
 
     public UUID getId()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 2f16b31..b77f216 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -21,6 +21,7 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
 
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
@@ -90,9 +91,9 @@ public class ActiveRepairService
      *
      * @return Future for asynchronous call or null if there is no need to repair
      */
-    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames)
+    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
     {
-        RepairSession session = new RepairSession(range, keyspace, isSequential, isLocal, cfnames);
+        RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters, cfnames);
         if (session.endpoints.isEmpty())
             return null;
         RepairFuture futureTask = new RepairFuture(session);
@@ -126,7 +127,7 @@ public class ActiveRepairService
     // add it to the sessions (avoid NPE in tests)
     RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
     {
-        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, false, new String[]{desc.columnFamily});
+        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, null, new String[]{desc.columnFamily});
         sessions.put(session.getId(), session);
         RepairFuture futureTask = new RepairFuture(session);
         executor.execute(futureTask);
@@ -138,12 +139,15 @@ public class ActiveRepairService
      *
      * @param keyspaceName keyspace to repair
      * @param toRepair token to repair
-     * @param isLocal need to use only nodes from local datacenter
+     * @param dataCenters the data centers to involve in the repair (must include the local data center); null for all DCs
      *
      * @return neighbors with whom we share the provided range
      */
-    public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, boolean isLocal)
+    public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters)
     {
+        if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+            throw new IllegalArgumentException("The local data center must be part of the repair");
+
         StorageService ss = StorageService.instance;
         Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
         Range<Token> rangeSuperSet = null;
@@ -165,11 +169,18 @@ public class ActiveRepairService
         Set<InetAddress> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet));
         neighbors.remove(FBUtilities.getBroadcastAddress());
 
-        if (isLocal)
+        if (dataCenters != null)
         {
             TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
-            Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
-            return Sets.intersection(neighbors, localEndpoints);
+            Set<InetAddress> dcEndpoints = Sets.newHashSet();
+            Multimap<String,InetAddress> dcEndpointsMap = topology.getDatacenterEndpoints();
+            for (String dc : dataCenters)
+            {
+                Collection<InetAddress> c = dcEndpointsMap.get(dc);
+                if (c != null)
+                   dcEndpoints.addAll(c);
+            }
+            return Sets.intersection(neighbors, dcEndpoints);
         }
 
         return neighbors;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 418a496..5b3a3e1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2251,6 +2251,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         jmxNotification.setUserData(userObject);
         sendNotification(jmxNotification);
     }
+
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final boolean primaryRange, final String... columnFamilies)
+    {
+        final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
+        return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, columnFamilies);
+    }
+
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<Range<Token>> ranges, final String... columnFamilies)
+    {
+        if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
+            return 0;
+
+        final int cmd = nextRepairCommand.incrementAndGet();
+        if (ranges.size() > 0)
+        {
+            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies)).start();
+        }
+        return cmd;
+    }
+
     public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
     {
         final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
@@ -2270,6 +2290,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return cmd;
     }
 
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies)
+    {
+        Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+        Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+
+        logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
+                    parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
+        return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+    }
+
     public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
     {
         Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
@@ -2316,6 +2346,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
     {
+        Set<String> dataCenters = null;
+        if (isLocal)
+        {
+            dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+        }
+        return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies);
+    }
+
+    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final String... columnFamilies)
+    {
         return new FutureTask<Object>(new WrappedRunnable()
         {
             protected void runMayThrow() throws Exception
@@ -2324,13 +2364,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 logger.info(message);
                 sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
 
+                if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+                {
+                    message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd);
+                    logger.error(message);
+                    sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+                    return;
+                }
+
                 List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size());
                 for (Range<Token> range : ranges)
                 {
                     RepairFuture future;
                     try
                     {
-                        future = forceKeyspaceRepair(range, keyspace, isSequential, isLocal, columnFamilies);
+                        future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, columnFamilies);
                     }
                     catch (IllegalArgumentException e)
                     {
@@ -2380,7 +2428,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }, null);
     }
 
-    public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
+    public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
@@ -2394,7 +2442,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return null;
         }
 
-        return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, isLocal, names.toArray(new String[names.size()]));
+        return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, dataCenters, names.toArray(new String[names.size()]));
     }
 
     public void forceTerminateAllRepairSessions() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 2dd8b00..be1b0aa 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -256,6 +256,23 @@ public interface StorageServiceMBean extends NotificationEmitter
      *   userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
      *
      * @return Repair command number, or 0 if nothing to repair
+     */
+    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies);
+
+    /**
+     * Same as forceRepairAsync, but handles a specified range
+     */
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies);
+
+
+    /**
+     * Invoke repair asynchronously.
+     * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
+     * Notification format is:
+     *   type: "repair"
+     *   userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status
+     *
+     * @return Repair command number, or 0 if nothing to repair
      * @see #forceKeyspaceRepair(String, boolean, boolean, String...)
      */
     public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 022f9c0..5c071b6 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -67,6 +67,7 @@ public class NodeCmd
     private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range");
     private static final Pair<String, String> PARALLEL_REPAIR_OPT = Pair.create("par", "parallel");
     private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc");
+    private static final Pair<String, String> DC_REPAIR_OPT = Pair.create("dc", "in-dc");
     private static final Pair<String, String> START_TOKEN_OPT = Pair.create("st", "start-token");
     private static final Pair<String, String> END_TOKEN_OPT = Pair.create("et", "end-token");
     private static final Pair<String, String> UPGRADE_ALL_SSTABLE_OPT = Pair.create("a", "include-all-sstables");
@@ -92,6 +93,7 @@ public class NodeCmd
         options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
         options.addOption(PARALLEL_REPAIR_OPT, false, "repair nodes in parallel.");
         options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter");
+        options.addOption(DC_REPAIR_OPT, true, "only repair against nodes in the specified datacenters (comma separated)");
         options.addOption(START_TOKEN_OPT, true, "token at which repair range starts");
         options.addOption(END_TOKEN_OPT, true, "token at which repair range ends");
         options.addOption(UPGRADE_ALL_SSTABLE_OPT, false, "includes sstables that are already on the most recent version during upgradesstables");
@@ -1480,11 +1482,17 @@ public class NodeCmd
                 case REPAIR  :
                     boolean sequential = !cmd.hasOption(PARALLEL_REPAIR_OPT.left);
                     boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
+                    boolean specificDC = cmd.hasOption(DC_REPAIR_OPT.left);
                     boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
+                    Collection<String> dataCenters = null;
+                    if (specificDC)
+                        dataCenters = Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(","));
+                    else if (localDC)
+                        dataCenters = Arrays.asList(probe.getDataCenter());
                     if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
-                        probe.forceRepairRangeAsync(System.out, keyspace, sequential, localDC, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+                        probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
                     else
-                        probe.forceRepairAsync(System.out, keyspace, sequential, localDC, primaryRange, columnFamilies);
+                        probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters, primaryRange, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceKeyspaceFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 0008325..d784f29 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -215,14 +215,14 @@ public class NodeProbe
         ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
     }
 
-    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies) throws IOException
+    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies) throws IOException
     {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairAndWait(ssProxy, isSequential, isLocal, primaryRange))
+            if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange))
                 failed = true;
         }
         catch (Exception e)
@@ -240,14 +240,14 @@ public class NodeProbe
         }
     }
 
-    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, boolean isLocal, final String startToken, final String endToken, String... columnFamilies) throws IOException
+    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String startToken, final String endToken, String... columnFamilies) throws IOException
     {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairRangeAndWait(ssProxy,  isSequential, isLocal, startToken, endToken))
+            if (!runner.repairRangeAndWait(ssProxy,  isSequential, dataCenters, startToken, endToken))
                 failed = true;
         }
         catch (Exception e)
@@ -1009,16 +1009,16 @@ class RepairRunner implements NotificationListener
         this.columnFamilies = columnFamilies;
     }
 
-    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws Exception
+    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, boolean primaryRangeOnly) throws Exception
     {
-        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, isLocal, primaryRangeOnly, columnFamilies);
+        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly, columnFamilies);
         waitForRepair();
         return success;
     }
 
-    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, String startToken, String endToken) throws Exception
+    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, String startToken, String endToken) throws Exception
     {
-        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, isLocal, columnFamilies);
+        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, columnFamilies);
         waitForRepair();
         return success;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
index 632d7e1..54d0884 100644
--- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
+++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
@@ -148,8 +148,12 @@ commands:
   - name: repair [keyspace] [cfnames]
     help: |
       Repair one or more column families
+         Use -dc to repair specific datacenters (csv list).
+         Use -et to specify a token at which repair range ends.
+         Use -local to only repair against nodes in the same datacenter.
          Use -pr to repair only the first range returned by the partitioner.
          Use -par to carry out a parallel repair.
+         Use -st to specify a token at which the repair range starts.
   - name: cleanup [keyspace] [cfnames]
     help: |
       Run cleanup on one or more column families

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 4023910..1123fc0 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -124,7 +124,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, false));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
         }
         assertEquals(expected, neighbors);
     }
@@ -147,7 +147,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, false));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
         }
         assertEquals(expected, neighbors);
     }
@@ -169,7 +169,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, true));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
         }
         assertEquals(expected, neighbors);
     }
@@ -197,7 +197,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, true));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
         }
         assertEquals(expected, neighbors);
     }


[3/3] git commit: merge from 2.0

Posted by jb...@apache.org.
merge from 2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2042bc3c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2042bc3c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2042bc3c

Branch: refs/heads/trunk
Commit: 2042bc3ca9332fabe1069ce741969eec5bfb3d6d
Parents: 2ddf2cd f065cbf
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Nov 29 13:26:43 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Nov 29 13:26:43 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/repair/RepairSession.java  | 10 ++--
 .../cassandra/service/ActiveRepairService.java  | 27 +++++++---
 .../cassandra/service/StorageService.java       | 54 ++++++++++++++++++--
 .../cassandra/service/StorageServiceMBean.java  | 17 ++++++
 .../org/apache/cassandra/tools/NodeCmd.java     | 12 ++++-
 .../org/apache/cassandra/tools/NodeProbe.java   | 16 +++---
 .../apache/cassandra/tools/NodeToolHelp.yaml    |  4 ++
 .../service/AntiEntropyServiceTestAbstract.java |  8 +--
 9 files changed, 120 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2042bc3c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8d11a0a,12469f2..81ba65a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,24 -1,8 +1,26 @@@
 +2.1
 + * allocate fixed index summary memory pool and resample cold index summaries 
 +   to use less memory (CASSANDRA-5519)
 + * Removed multithreaded compaction (CASSANDRA-6142)
 + * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
 + * change logging from log4j to logback (CASSANDRA-5883)
 + * switch to LZ4 compression for internode communication (CASSANDRA-5887)
 + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
 + * Remove 1.2 network compatibility code (CASSANDRA-5960)
 + * Remove leveled json manifest migration code (CASSANDRA-5996)
 + * Remove CFDefinition (CASSANDRA-6253)
 + * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
 + * User-defined types for CQL3 (CASSANDRA-5590)
 + * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
 + * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
 +
 +
  2.0.4
+  * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
+  * Fix divide-by-zero in PCI (CASSANDRA-6403)
   * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
   * Add sub-ms precision formats to the timestamp parser (CASSANDRA-6395)
 + * Add snapshot space used to cfstats (CASSANDRA-6231)
   * Expose a total memtable size metric for a CF (CASSANDRA-6391)
  Merged from 1.2:
   * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2042bc3c/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2042bc3c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2042bc3c/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2042bc3c/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2042bc3c/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------