You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/11/24 22:26:22 UTC

[5/8] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	src/java/org/apache/cassandra/repair/RepairJob.java
	src/java/org/apache/cassandra/repair/RepairSession.java
	src/java/org/apache/cassandra/service/ActiveRepairService.java
	src/java/org/apache/cassandra/service/StorageService.java
	src/java/org/apache/cassandra/service/StorageServiceMBean.java
	src/java/org/apache/cassandra/tools/NodeCmd.java
	src/java/org/apache/cassandra/tools/NodeProbe.java
	src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/326a9ff2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/326a9ff2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/326a9ff2

Branch: refs/heads/trunk
Commit: 326a9ff2f831eeafedbc37b7a4b8f8f4a709e399
Parents: eac7781 41469ec
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 24 15:21:34 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 24 15:21:34 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../DatacenterAwareRequestCoordinator.java      |  73 +++++++++++
 .../cassandra/repair/IRequestCoordinator.java   |  28 ++++
 .../cassandra/repair/IRequestProcessor.java     |  23 ++++
 .../repair/ParallelRequestCoordinator.java      |  49 +++++++
 .../org/apache/cassandra/repair/RepairJob.java  |  32 ++++-
 .../cassandra/repair/RepairParallelism.java     |  22 ++++
 .../apache/cassandra/repair/RepairSession.java  |  14 +-
 .../cassandra/repair/RequestCoordinator.java    | 128 -------------------
 .../repair/SequentialRequestCoordinator.java    |  58 +++++++++
 .../cassandra/service/ActiveRepairService.java  |   6 +-
 .../cassandra/service/StorageService.java       |  49 +++++--
 .../cassandra/service/StorageServiceMBean.java  |  20 ++-
 .../org/apache/cassandra/tools/NodeProbe.java   |  29 +++--
 .../org/apache/cassandra/tools/NodeTool.java    |  14 +-
 .../repair/RequestCoordinatorTest.java          | 124 ++++++++++++++++++
 16 files changed, 499 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c9e35d5,7519653..fa3ce8a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -26,34 -12,7 +26,35 @@@ Merged from 2.0
   * Avoid overlap in L1 when L0 contains many nonoverlapping
     sstables (CASSANDRA-8211)
   * Improve PropertyFileSnitch logging (CASSANDRA-8183)
 - * Abort liveRatio calculation if the memtable is flushed (CASSANDRA-8164)
++ * Add DC-aware sequential repair (CASSANDRA-8193)
 +
 +
 +2.1.2
 + * (cqlsh) parse_for_table_meta errors out on queries with undefined
 +   grammars (CASSANDRA-8262)
 + * (cqlsh) Fix SELECT ... TOKEN() function broken in C* 2.1.1 (CASSANDRA-8258)
 + * Fix Cassandra crash when running on JDK8 update 40 (CASSANDRA-8209)
 + * Optimize partitioner tokens (CASSANDRA-8230)
 + * Improve compaction of repaired/unrepaired sstables (CASSANDRA-8004)
 + * Make cache serializers pluggable (CASSANDRA-8096)
 + * Fix issues with CONTAINS (KEY) queries on secondary indexes
 +   (CASSANDRA-8147)
 + * Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)
 + * Fix default timestamp in QueryOptions (CASSANDRA-8246)
 + * Set socket timeout when reading remote version (CASSANDRA-8188)
 + * Refactor how we track live size (CASSANDRA-7852)
 + * Make sure unfinished compaction files are removed (CASSANDRA-8124)
 + * Fix shutdown when run as Windows service (CASSANDRA-8136)
 + * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
 + * Fix race in RecoveryManagerTest (CASSANDRA-8176)
 + * Avoid IllegalArgumentException while sorting sstables in
 +   IndexSummaryManager (CASSANDRA-8182)
 + * Shutdown JVM on file descriptor exhaustion (CASSANDRA-7579)
 + * Add 'die' policy for commit log and disk failure (CASSANDRA-7927)
 + * Fix installing as service on Windows (CASSANDRA-8115)
 + * Fix CREATE TABLE for CQL2 (CASSANDRA-8144)
 + * Avoid boxing in ColumnStats min/max trackers (CASSANDRA-8109)
 +Merged from 2.0:
   * Correctly handle non-text column names in cql3 (CASSANDRA-8178)
   * Fix deletion for indexes on primary key columns (CASSANDRA-8206)
   * Add 'nodetool statusgossip' (CASSANDRA-8125)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index 8057ed5,7c791aa..20d5d97
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -73,12 -72,14 +73,14 @@@ public class RepairJo
                       ListeningExecutorService taskExecutor)
      {
          this.listener = listener;
 -        this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
 +        this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
-         this.isSequential = isSequential;
+         this.parallelismDegree = parallelismDegree;
          this.taskExecutor = taskExecutor;
-         this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
+ 
+         IRequestProcessor<InetAddress> processor = new IRequestProcessor<InetAddress>()
          {
-             public void send(InetAddress endpoint)
+             @Override
+             public void process(InetAddress endpoint)
              {
                  ValidationRequest request = new ValidationRequest(desc, gcBefore);
                  MessagingService.instance().sendOneWay(request.createMessage(), endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 346f3f4,f2b95eb..0580ebb
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -114,20 -110,19 +114,20 @@@ public class RepairSession extends Wrap
       *
       * @param range range to repair
       * @param keyspace name of keyspace
-      * @param isSequential true if performing repair on snapshots sequentially
+      * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
 -     * @param dataCenters the data centers that should be part of the repair; null for all DCs
 +     * @param endpoints the endpoints (nodes) that should be part of the repair
       * @param cfnames names of columnfamilies
       */
-     public RepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames)
 -    public RepairSession(Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
++    public RepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, String... cfnames)
      {
-         this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, isSequential, endpoints, cfnames);
 -        this(UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, dataCenters, hosts, cfnames);
++        this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, cfnames);
      }
  
-     public RepairSession(UUID parentRepairSession, UUID id, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String[] cfnames)
 -    public RepairSession(UUID id, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String[] cfnames)
++    public RepairSession(UUID parentRepairSession, UUID id, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, String[] cfnames)
      {
 +        this.parentRepairSession = parentRepairSession;
          this.id = id;
-         this.isSequential = isSequential;
+         this.parallelismDegree = parallelismDegree;
          this.keyspace = keyspace;
          this.cfnames = cfnames;
          assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@@ -284,10 -270,10 +284,10 @@@
              // Create and queue a RepairJob for each column family
              for (String cfname : cfnames)
              {
-                 RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
 -                RepairJob job = new RepairJob(this, id, keyspace, cfname, range, parallelismDegree, taskExecutor);
++                RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, parallelismDegree, taskExecutor);
                  jobs.offer(job);
              }
 -
 +            logger.debug("Sending tree requests to endpoints {}", endpoints);
              jobs.peek().sendTreeRequests(endpoints);
  
              // block whatever thread started this session until all requests have been returned:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 68c2fae,da81e8f..d43143e
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -116,9 -92,9 +116,9 @@@ public class ActiveRepairServic
       *
       * @return Future for asynchronous call or null if there is no need to repair
       */
-     public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames)
 -    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
++    public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Set<InetAddress> endpoints, String... cfnames)
      {
-         RepairSession session = new RepairSession(parentRepairSession, range, keyspace, isSequential, endpoints, cfnames);
 -        RepairSession session = new RepairSession(range, keyspace, parallelismDegree, dataCenters, hosts, cfnames);
++        RepairSession session = new RepairSession(parentRepairSession, range, keyspace, parallelismDegree, endpoints, cfnames);
          if (session.endpoints.isEmpty())
              return null;
          RepairFuture futureTask = new RepairFuture(session);
@@@ -152,9 -128,7 +152,9 @@@
      // add it to the sessions (avoid NPE in tests)
      RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
      {
 -        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, null, null, new String[]{desc.columnFamily});
 +        Set<InetAddress> neighbours = new HashSet<>();
 +        neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null));
-         RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, false, neighbours, new String[]{desc.columnFamily});
++        RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, neighbours, new String[]{desc.columnFamily});
          sessions.put(session.getId(), session);
          RepairFuture futureTask = new RepairFuture(session);
          executor.execute(futureTask);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 79cea8e,3d42d1c..38cca10
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2489,86 -2407,112 +2490,106 @@@ public class StorageService extends Not
          sendNotification(jmxNotification);
      }
  
 -    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
      {
 -        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, columnFamilies);
++        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, fullRepair, columnFamilies);
+     }
+ 
 -    public int forceRepairAsync(final String keyspace, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
++    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies)
+     {
 -        // when repairing only primary range, dataCenter nor hosts can be set
 -        if (primaryRange && (dataCenters != null || hosts != null))
 +        Collection<Range<Token>> ranges;
 +        if (primaryRange)
          {
 -            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
 +            // when repairing only primary range, neither dataCenters nor hosts can be set
 +            if (dataCenters == null && hosts == null)
 +                ranges = getPrimaryRanges(keyspace);
 +            // except when dataCenters contains only the local DC (i.e. -local)
 +            else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
 +                ranges = getPrimaryRangesWithinDC(keyspace);
 +            else
 +                throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
          }
 -        final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 -        return forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, ranges, columnFamilies);
 +        else
 +        {
 +             ranges = getLocalRanges(keyspace);
 +        }
 +
-         return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
++        return forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, ranges, fullRepair, columnFamilies);
      }
  
 -    public int forceRepairAsync(final String keyspace, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts,  final Collection<Range<Token>> ranges, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
 +    {
++        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, ranges, fullRepair, columnFamilies);
++    }
++
++    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
+     {
          if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
              return 0;
  
 -        final int cmd = nextRepairCommand.incrementAndGet();
 +        int cmd = nextRepairCommand.incrementAndGet();
          if (ranges.size() > 0)
          {
-             if (!FBUtilities.isUnix() && isSequential)
 -            new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, hosts, columnFamilies)).start();
++            if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
 +            {
 +                logger.warn("Snapshot-based repair is not yet supported on Windows.  Reverting to parallel repair.");
-                 isSequential = false;
++                parallelismDegree = RepairParallelism.PARALLEL;
 +            }
-             new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, hosts, fullRepair, columnFamilies)).start();
++            new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies)).start();
          }
          return cmd;
      }
  
 -    public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
      {
 -        // when repairing only primary range, you cannot repair only on local DC
 -        if (primaryRange && isLocal)
 +        Collection<Range<Token>> ranges;
 +        if (primaryRange)
          {
 -            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
 +            ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) : getPrimaryRanges(keyspace);
          }
 -        final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 -        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, ranges, columnFamilies);
 -    }
 -
 -    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, boolean isLocal, Collection<Range<Token>> ranges, String... columnFamilies)
 -    {
 -        if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
 -            return 0;
 -
 -        final int cmd = nextRepairCommand.incrementAndGet();
 -        if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
 +        else
          {
 -            logger.warn("Snapshot-based repair is not yet supported on Windows.  Reverting to parallel repair.");
 -            parallelismDegree = RepairParallelism.PARALLEL;
 +            ranges = getLocalRanges(keyspace);
          }
 -        new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, isLocal, columnFamilies)).start();
 -        return cmd;
 +
 +        return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
      }
  
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
      {
 -        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, columnFamilies);
++        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, ranges, fullRepair, columnFamilies);
+     }
+ 
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
++    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, boolean isLocal, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
+     {
 -        Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
 -
 -        logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
 -                    repairingRange, keyspaceName, columnFamilies);
 +        if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
 +            return 0;
  
 +        int cmd = nextRepairCommand.incrementAndGet();
-         if (!FBUtilities.isUnix() && isSequential)
+         if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
          {
              logger.warn("Snapshot-based repair is not yet supported on Windows.  Reverting to parallel repair.");
-             isSequential = false;
+             parallelismDegree = RepairParallelism.PARALLEL;
          }
-         new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, fullRepair, columnFamilies)).start();
 -        return forceRepairAsync(keyspaceName, parallelismDegree, dataCenters, hosts, repairingRange, columnFamilies);
 -    }
 -
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
 -    {
 -        Set<String> dataCenters = null;
 -        if (isLocal)
 -        {
 -            dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
 -        }
 -        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, columnFamilies);
++        new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, isLocal, fullRepair, columnFamilies)).start();
 +        return cmd;
      }
  
 -    /**
 -     * Trigger proactive repair for a keyspace and column families.
 -     */
 -    public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
 +    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException
      {
 -        forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, columnFamilies);
++        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, fullRepair, columnFamilies);
+     }
+ 
 -    public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
++    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies)
+     {
 -        // primary range repair can only be performed for whole cluster.
 -        // NOTE: we should omit the param but keep API as is for now.
 -        if (isLocal)
 -        {
 -            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
 -        }
 +        Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
  
 -        forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, false, columnFamilies);
 +        logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
 +                           repairingRange, keyspaceName, columnFamilies);
-         return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, fullRepair, columnFamilies);
++        return forceRepairAsync(keyspaceName, parallelismDegree, dataCenters, hosts, repairingRange, fullRepair, columnFamilies);
      }
  
 -    public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
 +    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies)
      {
          Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
  
@@@ -2616,30 -2567,17 +2637,30 @@@
          return repairingRange;
      }
  
 -    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final RepairParallelism parallelismDegree, final boolean isLocal, final String... columnFamilies)
 +    private FutureTask<Object> createRepairTask(int cmd,
 +                                                String keyspace,
 +                                                Collection<Range<Token>> ranges,
-                                                 boolean isSequential,
++                                                RepairParallelism parallelismDegree,
 +                                                boolean isLocal,
 +                                                boolean fullRepair,
 +                                                String... columnFamilies)
      {
          Set<String> dataCenters = null;
          if (isLocal)
          {
              dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
          }
-         return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, fullRepair, columnFamilies);
 -        return createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, null, columnFamilies);
++        return createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, null, fullRepair, columnFamilies);
      }
  
 -    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
 +    private FutureTask<Object> createRepairTask(final int cmd,
 +                                                final String keyspace,
 +                                                final Collection<Range<Token>> ranges,
-                                                 final boolean isSequential,
++                                                final RepairParallelism parallelismDegree,
 +                                                final Collection<String> dataCenters,
 +                                                final Collection<String> hosts,
 +                                                final boolean fullRepair,
 +                                                final String... columnFamilies)
      {
          if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
          {
@@@ -2650,71 -2588,24 +2671,71 @@@
          {
              protected void runMayThrow() throws Exception
              {
-                 String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (seq=%b, full=%b)", cmd, ranges.size(), keyspace, isSequential, fullRepair);
 -                String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), keyspace);
++                String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)", cmd, ranges.size(), keyspace, parallelismDegree, fullRepair);
                  logger.info(message);
                  sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
  
-                 if (isSequential && !fullRepair)
 -                List<RepairFuture> futures = new ArrayList<>(ranges.size());
++                if (parallelismDegree != RepairParallelism.PARALLEL && !fullRepair)
 +                {
 +                    message = "It is not possible to mix sequential repair and incremental repairs.";
 +                    logger.error(message);
 +                    sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
 +                    return;
 +                }
 +
 +                Set<InetAddress> allNeighbors = new HashSet<>();
 +                Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
                  for (Range<Token> range : ranges)
                  {
 -                    RepairFuture future;
                      try
                      {
 -                        future = forceKeyspaceRepair(range, keyspace, parallelismDegree, dataCenters, hosts, columnFamilies);
 +                        Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
 +                        rangeToNeighbors.put(range, neighbors);
 +                        allNeighbors.addAll(neighbors);
                      }
                      catch (IllegalArgumentException e)
                      {
 -                        logger.error("Repair session failed:", e);
 -                        sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
 -                        continue;
 +                        logger.error("Repair failed:", e);
 +                        sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
 +                        return;
                      }
 +                }
 +
 +                // Validate columnfamilies
 +                List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
 +                try
 +                {
 +                    Iterables.addAll(columnFamilyStores, getValidColumnFamilies(false, false, keyspace, columnFamilies));
 +                }
 +                catch (IllegalArgumentException e)
 +                {
 +                    sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
 +                    return;
 +                }
 +
 +                UUID parentSession = null;
 +                if (!fullRepair)
 +                {
 +                    try
 +                    {
 +                        parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores);
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
 +                        return;
 +                    }
 +                }
 +
 +                List<RepairFuture> futures = new ArrayList<>(ranges.size());
 +                String[] cfnames = new String[columnFamilyStores.size()];
 +                for (int i = 0; i < columnFamilyStores.size(); i++)
 +                {
 +                    cfnames[i] = columnFamilyStores.get(i).name;
 +                }
 +                for (Range<Token> range : ranges)
 +                {
-                     RepairFuture future = ActiveRepairService.instance.submitRepairSession(parentSession, range, keyspace, isSequential, rangeToNeighbors.get(range), cfnames);
++                    RepairFuture future = ActiveRepairService.instance.submitRepairSession(parentSession, range, keyspace, parallelismDegree, rangeToNeighbors.get(range), cfnames);
                      if (future == null)
                          continue;
                      futures.add(future);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8ae44ff,2386fc8..e7d6f14
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -272,14 -259,30 +274,30 @@@ public interface StorageServiceMBean ex
       *
       * @return Repair command number, or 0 if nothing to repair
       */
 -    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies);
 +    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts,  boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException;
  
      /**
+      * Invoke repair asynchronously.
+      * You can track repair progress by subscribing to the JMX notifications sent from this StorageServiceMBean.
+      * Notification format is:
+      *   type: "repair"
+      *   userObject: int array of length 2, [0]=command number, [1]=ordinal of ActiveRepairService.Status
+      *
+      * @return Repair command number, or 0 if nothing to repair
+      */
 -    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies);
++    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies);
+ 
+     /**
       * Same as forceRepairAsync, but handles a specified range
       */
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts,  final String... columnFamilies);
 +    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean repairedAt, String... columnFamilies) throws IOException;
  
      /**
+      * Same as forceRepairAsync, but handles a specified range
+      */
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts,  final String... columnFamilies);
++    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies);
+ 
+     /**
       * Invoke repair asynchronously.
       * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
       * Notification format is:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index d495786,261d416..1d05887
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -244,14 -211,24 +245,19 @@@ public class NodeProbe implements AutoC
          ssProxy.forceKeyspaceFlush(keyspaceName, columnFamilies);
      }
  
 -    public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
 -    {
 -        ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
 -    }
 -
 -    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts,  boolean primaryRange, String... columnFamilies) throws IOException
 +    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
      {
 -        forceRepairAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, columnFamilies);
++        forceRepairAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, fullRepair, columnFamilies);
+     }
+ 
 -    public void forceRepairAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts,  boolean primaryRange, String... columnFamilies) throws IOException
++    public void forceRepairAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
+     {
          RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
          try
          {
              jmxc.addConnectionNotificationListener(runner, null, null);
              ssProxy.addNotificationListener(runner, null, null);
-             if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, hosts, primaryRange, fullRepair))
 -            if (!runner.repairAndWait(ssProxy, parallelismDegree, dataCenters, hosts, primaryRange))
++            if (!runner.repairAndWait(ssProxy, parallelismDegree, dataCenters, hosts, primaryRange, fullRepair))
                  failed = true;
          }
          catch (Exception e)
@@@ -265,22 -242,22 +271,27 @@@
                  ssProxy.removeNotificationListener(runner);
                  jmxc.removeConnectionNotificationListener(runner);
              }
 -            catch (Throwable ignored) {}
 +            catch (Throwable t)
 +            {
 +                JVMStabilityInspector.inspectThrowable(t);
 +                out.println("Exception occurred during clean-up. " + t);
 +            }
          }
      }
 -    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String startToken, final String endToken, String... columnFamilies) throws IOException
 +
 +    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, final String startToken, final String endToken, boolean fullRepair, String... columnFamilies) throws IOException
      {
 -        forceRepairRangeAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, startToken, endToken, columnFamilies);
++        forceRepairRangeAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, startToken, endToken, fullRepair, columnFamilies);
+     }
+ 
 -    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String startToken, final String endToken, String... columnFamilies) throws IOException
++    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String startToken, final String endToken, boolean fullRepair, String... columnFamilies) throws IOException
+     {
          RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
          try
          {
              jmxc.addConnectionNotificationListener(runner, null, null);
              ssProxy.addNotificationListener(runner, null, null);
-             if (!runner.repairRangeAndWait(ssProxy,  isSequential, dataCenters, hosts, startToken, endToken, fullRepair))
 -            if (!runner.repairRangeAndWait(ssProxy, parallelismDegree, dataCenters, hosts, startToken, endToken))
++            if (!runner.repairRangeAndWait(ssProxy, parallelismDegree, dataCenters, hosts, startToken, endToken, fullRepair))
                  failed = true;
          }
          catch (Exception e)
@@@ -1287,16 -1070,16 +1298,16 @@@ class RepairRunner implements Notificat
          this.columnFamilies = columnFamilies;
      }
  
-     public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRangeOnly, boolean fullRepair) throws Exception
 -    public boolean repairAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRangeOnly) throws Exception
++    public boolean repairAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRangeOnly, boolean fullRepair) throws Exception
      {
-         cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies);
 -        cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRangeOnly, columnFamilies);
++        cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies);
          waitForRepair();
          return success;
      }
  
-     public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, String startToken, String endToken, boolean fullRepair) throws Exception
 -    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, String startToken, String endToken) throws Exception
++    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String startToken, String endToken, boolean fullRepair) throws Exception
      {
-         cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, hosts, fullRepair, columnFamilies);
 -        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree, dataCenters, hosts, columnFamilies);
++        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies);
          waitForRepair();
          return success;
      }