You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/01/08 21:29:24 UTC

[7/8] git commit: Merge branch 'cassandra-1.1' into cassandra-1.2

Merge branch 'cassandra-1.1' into cassandra-1.2

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
	src/java/org/apache/cassandra/service/StorageService.java
	src/java/org/apache/cassandra/service/StorageServiceMBean.java
	src/java/org/apache/cassandra/tools/NodeCmd.java
	src/java/org/apache/cassandra/tools/NodeProbe.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a96a8d41
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a96a8d41
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a96a8d41

Branch: refs/heads/cassandra-1.2
Commit: a96a8d41e3c3b902e9485c3c1067604dccf4b6fb
Parents: 9458530 0906b7c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jan 8 14:28:23 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jan 8 14:28:23 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    4 +-
 .../io/compress/CompressedRandomAccessReader.java  |   74 +++------
 .../cassandra/service/AntiEntropyService.java      |   10 +
 .../apache/cassandra/service/StorageService.java   |  137 ++++++++++-----
 .../cassandra/service/StorageServiceMBean.java     |   16 ++-
 src/java/org/apache/cassandra/tools/NodeCmd.java   |   28 ++--
 src/java/org/apache/cassandra/tools/NodeProbe.java |   73 ++++++++
 7 files changed, 229 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 16473ec,5e87435..37a47f3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,182 -1,34 +1,184 @@@
 -1.1.9
 +1.2.1
 + * disallow bloom filter false positive chance of 0 (CASSANDRA-5013)
 + * add threadpool size adjustment methods to JMXEnabledThreadPoolExecutor and 
 +   CompactionManagerMBean (CASSANDRA-5044)
-  * Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087)
 + * fix hinting for dropped local writes (CASSANDRA-4753)
 + * off-heap cache doesn't need mutable column container (CASSANDRA-5057)
 + * apply disk_failure_policy to bad disks on initial directory creation 
 +   (CASSANDRA-4847)
 + * Optimize name-based queries to use ArrayBackedSortedColumns (CASSANDRA-5043)
 + * Fall back to old manifest if most recent is unparseable (CASSANDRA-5041)
 + * pool [Compressed]RandomAccessReader objects on the partitioned read path
 +   (CASSANDRA-4942)
 + * Add debug logging to list filenames processed by Directories.migrateFile 
 +   method (CASSANDRA-4939)
 + * Expose black-listed directories via JMX (CASSANDRA-4848)
 + * Log compaction merge counts (CASSANDRA-4894)
 + * Minimize byte array allocation by AbstractData{Input,Output} (CASSANDRA-5090)
 + * Add SSL support for the binary protocol (CASSANDRA-5031)
 + * Allow non-schema system ks modification for shuffle to work (CASSANDRA-5097)
 + * cqlsh: Add default limit to SELECT statements (CASSANDRA-4972)
 + * cqlsh: fix DESCRIBE for 1.1 cfs in CQL3 (CASSANDRA-5101)
 + * Correctly gossip with nodes >= 1.1.7 (CASSANDRA-5102)
 + * Ensure CL guarantees on digest mismatch (CASSANDRA-5113)
 + * Validate correctly selects on composite partition key (CASSANDRA-5122)
 + * Fix exception when adding collection (CASSANDRA-5117)
 + * Handle states for non-vnode clusters correctly (CASSANDRA-5127)
 +Merged from 1.1:
+  * Simplify CompressedRandomAccessReader to work around JDK FD bug (CASSANDRA-5088)
+  * Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087)
 - * fix multithreaded compaction deadlock (CASSANDRA-4492)
 - * fix specifying and altering crc_check_chance (CASSANDRA-5053)
 - * Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079)
   * Pig: correctly decode row keys in widerow mode (CASSANDRA-5098)
+  * nodetool repair command now prints progress (CASSANDRA-4767)
  
  
 -1.1.8
 - * reset getRangeSlice filter after finishing a row for get_paged_slice
 -   (CASSANDRA-4919)
 +1.2.0
 + * Disallow counters in collections (CASSANDRA-5082)
 + * cqlsh: add unit tests (CASSANDRA-3920)
 + * fix default bloom_filter_fp_chance for LeveledCompactionStrategy (CASSANDRA-5093)
 +
 +
 +1.2.0-rc2
 + * fix nodetool ownership display with vnodes (CASSANDRA-5065)
 + * cqlsh: add DESCRIBE KEYSPACES command (CASSANDRA-5060)
 + * Fix potential infinite loop when reloading CFS (CASSANDRA-5064)
 + * Fix SimpleAuthorizer example (CASSANDRA-5072)
 + * cqlsh: force CL.ONE for tracing and system.schema* queries (CASSANDRA-5070)
 + * Includes cassandra-shuffle in the debian package (CASSANDRA-5058)
 +Merged from 1.1:
 + * fix multithreaded compaction deadlock (CASSANDRA-4492)
   * fix temporarily missing schema after upgrade from pre-1.1.5 (CASSANDRA-5061)
 + * Fix ALTER TABLE overriding compression options with defaults
 +   (CASSANDRA-4996, 5066)
 + * fix specifying and altering crc_check_chance (CASSANDRA-5053)
 + * fix Murmur3Partitioner ownership% calculation (CASSANDRA-5076)
 + * Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079)
 +
 +
 +1.2-rc1
 + * rename rpc_timeout settings to request_timeout (CASSANDRA-5027)
 + * add BF with 0.1 FP to LCS by default (CASSANDRA-5029)
 + * Fix preparing insert queries (CASSANDRA-5016)
 + * Fix preparing queries with counter increment (CASSANDRA-5022)
 + * Fix preparing updates with collections (CASSANDRA-5017)
 + * Don't generate UUID based on other node address (CASSANDRA-5002)
 + * Fix message when trying to alter a clustering key type (CASSANDRA-5012)
 + * Update IAuthenticator to match the new IAuthorizer (CASSANDRA-5003)
 + * Fix inserting only a key in CQL3 (CASSANDRA-5040)
 + * Fix CQL3 token() function when used with strings (CASSANDRA-5050)
 +Merged from 1.1:
   * reduce log spam from invalid counter shards (CASSANDRA-5026)
   * Improve schema propagation performance (CASSANDRA-5025)
 - * Fall back to old describe_splits if d_s_ex is not available (CASSANDRA-4803)
 - * Improve error reporting when streaming ranges fail (CASSANDRA-5009)
 + * Fix for IndexHelper.IndexFor throws OOB Exception (CASSANDRA-5030)
 + * cqlsh: make it possible to describe thrift CFs (CASSANDRA-4827)
   * cqlsh: fix timestamp formatting on some platforms (CASSANDRA-5046)
 - * Fix ALTER TABLE overriding compression options with defaults (CASSANDRA-4996, 5066)
 - * Avoid error opening data file on startup (CASSANDRA-4984)
 - * Fix wrong index_options in cli 'show schema' (CASSANDRA-5008)
 - * Allow overriding number of available processor (CASSANDRA-4790)
  
  
 -1.1.7
 - * cqlsh: improve COPY FROM performance (CASSANDRA-4921)
 +1.2-beta3
 + * make consistency level configurable in cqlsh (CASSANDRA-4829)
 + * fix cqlsh rendering of blob fields (CASSANDRA-4970)
 + * fix cqlsh DESCRIBE command (CASSANDRA-4913)
 + * save truncation position in system table (CASSANDRA-4906)
 + * Move CompressionMetadata off-heap (CASSANDRA-4937)
 + * allow CLI to GET cql3 columnfamily data (CASSANDRA-4924)
 + * Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402)
 + * acquire references to overlapping sstables during compaction so bloom filter
 +   doesn't get free'd prematurely (CASSANDRA-4934)
 + * Don't share slice query filter in CQL3 SelectStatement (CASSANDRA-4928)
 + * Separate tracing from Log4J (CASSANDRA-4861)
 + * Exclude gcable tombstones from merkle-tree computation (CASSANDRA-4905)
 + * Better printing of AbstractBounds for tracing (CASSANDRA-4931)
 + * Optimize mostRecentTombstone check in CC.collectAllData (CASSANDRA-4883)
 + * Change stream session ID to UUID to avoid collision from same node (CASSANDRA-4813)
 + * Use Stats.db when bulk loading if present (CASSANDRA-4957)
 + * Skip repair on system_trace and keyspaces with RF=1 (CASSANDRA-4956)
 + * (cql3) Remove arbitrary SELECT limit (CASSANDRA-4918)
 + * Correctly handle prepared operation on collections (CASSANDRA-4945)
 + * Fix CQL3 LIMIT (CASSANDRA-4877)
 + * Fix Stress for CQL3 (CASSANDRA-4979)
 + * Remove cassandra specific exceptions from JMX interface (CASSANDRA-4893)
 + * (CQL3) Force using ALLOW FILTERING on potentially inefficient queries (CASSANDRA-4915)
 + * (cql3) Fix adding column when the table has collections (CASSANDRA-4982)
 + * (cql3) Fix allowing collections with compact storage (CASSANDRA-4990)
 + * (cql3) Refuse ttl/writetime function on collections (CASSANDRA-4992)
 + * Replace IAuthority with new IAuthorizer (CASSANDRA-4874)
 + * cqlsh: fix KEY pseudocolumn escaping when describing Thrift tables
 +   in CQL3 mode (CASSANDRA-4955)
   * add basic authentication support for Pig CassandraStorage (CASSANDRA-3042)
   * fix CQL2 ALTER TABLE compaction_strategy_class altering (CASSANDRA-4965)
 +Merged from 1.1:
 + * Fall back to old describe_splits if d_s_ex is not available (CASSANDRA-4803)
 + * Improve error reporting when streaming ranges fail (CASSANDRA-5009)
 + * Fix cqlsh timestamp formatting of timezone info (CASSANDRA-4746)
 + * Fix assertion failure with leveled compaction (CASSANDRA-4799)
 + * Check for null end_token in get_range_slice (CASSANDRA-4804)
 + * Remove all remnants of removed nodes (CASSANDRA-4840)
 + * Add auto-reloading of the log4j file in debian package (CASSANDRA-4855)
 + * Fix estimated row cache entry size (CASSANDRA-4860)
 + * reset getRangeSlice filter after finishing a row for get_paged_slice
 +   (CASSANDRA-4919)
   * expunge row cache post-truncate (CASSANDRA-4940)
 - * remove IAuthority2 (CASSANDRA-4875)
 + * Allow static CF definition with compact storage (CASSANDRA-4910)
 + * Fix endless loop/compaction of schema_* CFs due to broken timestamps (CASSANDRA-4880)
 + * Fix 'wrong class type' assertion in CounterColumn (CASSANDRA-4976)
 +
 +
 +1.2-beta2
 + * fp rate of 1.0 disables BF entirely; LCS defaults to 1.0 (CASSANDRA-4876)
 + * off-heap bloom filters for row keys (CASSANDRA-4865)
 + * add extension point for sstable components (CASSANDRA-4049)
 + * improve tracing output (CASSANDRA-4852, 4862)
 + * make TRACE verb droppable (CASSANDRA-4672)
 + * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755)
 + * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793)
 + * Make hint delivery asynchronous (CASSANDRA-4761)
 + * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610)
 + * cassandra-cli: allow Double value type to be inserted to a column (CASSANDRA-4661)
 + * Add ability to use custom TServerFactory implementations (CASSANDRA-4608)
 + * optimize batchlog flushing to skip successful batches (CASSANDRA-4667)
 + * include metadata for system keyspace itself in schema tables (CASSANDRA-4416)
 + * add check to PropertyFileSnitch to verify presence of location for
 +   local node (CASSANDRA-4728)
 + * add PBSPredictor consistency modeler (CASSANDRA-4261)
 + * remove vestiges of Thrift unframed mode (CASSANDRA-4729)
 + * optimize single-row PK lookups (CASSANDRA-4710)
 + * adjust blockFor calculation to account for pending ranges due to node 
 +   movement (CASSANDRA-833)
 + * Change CQL version to 3.0.0 and stop accepting 3.0.0-beta1 (CASSANDRA-4649)
 + * (CQL3) Make prepared statement global instead of per connection 
 +   (CASSANDRA-4449)
 + * Fix scrubbing of CQL3 created tables (CASSANDRA-4685)
 + * (CQL3) Fix validation when using counter and regular columns in the same 
 +   table (CASSANDRA-4706)
 + * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648)
 + * Add support for batchlog in CQL3 (CASSANDRA-4545, 4738)
 + * Add support for multiple column family outputs in CFOF (CASSANDRA-4208)
 + * Support repairing only the local DC nodes (CASSANDRA-4747)
 + * Use rpc_address for binary protocol and change default port (CASSANDRA-4751)
 + * Fix use of collections in prepared statements (CASSANDRA-4739)
 + * Store more information into peers table (CASSANDRA-4351, 4814)
 + * Configurable bucket size for size tiered compaction (CASSANDRA-4704)
 + * Run leveled compaction in parallel (CASSANDRA-4310)
 + * Fix potential NPE during CFS reload (CASSANDRA-4786)
 + * Composite indexes may miss results (CASSANDRA-4796)
 + * Move consistency level to the protocol level (CASSANDRA-4734, 4824)
 + * Fix Subcolumn slice ends not respected (CASSANDRA-4826)
 + * Fix Assertion error in cql3 select (CASSANDRA-4783)
 + * Fix list prepend logic (CQL3) (CASSANDRA-4835)
 + * Add booleans as literals in CQL3 (CASSANDRA-4776)
 + * Allow renaming PK columns in CQL3 (CASSANDRA-4822)
 + * Fix binary protocol NEW_NODE event (CASSANDRA-4679)
 + * Fix potential infinite loop in tombstone compaction (CASSANDRA-4781)
 + * Remove system tables accounting from schema (CASSANDRA-4850)
 + * (cql3) Force provided columns in clustering key order in 
 +   'CLUSTERING ORDER BY' (CASSANDRA-4881)
 + * Fix composite index bug (CASSANDRA-4884)
 + * Fix short read protection for CQL3 (CASSANDRA-4882)
 + * Add tracing support to the binary protocol (CASSANDRA-4699)
 + * (cql3) Don't allow prepared marker inside collections (CASSANDRA-4890)
 + * Re-allow order by on non-selected columns (CASSANDRA-4645)
 + * Bug when composite index is created in a table having collections (CASSANDRA-4909)
 + * log index scan subject in CompositesSearcher (CASSANDRA-4904)
 +Merged from 1.1:
   * add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859)
   * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
   * fix indexing empty column values (CASSANDRA-4832)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index dc6b0df,a5faff1..bbd2466
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@@ -35,31 -26,25 +32,34 @@@ import org.apache.cassandra.io.util.Poo
  import org.apache.cassandra.io.util.RandomAccessReader;
  import org.apache.cassandra.utils.FBUtilities;
  
- // TODO refactor this to separate concept of "buffer to avoid lots of read() syscalls" and "compression buffer"
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 -
+ /**
+  * CRAR extends RAR to transparently uncompress blocks from the file into RAR.buffer.  Most of the RAR
+  * "read bytes from the buffer, rebuffering when necessary" machinery works unchanged after that.
+  */
  public class CompressedRandomAccessReader extends RandomAccessReader
  {
 -    private static final Logger logger = LoggerFactory.getLogger(CompressedRandomAccessReader.class);
 -
 -    public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata) throws IOException
 +    public static CompressedRandomAccessReader open(String path, CompressionMetadata metadata, CompressedSegmentedFile owner)
      {
 -        return open(dataFilePath, metadata, false);
 +        try
 +        {
 +            return new CompressedRandomAccessReader(path, metadata, false, owner);
 +        }
 +        catch (FileNotFoundException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
      }
  
 -    public static RandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws IOException
 +    public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache)
      {
 -        return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache);
 +        try
 +        {
 +            return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache, null);
 +        }
 +        catch (FileNotFoundException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
      }
  
      private final CompressionMetadata metadata;
@@@ -70,30 -56,13 +71,13 @@@
      private final Checksum checksum = new CRC32();
  
      // raw checksum bytes
-     private final byte[] checksumBytes = new byte[4];
- 
-     private final FileInputStream source;
-     private final FileChannel channel;
+     private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
  
 -    public CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws IOException
 +    private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException
      {
 -        super(new File(dataFilePath), metadata.chunkLength(), skipIOCache);
 +        super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, owner);
          this.metadata = metadata;
-         compressed = new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())];
-         // can't use super.read(...) methods
-         // that is why we are allocating special InputStream to read data from disk
-         // from already open file descriptor
-         try
-         {
-             source = new FileInputStream(getFD());
-         }
-         catch (IOException e)
-         {
-             // fd == null, Not Supposed To Happen
-             throw new RuntimeException(e);
-         }
- 
-         channel = source.getChannel(); // for position manipulation
+         compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
      }
  
      @Override
@@@ -118,20 -76,18 +102,26 @@@
          if (channel.position() != chunk.offset)
              channel.position(chunk.offset);
  
-         if (compressed.length < chunk.length)
-             compressed = new byte[chunk.length];
+         if (compressed.capacity() < chunk.length)
+             compressed = ByteBuffer.wrap(new byte[chunk.length]);
+         else
+             compressed.clear();
+         compressed.limit(chunk.length);
  
-         if (source.read(compressed, 0, chunk.length) != chunk.length)
+         if (channel.read(compressed) != chunk.length)
 -            throw new IOException(String.format("(%s) failed to read %d bytes from offset %d.", getPath(), chunk.length, chunk.offset));
 +            throw new CorruptBlockException(getPath(), chunk);
 +
+         // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
+         // in the future this will save a lot of hair-pulling
+         compressed.flip();
 -        validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
 +        try
 +        {
-             validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0);
++            validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new CorruptBlockException(getPath(), chunk);
 +        }
  
          if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
          {
@@@ -151,11 -107,13 +141,10 @@@
      private int checksum(CompressionMetadata.Chunk chunk) throws IOException
      {
          assert channel.position() == chunk.offset + chunk.length;
- 
-         if (source.read(checksumBytes, 0, checksumBytes.length) != checksumBytes.length)
+         checksumBytes.clear();
+         if (channel.read(checksumBytes) != checksumBytes.capacity())
 -            throw new IOException(String.format("(%s) failed to read checksum of the chunk at %d of length %d.",
 -                                                getPath(),
 -                                                chunk.offset,
 -                                                chunk.length));
 +            throw new CorruptBlockException(getPath(), chunk);
- 
-         return Ints.fromByteArray(checksumBytes);
+         return checksumBytes.getInt(0);
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index b9e4c21,ad05ce2..15339c4
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -26,12 -26,20 +26,15 @@@ import java.net.InetAddress
  import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
  import java.util.*;
 -import java.util.Map.Entry;
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
  import javax.management.MBeanServer;
+ import javax.management.Notification;
+ import javax.management.NotificationBroadcasterSupport;
  import javax.management.ObjectName;
  
 -import com.google.common.base.Supplier;
  import com.google.common.collect.*;
 -
 -import org.apache.cassandra.metrics.ClientRequestMetrics;
 -
  import org.apache.log4j.Level;
  import org.apache.commons.lang.StringUtils;
  import org.slf4j.Logger;
@@@ -79,12 -81,90 +82,15 @@@ import static com.google.common.base.Ch
   * This class will also maintain histograms of the load information
   * of other nodes in the cluster.
   */
- public class StorageService implements IEndpointStateChangeSubscriber, StorageServiceMBean
+ public class StorageService extends NotificationBroadcasterSupport implements IEndpointStateChangeSubscriber, StorageServiceMBean
  {
 -    private static Logger logger_ = LoggerFactory.getLogger(StorageService.class);
 +    private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
  
      public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
  
+     /* JMX notification serial number counter */
+     private final AtomicLong notificationSerialNumber = new AtomicLong();
+ 
 -    /* All verb handler identifiers */
 -    public enum Verb
 -    {
 -        MUTATION,
 -        BINARY, // Deprecated
 -        READ_REPAIR,
 -        READ,
 -        REQUEST_RESPONSE, // client-initiated reads and writes
 -        STREAM_INITIATE, // Deprecated
 -        STREAM_INITIATE_DONE, // Deprecated
 -        STREAM_REPLY,
 -        STREAM_REQUEST,
 -        RANGE_SLICE,
 -        BOOTSTRAP_TOKEN,
 -        TREE_REQUEST,
 -        TREE_RESPONSE,
 -        JOIN, // Deprecated
 -        GOSSIP_DIGEST_SYN,
 -        GOSSIP_DIGEST_ACK,
 -        GOSSIP_DIGEST_ACK2,
 -        DEFINITIONS_ANNOUNCE, // Deprecated
 -        DEFINITIONS_UPDATE,
 -        TRUNCATE,
 -        SCHEMA_CHECK,
 -        INDEX_SCAN, // Deprecated
 -        REPLICATION_FINISHED,
 -        INTERNAL_RESPONSE, // responses to internal calls
 -        COUNTER_MUTATION,
 -        STREAMING_REPAIR_REQUEST,
 -        STREAMING_REPAIR_RESPONSE,
 -        SNAPSHOT, // Similar to nt snapshot
 -        MIGRATION_REQUEST,
 -        GOSSIP_SHUTDOWN,
 -        // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
 -        UNUSED_1,
 -        UNUSED_2,
 -        UNUSED_3,
 -        ;
 -        // remember to add new verbs at the end, since we serialize by ordinal
 -    }
 -    public static final Verb[] VERBS = Verb.values();
 -
 -    public static final EnumMap<StorageService.Verb, Stage> verbStages = new EnumMap<StorageService.Verb, Stage>(StorageService.Verb.class)
 -    {{
 -        put(Verb.MUTATION, Stage.MUTATION);
 -        put(Verb.BINARY, Stage.MUTATION);
 -        put(Verb.READ_REPAIR, Stage.MUTATION);
 -        put(Verb.TRUNCATE, Stage.MUTATION);
 -        put(Verb.READ, Stage.READ);
 -        put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
 -        put(Verb.STREAM_REPLY, Stage.MISC); // TODO does this really belong on misc? I've just copied old behavior here
 -        put(Verb.STREAM_REQUEST, Stage.STREAM);
 -        put(Verb.RANGE_SLICE, Stage.READ);
 -        put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
 -        put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
 -        put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
 -        put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY);
 -        put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY);
 -        put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
 -        put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
 -        put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
 -        put(Verb.GOSSIP_SHUTDOWN, Stage.GOSSIP);
 -        put(Verb.DEFINITIONS_UPDATE, Stage.MIGRATION);
 -        put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
 -        put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
 -        put(Verb.INDEX_SCAN, Stage.READ);
 -        put(Verb.REPLICATION_FINISHED, Stage.MISC);
 -        put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
 -        put(Verb.COUNTER_MUTATION, Stage.MUTATION);
 -        put(Verb.SNAPSHOT, Stage.MISC);
 -        put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
 -        put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
 -        put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
 -    }};
 -
      private static int getRingDelay()
      {
          String newdelay = System.getProperty("cassandra.ring_delay_ms");
@@@ -180,10 -250,8 +186,12 @@@
  
      private static final AtomicInteger nextRepairCommand = new AtomicInteger();
  
 +    private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
 +
 +    private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>();
 +
+     private final ObjectName jmxObjectName;
+ 
      public void finishBootstrapping()
      {
          isBootstrapMode = false;
@@@ -2225,6 -1944,82 +2234,34 @@@
      }
  
      /**
+      * Sends JMX notification to subscribers.
+      *
+      * @param type Message type
+      * @param message Message itself
+      * @param userObject Arbitrary object to attach to notification
+      */
+     public void sendNotification(String type, String message, Object userObject)
+     {
+         Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message);
+         jmxNotification.setUserData(userObject);
+         sendNotification(jmxNotification);
+     }
+ 
 -    public int forceRepairAsync(final String tableName, final boolean isSequential, final boolean primaryRange, final String... columnFamilies)
++    public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
+     {
 -        if (Table.SYSTEM_TABLE.equals(tableName))
++        if (Table.SYSTEM_KS.equals(keyspace) || Tracing.TRACE_KS.equals(keyspace) || Auth.AUTH_KS.equals(keyspace))
+             return 0;
+ 
+         final int cmd = nextRepairCommand.incrementAndGet();
 -        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
++        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(keyspace);
+         if (ranges.size() > 0)
+         {
 -            new Thread(new WrappedRunnable()
 -            {
 -                protected void runMayThrow() throws Exception
 -                {
 -                    String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), tableName);
 -                    logger_.info(message);
 -                    sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()});
 -
 -                    List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
 -                    for (Range<Token> range : ranges)
 -                    {
 -                        AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
 -                        if (future == null)
 -                            continue;
 -                        futures.add(future);
 -                        // wait for a session to be done with its differencing before starting the next one
 -                        try
 -                        {
 -                            future.session.differencingDone.await();
 -                        }
 -                        catch (InterruptedException e)
 -                        {
 -                            message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.";
 -                            logger_.error(message, e);
 -                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 -                        }
 -                    }
 -                    for (AntiEntropyService.RepairFuture future : futures)
 -                    {
 -                        try
 -                        {
 -                            future.get();
 -                            message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString());
 -                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
 -                        }
 -                        catch (ExecutionException e)
 -                        {
 -                            message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
 -                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 -                        }
 -                        catch (Exception e)
 -                        {
 -                            message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
 -                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 -                        }
 -                    }
 -                    sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
 -                }
 -            }).start();
++            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, columnFamilies)).start();
+         }
+         return cmd;
+     }
+ 
+     /**
       * Trigger proactive repair for a table and column families.
       * @param tableName
       * @param columnFamilies
@@@ -2245,65 -2098,25 +2282,75 @@@
          Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
          Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
  
 -        logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
 -                     new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
 -        AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential, columnFamilies);
 -        if (future == null)
 +        logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
 +                    parsedBeginToken, parsedEndToken, tableName, columnFamilies);
 +        forceTableRepairRange(tableName, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal, columnFamilies);
 +    }
 +
 +    public void forceTableRepairRange(final String tableName, final Collection<Range<Token>> ranges, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
 +    {
-         if (Table.SYSTEM_KS.equals(tableName) || Tracing.TRACE_KS.equals(tableName))
++        if (Table.SYSTEM_KS.equals(tableName) || Tracing.TRACE_KS.equals(tableName) || Auth.AUTH_KS.equals(tableName))
              return;
 -        try
 -        {
 -            future.get();
 -        }
 -        catch (Exception e)
++        createRepairTask(nextRepairCommand.incrementAndGet(), tableName, ranges, isSequential, isLocal, columnFamilies).run();
++    }
 +
-         int cmd = nextRepairCommand.incrementAndGet();
-         logger.info("Starting repair command #{}, repairing {} ranges.", cmd, ranges.size());
- 
-         List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
-         for (Range<Token> range : ranges)
++    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
++    {
++        FutureTask<Object> task = new FutureTask<Object>(new WrappedRunnable()
          {
-             AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, isLocal, columnFamilies);
-             if (future == null)
-                 continue;
-             futures.add(future);
-             // wait for a session to be done with its differencing before starting the next one
-             try
-             {
-                 future.session.differencingDone.await();
-             }
-             catch (InterruptedException e)
 -            logger_.error("Repair session " + future.session.getName() + " failed.", e);
 -        }
++            protected void runMayThrow() throws Exception
 +            {
-                 logger.error("Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.", e);
-             }
-         }
-         if (futures.isEmpty())
-         {
-             logger.info("Nothing to repair on {} for command #{}", tableName, cmd);
-             return;
-         }
++                String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), keyspace);
++                logger.info(message);
++                sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()});
 +
-         boolean failedSession = false;
- 
-         // block until all repair sessions have completed
-         for (AntiEntropyService.RepairFuture future : futures)
-         {
-             try
-             {
-                 future.get();
-             }
-             catch (Exception e)
-             {
-                 logger.error("Repair session " + future.session.getName() + " failed.", e);
-                 failedSession = true;
++                List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
++                for (Range<Token> range : ranges)
++                {
++                    AntiEntropyService.RepairFuture future = forceTableRepair(range, keyspace, isSequential, isLocal, columnFamilies);
++                    if (future == null)
++                        continue;
++                    futures.add(future);
++                    // wait for a session to be done with its differencing before starting the next one
++                    try
++                    {
++                        future.session.differencingDone.await();
++                    }
++                    catch (InterruptedException e)
++                    {
++                        message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.";
++                        logger.error(message, e);
++                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
++                    }
++                }
++                for (AntiEntropyService.RepairFuture future : futures)
++                {
++                    try
++                    {
++                        future.get();
++                        message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString());
++                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
++                    }
++                    catch (ExecutionException e)
++                    {
++                        message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
++                        logger.error(message, e);
++                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
++                    }
++                    catch (Exception e)
++                    {
++                        message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
++                        logger.error(message, e);
++                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
++                    }
++                }
++                sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
 +            }
-         }
- 
-         if (failedSession)
-             throw new IOException("Repair command #" + cmd + ": some repair session(s) failed (see log for details).");
-         else
-             logger.info("Repair command #{} completed successfully", cmd);
++        }, null);
++        return task;
      }
  
 -    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
 +    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
      {
          ArrayList<String> names = new ArrayList<String>();
          for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 0176b1d,c34faf3..067d08a
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -27,7 -26,14 +27,9 @@@ import java.util.Map
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeoutException;
  
- public interface StorageServiceMBean
+ import javax.management.NotificationEmitter;
+ 
 -import org.apache.cassandra.config.ConfigurationException;
 -import org.apache.cassandra.thrift.InvalidRequestException;
 -import org.apache.cassandra.thrift.UnavailableException;
 -
 -
+ public interface StorageServiceMBean extends NotificationEmitter
  {
      /**
       * Retrieve the list of live nodes in the cluster, where "liveness" is
@@@ -252,6 -244,18 +254,18 @@@
      public void forceTableFlush(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
  
      /**
+      * Invoke repair asynchronously.
+      * You can track repair progress by subscribing to the JMX notifications sent from this StorageServiceMBean.
+      * Notification format is:
+      *   type: "repair"
+      *   userData: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
+      *
+      * @return Repair command number, or 0 if nothing to repair
 -     * @see #forceTableRepair(String, boolean, String...)
++     * @see #forceTableRepair(String, boolean, boolean, String...)
+      */
 -    public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies);
++    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies);
+ 
+     /**
       * Triggers proactive repair for given column families, or all columnfamilies for the given table
       * if none are explicitly listed.
       * @param tableName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeCmd.java
index ecd9e3c,8d4f9a1..d72f3d3
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@@ -25,35 -28,23 +25,33 @@@ import java.net.ConnectException
  import java.net.InetAddress;
  import java.net.UnknownHostException;
  import java.text.DecimalFormat;
- import java.util.concurrent.ExecutionException;
- import java.util.Map.Entry;
  import java.util.*;
+ import java.util.Map.Entry;
+ import java.util.concurrent.ExecutionException;
  
 +import com.google.common.collect.LinkedHashMultimap;
 +import com.google.common.collect.Maps;
  import org.apache.commons.cli.*;
- 
- import org.apache.cassandra.service.CacheServiceMBean;
- import org.apache.cassandra.service.PBSPredictionResult;
- import org.apache.cassandra.service.PBSPredictorMBean;
- import org.apache.cassandra.service.StorageProxyMBean;
++import org.yaml.snakeyaml.Loader;
++import org.yaml.snakeyaml.TypeDescription;
++import org.yaml.snakeyaml.Yaml;
++import org.yaml.snakeyaml.constructor.Constructor;
  
  import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 -import org.apache.cassandra.config.ConfigurationException;
  import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 +import org.apache.cassandra.db.Table;
  import org.apache.cassandra.db.compaction.CompactionManagerMBean;
  import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
  import org.apache.cassandra.net.MessagingServiceMBean;
+ import org.apache.cassandra.service.CacheServiceMBean;
++import org.apache.cassandra.service.PBSPredictionResult;
++import org.apache.cassandra.service.PBSPredictorMBean;
+ import org.apache.cassandra.service.StorageProxyMBean;
 -import org.apache.cassandra.thrift.InvalidRequestException;
  import org.apache.cassandra.utils.EstimatedHistogram;
  import org.apache.cassandra.utils.Pair;
- import org.yaml.snakeyaml.Loader;
- import org.yaml.snakeyaml.TypeDescription;
- import org.yaml.snakeyaml.Yaml;
- import org.yaml.snakeyaml.constructor.Constructor;
  
  public class NodeCmd
  {
@@@ -1334,11 -1040,8 +1332,9 @@@
              {
                  case REPAIR  :
                      boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
 +                    boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
-                     if (cmd.hasOption(PRIMARY_RANGE_OPT.left))
-                         probe.forceTableRepairPrimaryRange(keyspace, snapshot, localDC, columnFamilies);
-                     else
-                         probe.forceTableRepair(keyspace, snapshot, localDC, columnFamilies);
+                     boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
 -                    probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies);
++                    probe.forceRepairAsync(System.out, keyspace, snapshot, localDC, primaryRange, columnFamilies);
                      break;
                  case FLUSH   :
                      try { probe.forceTableFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a96a8d41/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 261374f,264ea90..021a40f
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -48,6 -52,9 +50,7 @@@ import org.apache.cassandra.net.Messagi
  import org.apache.cassandra.service.*;
  import org.apache.cassandra.streaming.StreamingService;
  import org.apache.cassandra.streaming.StreamingServiceMBean;
 -import org.apache.cassandra.thrift.InvalidRequestException;
 -import org.apache.cassandra.thrift.UnavailableException;
+ import org.apache.cassandra.utils.SimpleCondition;
  
  /**
   * JMX client operations for Cassandra.
@@@ -198,19 -202,41 +201,41 @@@ public class NodeProb
          ssProxy.forceTableFlush(tableName, columnFamilies);
      }
  
 -    public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException
 +    public void forceTableRepair(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
      {
 -        ssProxy.forceTableRepair(tableName, isSequential, columnFamilies);
 +        ssProxy.forceTableRepair(tableName, isSequential, isLocal, columnFamilies);
      }
  
 -    public void forceRepairAsync(final PrintStream out, final String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies) throws IOException
++    public void forceRepairAsync(final PrintStream out, final String tableName, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies) throws IOException
+     {
+         RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
+         try
+         {
+             ssProxy.addNotificationListener(runner, null, null);
 -            runner.repairAndWait(ssProxy, isSequential, primaryRange);
++            runner.repairAndWait(ssProxy, isSequential, isLocal, primaryRange);
+         }
+         catch (Exception e)
+         {
+             throw new IOException(e) ;
+         }
+         finally
+         {
+             try
+             {
+                ssProxy.removeNotificationListener(runner);
+             }
+             catch (ListenerNotFoundException ignored) {}
+         }
+     }
+ 
 -    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
 +    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
      {
 -        ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
 +        ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, isLocal, columnFamilies);
      }
  
 -    public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException
 +    public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
      {
 -        ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, columnFamilies);
 +        ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, isLocal, columnFamilies);
      }
  
      public void invalidateKeyCache() throws IOException
@@@ -797,3 -795,52 +822,51 @@@ class ThreadPoolProxyMBeanIterator impl
          throw new UnsupportedOperationException();
      }
  }
+ 
+ class RepairRunner implements NotificationListener
+ {
+     private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+     private final Condition condition = new SimpleCondition();
+     private final PrintStream out;
+     private final String keyspace;
+     private final String[] columnFamilies;
+     private int cmd;
+ 
+     RepairRunner(PrintStream out, String keyspace, String... columnFamilies)
+     {
+         this.out = out;
+         this.keyspace = keyspace;
+         this.columnFamilies = columnFamilies;
+     }
+ 
 -    public void repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean primaryRangeOnly) throws InterruptedException
++    public void repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws InterruptedException
+     {
 -        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, primaryRangeOnly, columnFamilies);
++        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, isLocal, primaryRangeOnly, columnFamilies);
+         if (cmd > 0)
+         {
+             condition.await();
+         }
+         else
+         {
+             String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
+             out.println(message);
+         }
+     }
+ 
+     public void handleNotification(Notification notification, Object handback)
+     {
+         if ("repair".equals(notification.getType()))
+         {
 -            // repair status is int array with [0] = cmd number, [1] = status
+             int[] status = (int[]) notification.getUserData();
+             assert status.length == 2;
 -            // we only output what we invoked
+             if (cmd == status[0])
+             {
+                 String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage());
+                 out.println(message);
++                // repair status is int array with [0] = cmd number, [1] = status
+                 if (status[1] == AntiEntropyService.Status.FINISHED.ordinal())
+                     condition.signalAll();
+             }
+         }
+     }
+ }