You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/03/10 20:18:46 UTC

cassandra git commit: Generalize progress reporting

Repository: cassandra
Updated Branches:
  refs/heads/trunk d76adf500 -> 4adb98146


Generalize progress reporting

patch by yukim; reviewed by Josh McKenzie for CASSANDRA-8901


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4adb9814
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4adb9814
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4adb9814

Branch: refs/heads/trunk
Commit: 4adb9814639d6e4225105e0c364044fac8b4b84d
Parents: d76adf5
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Mar 10 14:17:31 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Mar 10 14:17:31 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/net/OutboundTcpConnection.java    |   2 +-
 .../apache/cassandra/repair/RepairRunnable.java | 394 +++++++++++++++++++
 .../cassandra/service/ActiveRepairService.java  |   6 -
 .../cassandra/service/StorageService.java       | 282 +------------
 .../org/apache/cassandra/tools/NodeProbe.java   |   2 -
 .../apache/cassandra/tools/RepairRunner.java    |  99 +++--
 .../apache/cassandra/tracing/TraceState.java    |  45 ++-
 .../cassandra/utils/progress/ProgressEvent.java |  75 ++++
 .../utils/progress/ProgressEventNotifier.java   |  38 ++
 .../utils/progress/ProgressEventType.java       |  72 ++++
 .../utils/progress/ProgressListener.java        |  32 ++
 .../jmx/JMXNotificationProgressListener.java    |  94 +++++
 .../utils/progress/jmx/JMXProgressSupport.java  |  58 +++
 14 files changed, 865 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 29e1edf..2de6137 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -62,6 +62,7 @@
  * Improve concurrency of repair (CASSANDRA-6455, 8208)
  * Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
  * Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
+ * Generalize progress reporting (CASSANDRA-8901)
 
 2.1.4
  * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 5130974..6db83b4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -217,7 +217,7 @@ public class OutboundTcpConnection extends Thread
                 {
                     byte[] traceTypeBytes = qm.message.parameters.get(Tracing.TRACE_TYPE);
                     Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
-                    TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL(), null);
+                    TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL());
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
new file mode 100644
index 0000000..89c0d70
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.*;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tracing.TraceKeyspace;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventNotifier;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier
+{
+    private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
+
+    private StorageService storageService;
+    private final int cmd;
+    private final RepairOption options;
+    private final String keyspace;
+
+    private final List<ProgressListener> listeners = new ArrayList<>();
+
+    public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
+    {
+        this.storageService = storageService;
+        this.cmd = cmd;
+        this.options = options;
+        this.keyspace = keyspace;
+    }
+
+    @Override
+    public void addProgressListener(ProgressListener listener)
+    {
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeProgressListener(ProgressListener listener)
+    {
+        listeners.remove(listener);
+    }
+
+    protected void fireProgressEvent(String tag, ProgressEvent event)
+    {
+        for (ProgressListener listener : listeners)
+        {
+            listener.progress(tag, event);
+        }
+    }
+
+    protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message)
+    {
+        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
+        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
+    }
+
+    protected void runMayThrow() throws Exception
+    {
+        final TraceState traceState;
+
+        final String tag = "repair:" + cmd;
+
+        final AtomicInteger progress = new AtomicInteger();
+        final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, validation, prepare for repair + number of ranges to repair
+
+        String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
+        Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace,
+                                                                                                columnFamilies);
+
+        final long startTime = System.currentTimeMillis();
+        String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace,
+                                       options);
+        logger.info(message);
+        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message));
+        if (options.isTraced())
+        {
+            StringBuilder cfsb = new StringBuilder();
+            for (ColumnFamilyStore cfs : validColumnFamilies)
+                cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
+
+            UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
+            traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
+                                                                          cfsb.substring(2)));
+            Tracing.traceRepair(message);
+            traceState.enableActivityNotification(tag);
+            for (ProgressListener listener : listeners)
+                traceState.addProgressListener(listener);
+            Thread queryThread = createQueryThread(cmd, sessionId);
+            queryThread.setName("RepairTracePolling");
+            queryThread.start();
+        }
+        else
+        {
+            traceState = null;
+        }
+
+        final Set<InetAddress> allNeighbors = new HashSet<>();
+        Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
+        try
+        {
+            for (Range<Token> range : options.getRanges())
+            {
+                    Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
+                                                                                  options.getDataCenters(),
+                                                                                  options.getHosts());
+                    rangeToNeighbors.put(range, neighbors);
+                    allNeighbors.addAll(neighbors);
+            }
+            progress.incrementAndGet();
+        }
+        catch (IllegalArgumentException e)
+        {
+            logger.error("Repair failed:", e);
+            fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+            return;
+        }
+
+        // Validate columnfamilies
+        List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
+        try
+        {
+            Iterables.addAll(columnFamilyStores, validColumnFamilies);
+            progress.incrementAndGet();
+        }
+        catch (IllegalArgumentException e)
+        {
+            fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+            return;
+        }
+
+        final UUID parentSession;
+        long repairedAt;
+        try
+        {
+            parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, options, columnFamilyStores);
+            repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
+            progress.incrementAndGet();
+        }
+        catch (Throwable t)
+        {
+            fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage());
+            return;
+        }
+
+        // Set up RepairJob executor for this repair command.
+        final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
+                                                                                                                         Integer.MAX_VALUE,
+                                                                                                                         TimeUnit.SECONDS,
+                                                                                                                         new LinkedBlockingQueue<Runnable>(),
+                                                                                                                         new NamedThreadFactory("Repair#" + cmd),
+                                                                                                                         "internal"));
+
+        List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
+        String[] cfnames = new String[columnFamilyStores.size()];
+        for (int i = 0; i < columnFamilyStores.size(); i++)
+        {
+            cfnames[i] = columnFamilyStores.get(i).name;
+        }
+        for (Range<Token> range : options.getRanges())
+        {
+            final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
+                                                              range,
+                                                              keyspace,
+                                                              options.getParallelism(),
+                                                              rangeToNeighbors.get(range),
+                                                              repairedAt,
+                                                              executor,
+                                                              cfnames);
+            if (session == null)
+                continue;
+            // After repair session completes, notify client its result
+            Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
+            {
+                public void onSuccess(RepairSessionResult result)
+                {
+                    String message = String.format("Repair session %s for range %s finished", session.getId(),
+                                                   session.getRange().toString());
+                    logger.info(message);
+                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+                                                             progress.incrementAndGet(),
+                                                             totalProgress,
+                                                             message));
+                }
+
+                public void onFailure(Throwable t)
+                {
+                    String message = String.format("Repair session %s for range %s failed with error %s",
+                                                   session.getId(), session.getRange().toString(), t.getMessage());
+                    logger.error(message, t);
+                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+                                                             progress.incrementAndGet(),
+                                                             totalProgress,
+                                                             message));
+                }
+            });
+            futures.add(session);
+        }
+
+        // After all repair sessions completes(successful or not),
+        // run anticompaction if necessary and send finish notice back to client
+        final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
+        Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
+        {
+            public void onSuccess(List<RepairSessionResult> result)
+            {
+                boolean hasFailure = false;
+                // filter out null(=failed) results and get successful ranges
+                Collection<Range<Token>> successfulRanges = new ArrayList<>();
+                for (RepairSessionResult sessionResult : result)
+                {
+                    if (sessionResult != null)
+                    {
+                        successfulRanges.add(sessionResult.range);
+                    }
+                    else
+                    {
+                        hasFailure = true;
+                    }
+                }
+                try
+                {
+                    ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
+                }
+                catch (Exception e)
+                {
+                    logger.error("Error in incremental repair", e);
+                }
+                if (hasFailure)
+                {
+                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
+                                                             "Some repair failed"));
+                }
+                else
+                {
+                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
+                                                             "Repair completed successfully"));
+                }
+                repairComplete();
+            }
+
+            public void onFailure(Throwable t)
+            {
+                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
+                repairComplete();
+            }
+
+            private void repairComplete()
+            {
+                String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
+                                                                          true, true);
+                String message = String.format("Repair command #%d finished in %s", cmd, duration);
+                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
+                logger.info(message);
+                if (options.isTraced() && traceState != null)
+                {
+                    for (ProgressListener listener : listeners)
+                        traceState.removeProgressListener(listener);
+                    // Because DebuggableThreadPoolExecutor#afterExecute and this callback
+                    // run in a nondeterministic order (within the same thread), the
+                    // TraceState may have been nulled out at this point. The TraceState
+                    // should be traceState, so just set it without bothering to check if it
+                    // actually was nulled out.
+                    Tracing.instance.set(traceState);
+                    Tracing.traceRepair(message);
+                    Tracing.instance.stopSession();
+                }
+                executor.shutdownNow();
+            }
+        });
+    }
+
+    private Thread createQueryThread(final int cmd, final UUID sessionId)
+    {
+        return new Thread(new WrappedRunnable()
+        {
+            // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
+            // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
+            public void runMayThrow() throws Exception
+            {
+                TraceState state = Tracing.instance.get(sessionId);
+                if (state == null)
+                    throw new Exception("no tracestate");
+
+                String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
+                String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS);
+                SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
+
+                ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+                InetAddress source = FBUtilities.getBroadcastAddress();
+
+                HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
+                int si = 0;
+                UUID uuid;
+
+                long tlast = System.currentTimeMillis(), tcur;
+
+                TraceState.Status status;
+                long minWaitMillis = 125;
+                long maxWaitMillis = 1000 * 1024L;
+                long timeout = minWaitMillis;
+                boolean shouldDouble = false;
+
+                while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
+                {
+                    if (status == TraceState.Status.IDLE)
+                    {
+                        timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
+                        shouldDouble = !shouldDouble;
+                    }
+                    else
+                    {
+                        timeout = minWaitMillis;
+                        shouldDouble = false;
+                    }
+                    ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
+                    ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
+                    QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes,
+                                                                                                                  tminBytes,
+                                                                                                                  tmaxBytes));
+                    ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
+                    UntypedResultSet result = UntypedResultSet.create(rows.result);
+
+                    for (UntypedResultSet.Row r : result)
+                    {
+                        if (source.equals(r.getInetAddress("source")))
+                            continue;
+                        if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
+                            seen[si].add(uuid);
+                        if (seen[si == 0 ? 1 : 0].contains(uuid))
+                            continue;
+                        String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
+                        fireProgressEvent("repair:" + cmd,
+                                          new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
+                    }
+                    tlast = tcur;
+
+                    si = si == 0 ? 1 : 0;
+                    seen[si].clear();
+                }
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index faa32c2..7c8e1cc 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -57,7 +57,6 @@ import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.RefCounted;
 
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -83,11 +82,6 @@ public class ActiveRepairService
 
     public static final long UNREPAIRED_SSTABLE = 0;
 
-    public static enum Status
-    {
-        STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED, RUNNING
-    }
-
     /**
      * A map of active coordinator session.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f616710..35c67c4 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.management.*;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
@@ -35,7 +34,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,10 +45,6 @@ import org.apache.cassandra.auth.AuthKeyspace;
 import org.apache.cassandra.auth.AuthMigrationListener;
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -77,10 +71,8 @@ import org.apache.cassandra.thrift.EndpointDetails;
 import org.apache.cassandra.thrift.TokenRange;
 import org.apache.cassandra.thrift.cassandraConstants;
 import org.apache.cassandra.tracing.TraceKeyspace;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
 
 import static java.nio.charset.StandardCharsets.ISO_8859_1;
 
@@ -96,8 +88,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
 
-    /* JMX notification serial number counter */
-    private final AtomicLong notificationSerialNumber = new AtomicLong();
+    private final JMXProgressSupport progressSupport = new JMXProgressSupport(this);
 
     private static int getRingDelay()
     {
@@ -2419,20 +2410,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
-    /**
-     * Sends JMX notification to subscribers.
-     *
-     * @param type Message type
-     * @param message Message itself
-     * @param userObject Arbitrary object to attach to notification
-     */
-    public void sendNotification(String type, String message, Object userObject)
-    {
-        Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message);
-        jmxNotification.setUserData(userObject);
-        sendNotification(jmxNotification);
-    }
-
     public int repairAsync(String keyspace, Map<String, String> repairSpec)
     {
         RepairOption option = RepairOption.parse(repairSpec, getPartitioner());
@@ -2533,7 +2510,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                      boolean fullRepair,
                                      String... columnFamilies)
     {
-        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, fullRepair, columnFamilies);
+        return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
+                                     isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
+                                     dataCenters, hosts, fullRepair, columnFamilies);
     }
 
     public int forceRepairRangeAsync(String beginToken,
@@ -2641,75 +2620,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return cmd;
     }
 
-    private Thread createQueryThread(final int cmd, final UUID sessionId)
-    {
-        return new Thread(new WrappedRunnable()
-        {
-            // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
-            // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
-            public void runMayThrow() throws Exception
-            {
-                TraceState state = Tracing.instance.get(sessionId);
-                if (state == null)
-                    throw new Exception("no tracestate");
-
-                String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
-                String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS);
-                SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
-
-                ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
-                InetAddress source = FBUtilities.getBroadcastAddress();
-
-                HashSet<UUID>[] seen = new HashSet[] { new HashSet<UUID>(), new HashSet<UUID>() };
-                int si = 0;
-                UUID uuid;
-
-                long tlast = System.currentTimeMillis(), tcur;
-
-                TraceState.Status status;
-                long minWaitMillis = 125;
-                long maxWaitMillis = 1000 * 1024L;
-                long timeout = minWaitMillis;
-                boolean shouldDouble = false;
-
-                while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
-                {
-                    if (status == TraceState.Status.IDLE)
-                    {
-                        timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
-                        shouldDouble = !shouldDouble;
-                    }
-                    else
-                    {
-                        timeout = minWaitMillis;
-                        shouldDouble = false;
-                    }
-                    ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
-                    ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
-                    QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, tminBytes, tmaxBytes));
-                    ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
-                    UntypedResultSet result = UntypedResultSet.create(rows.result);
-
-                    for (UntypedResultSet.Row r : result)
-                    {
-                        if (source.equals(r.getInetAddress("source")))
-                            continue;
-                        if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
-                            seen[si].add(uuid);
-                        if (seen[si == 0 ? 1 : 0].contains(uuid))
-                            continue;
-                        String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
-                        sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.RUNNING.ordinal()});
-                    }
-                    tlast = tcur;
-
-                    si = si == 0 ? 1 : 0;
-                    seen[si].clear();
-                }
-            }
-        });
-    }
-
     private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final RepairOption options)
     {
         if (!options.getDataCenters().isEmpty() && options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter()))
@@ -2717,184 +2627,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             throw new IllegalArgumentException("the local data center must be part of the repair");
         }
 
-        return new FutureTask<>(new WrappedRunnable()
-        {
-            protected void runMayThrow() throws Exception
-            {
-                final TraceState traceState;
-
-                String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
-                Iterable<ColumnFamilyStore> validColumnFamilies = getValidColumnFamilies(false, false, keyspace, columnFamilies);
-
-                final long startTime = System.currentTimeMillis();
-                String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace, options);
-                logger.info(message);
-                sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
-                if (options.isTraced())
-                {
-                    StringBuilder cfsb = new StringBuilder();
-                    for (ColumnFamilyStore cfs : validColumnFamilies)
-                        cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
-
-                    UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
-                    traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", cfsb.substring(2)));
-                    Tracing.traceRepair(message);
-                    traceState.enableActivityNotification();
-                    traceState.setNotificationHandle(new int[]{ cmd, ActiveRepairService.Status.RUNNING.ordinal() });
-                    Thread queryThread = createQueryThread(cmd, sessionId);
-                    queryThread.setName("RepairTracePolling");
-                    queryThread.start();
-                }
-                else
-                {
-                    traceState = null;
-                }
-
-                final Set<InetAddress> allNeighbors = new HashSet<>();
-                Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
-                for (Range<Token> range : options.getRanges())
-                {
-                    try
-                    {
-                        Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, options.getDataCenters(), options.getHosts());
-                        rangeToNeighbors.put(range, neighbors);
-                        allNeighbors.addAll(neighbors);
-                    }
-                    catch (IllegalArgumentException e)
-                    {
-                        logger.error("Repair failed:", e);
-                        sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
-                        return;
-                    }
-                }
-
-                // Validate columnfamilies
-                List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
-                try
-                {
-                    Iterables.addAll(columnFamilyStores, validColumnFamilies);
-                }
-                catch (IllegalArgumentException e)
-                {
-                    sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
-                    return;
-                }
-
-                final UUID parentSession;
-                long repairedAt;
-                try
-                {
-                    parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, options, columnFamilyStores);
-                    repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
-                }
-                catch (Throwable t)
-                {
-                    sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
-                    return;
-                }
-
-                // Set up RepairJob executor for this repair command.
-                final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
-                                                                                                                           Integer.MAX_VALUE,
-                                                                                                                           TimeUnit.SECONDS,
-                                                                                                                           new LinkedBlockingQueue<Runnable>(),
-                                                                                                                           new NamedThreadFactory("Repair#" + cmd),
-                                                                                                                           "internal"));
-
-                List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
-                String[] cfnames = new String[columnFamilyStores.size()];
-                for (int i = 0; i < columnFamilyStores.size(); i++)
-                {
-                    cfnames[i] = columnFamilyStores.get(i).name;
-                }
-                for (Range<Token> range : options.getRanges())
-                {
-                    final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
-                                                                      range,
-                                                                      keyspace,
-                                                                      options.getParallelism(),
-                                                                      rangeToNeighbors.get(range),
-                                                                      repairedAt,
-                                                                      executor,
-                                                                      cfnames);
-                    if (session == null)
-                        continue;
-                    // After repair session completes, notify client its result
-                    Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
-                    {
-                        public void onSuccess(RepairSessionResult result)
-                        {
-                            String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRange().toString());
-                            logger.info(message);
-                            sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal()});
-                        }
-
-                        public void onFailure(Throwable t)
-                        {
-                            String message = String.format("Repair session %s for range %s failed with error %s", session.getId(), session.getRange().toString(), t.getMessage());
-                            logger.error(message, t);
-                            sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
-                        }
-                    });
-                    futures.add(session);
-                }
-
-                // After all repair sessions completes(successful or not),
-                // run anticompaction if necessary and send finish notice back to client
-                final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
-                Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
-                {
-                    public void onSuccess(List<RepairSessionResult> result)
-                    {
-                        // filter out null(=failed) results and get successful ranges
-                        Collection<Range<Token>> successfulRanges = new ArrayList<>();
-                        for (RepairSessionResult sessionResult : result)
-                        {
-                            if (sessionResult != null)
-                            {
-                                successfulRanges.add(sessionResult.range);
-                            }
-                        }
-                        try
-                        {
-                            ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
-                        }
-                        catch (Exception e)
-                        {
-                            logger.error("Error in incremental repair", e);
-                        }
-                        repairComplete();
-                    }
-
-                    public void onFailure(Throwable t)
-                    {
-                        repairComplete();
-                    }
-
-                    private void repairComplete()
-                    {
-                        String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true);
-                        String message = String.format("Repair command #%d finished in %s", cmd, duration);
-                        sendNotification("repair", message,
-                                         new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
-                        logger.info(message);
-                        if (options.isTraced())
-                        {
-                            traceState.setNotificationHandle(null);
-                            // Because DebuggableThreadPoolExecutor#afterExecute and this callback
-                            // run in a nondeterministic order (within the same thread), the
-                            // TraceState may have been nulled out at this point. The TraceState
-                            // should be traceState, so just set it without bothering to check if it
-                            // actually was nulled out.
-                            Tracing.instance.set(traceState);
-                            Tracing.traceRepair(message);
-                            Tracing.instance.stopSession();
-                        }
-                        executor.shutdownNow();
-                    }
-                });
-            }
-        }, null);
+        RepairRunnable task = new RepairRunnable(this, cmd, options, keyspace);
+        task.addProgressListener(progressSupport);
+        return new FutureTask<>(task, null);
     }
 
     public void forceTerminateAllRepairSessions() {
@@ -4213,4 +3948,5 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
         logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB));
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 9ee05f2..5012ef5 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -258,8 +258,6 @@ public class NodeProbe implements AutoCloseable
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
             runner.run();
-            if (!runner.get())
-                failed = true;
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/tools/RepairRunner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java b/src/java/org/apache/cassandra/tools/RepairRunner.java
index 1898bb4..0813775 100644
--- a/src/java/org/apache/cassandra/tools/RepairRunner.java
+++ b/src/java/org/apache/cassandra/tools/RepairRunner.java
@@ -21,16 +21,15 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.text.SimpleDateFormat;
 import java.util.Map;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.management.remote.JMXConnectionNotification;
+import java.util.concurrent.locks.Condition;
 
-import com.google.common.util.concurrent.AbstractFuture;
-
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.jmx.JMXNotificationProgressListener;
 
-public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, NotificationListener
+public class RepairRunner extends JMXNotificationProgressListener
 {
     private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
 
@@ -38,9 +37,11 @@ public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, N
     private final StorageServiceMBean ssProxy;
     private final String keyspace;
     private final Map<String, String> options;
+    private final Condition condition = new SimpleCondition();
 
-    private volatile int cmd;
-    private volatile boolean success;
+    private int cmd;
+    private volatile boolean hasNotificationLost;
+    private volatile Exception error;
 
     public RepairRunner(PrintStream out, StorageServiceMBean ssProxy, String keyspace, Map<String, String> options)
     {
@@ -50,52 +51,68 @@ public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, N
         this.options = options;
     }
 
-    public void run()
+    public void run() throws Exception
     {
         cmd = ssProxy.repairAsync(keyspace, options);
         if (cmd <= 0)
         {
             String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
             out.println(message);
-            set(true);
         }
-    }
-
-    public void handleNotification(Notification notification, Object handback)
-    {
-        if ("repair".equals(notification.getType()))
+        else
         {
-            int[] status = (int[]) notification.getUserData();
-            assert status.length == 2;
-            if (cmd == status[0])
+            condition.await();
+            if (error != null)
+            {
+                throw error;
+            }
+            if (hasNotificationLost)
             {
-                String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage());
-                out.println(message);
-                // repair status is int array with [0] = cmd number, [1] = status
-                if (status[1] == ActiveRepairService.Status.SESSION_FAILED.ordinal())
-                {
-                    success = false;
-                }
-                else if (status[1] == ActiveRepairService.Status.FINISHED.ordinal())
-                {
-                    set(success);
-                }
+                out.println(String.format("There were some lost notification(s). You should check server log for repair status of keyspace %s", keyspace));
             }
         }
-        else if (JMXConnectionNotification.NOTIFS_LOST.equals(notification.getType()))
+    }
+
+    @Override
+    public boolean isInterestedIn(String tag)
+    {
+        return tag.equals("repair:" + cmd);
+    }
+
+    @Override
+    public void handleNotificationLost(long timestamp, String message)
+    {
+        hasNotificationLost = true;
+    }
+
+    @Override
+    public void handleConnectionClosed(long timestamp, String message)
+    {
+        handleConnectionFailed(timestamp, message);
+    }
+
+    @Override
+    public void handleConnectionFailed(long timestamp, String message)
+    {
+        error = new IOException(String.format("[%s] JMX connection closed. You should check server log for repair status of keyspace %s"
+                                               + "(Subsequent keyspaces are not going to be repaired).",
+                                  format.format(timestamp), keyspace));
+        condition.signalAll();
+    }
+
+    @Override
+    public void progress(String tag, ProgressEvent event)
+    {
+        ProgressEventType type = event.getType();
+        String message = String.format("[%s] %s", format.format(System.currentTimeMillis()), event.getMessage());
+        if (type == ProgressEventType.PROGRESS)
         {
-            String message = String.format("[%s] Lost notification. You should check server log for repair status of keyspace %s",
-                                           format.format(notification.getTimeStamp()),
-                                           keyspace);
-            out.println(message);
+            message = message + " (progress: " + (int)event.getProgressPercentage() + "%)";
         }
-        else if (JMXConnectionNotification.FAILED.equals(notification.getType())
-                 || JMXConnectionNotification.CLOSED.equals(notification.getType()))
+        out.println(message);
+        if (type == ProgressEventType.COMPLETE)
         {
-            String message = String.format("JMX connection closed. You should check server log for repair status of keyspace %s"
-                                           + "(Subsequent keyspaces are not going to be repaired).",
-                                           keyspace);
-            setException(new IOException(message));
+            condition.signalAll();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index c67ad3e..758dceb 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.tracing;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -28,15 +30,17 @@ import org.slf4j.helpers.MessageFormatter;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventNotifier;
+import org.apache.cassandra.utils.progress.ProgressListener;
 
 /**
  * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an
  * operation is being traced.
  */
-public class TraceState
+public class TraceState implements ProgressEventNotifier
 {
     public final UUID sessionId;
     public final InetAddress coordinator;
@@ -46,13 +50,14 @@ public class TraceState
     public final int ttl;
 
     private boolean notify;
-    private Object notificationHandle;
+    private List<ProgressListener> listeners = new ArrayList<>();
+    private String tag;
 
     public enum Status
     {
         IDLE,
         ACTIVE,
-        STOPPED;
+        STOPPED
     }
 
     private Status status;
@@ -80,16 +85,30 @@ public class TraceState
         this.status = Status.IDLE;
     }
 
-    public void enableActivityNotification()
+    /**
+     * Activate notification with provided {@code tag} name.
+     *
+     * @param tag Tag name to add when emitting notification
+     */
+    public void enableActivityNotification(String tag)
     {
         assert traceType == Tracing.TraceType.REPAIR;
         notify = true;
+        this.tag = tag;
+    }
+
+    @Override
+    public void addProgressListener(ProgressListener listener)
+    {
+        assert traceType == Tracing.TraceType.REPAIR;
+        listeners.add(listener);
     }
 
-    public void setNotificationHandle(Object handle)
+    @Override
+    public void removeProgressListener(ProgressListener listener)
     {
         assert traceType == Tracing.TraceType.REPAIR;
-        notificationHandle = handle;
+        listeners.remove(listener);
     }
 
     public int elapsed()
@@ -158,16 +177,18 @@ public class TraceState
         if (notify)
             notifyActivity();
 
-        TraceState.trace(sessionIdBytes, message, elapsed(), ttl, notificationHandle);
+        TraceState.mutateWithTracing(sessionIdBytes, message, elapsed(), ttl);
+
+        for (ProgressListener listener : listeners)
+        {
+            listener.progress(tag, ProgressEvent.createNotification(message));
+        }
     }
 
-    public static void trace(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl, final Object notificationHandle)
+    public static void mutateWithTracing(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl)
     {
         final String threadName = Thread.currentThread().getName();
 
-        if (notificationHandle != null)
-            StorageService.instance.sendNotification("repair", message, notificationHandle);
-
         StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
         {
             public void runMayThrow()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java b/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java
new file mode 100644
index 0000000..31d3120
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress;
+
+/**
+ * Progress event
+ */
+public class ProgressEvent
+{
+    private final ProgressEventType type;
+    private final int progressCount;
+    private final int total;
+    private final String message;
+
+    public static ProgressEvent createNotification(String message)
+    {
+        return new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message);
+    }
+
+    public ProgressEvent(ProgressEventType type, int progressCount, int total)
+    {
+        this(type, progressCount, total, null);
+    }
+
+    public ProgressEvent(ProgressEventType type, int progressCount, int total, String message)
+    {
+        this.type = type;
+        this.progressCount = progressCount;
+        this.total = total;
+        this.message = message;
+    }
+
+    public ProgressEventType getType()
+    {
+        return type;
+    }
+
+    public int getProgressCount()
+    {
+        return progressCount;
+    }
+
+    public int getTotal()
+    {
+        return total;
+    }
+
+    public double getProgressPercentage()
+    {
+        return total != 0 ? progressCount * 100 / (double) total : 0;
+    }
+
+    /**
+     * @return Message attached to this event. Can be null.
+     */
+    public String getMessage()
+    {
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java b/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java
new file mode 100644
index 0000000..07a6618
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress;
+
+/**
+ * Interface for {@link ProgressEvent} publisher.
+ */
+public interface ProgressEventNotifier
+{
+    /**
+     * Register progress listener to this publisher.
+     *
+     * @param listener listener to register.
+     */
+    void addProgressListener(ProgressListener listener);
+
+    /**
+     * Remove progress listener from this publisher.
+     *
+     * @param listener listener to remove
+     */
+    void removeProgressListener(ProgressListener listener);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java b/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java
new file mode 100644
index 0000000..8d7daee
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress;
+
+/**
+ * Progress event type.
+ *
+ * <p>
+ * Progress starts by emitting {@link #START}, followed by emitting zero or more {@link #PROGRESS} events,
+ * then it emits either one of {@link #ERROR}/{@link #ABORT}/{@link #SUCCESS}.
+ * Progress indicates its completion by emitting {@link #COMPLETE} at the end of process.
+ * </p>
+ * <p>
+ * {@link #NOTIFICATION} event type is used to just notify message without progress.
+ * </p>
+ */
+public enum ProgressEventType
+{
+    /**
+     * Fired first when progress starts.
+     * Happens only once.
+     */
+    START,
+
+    /**
+     * Fire when progress happens.
+     * This can be zero or more time after START.
+     */
+    PROGRESS,
+
+    /**
+     * When observing process completes with error, this is sent once before COMPLETE.
+     */
+    ERROR,
+
+    /**
+     * When observing process is aborted by user, this is sent once before COMPLETE.
+     */
+    ABORT,
+
+    /**
+     * When observing process completes successfully, this is sent once before COMPLETE.
+     */
+    SUCCESS,
+
+    /**
+     * Fire when progress complete.
+     * This is fired once, after ERROR/ABORT/SUCCESS is fired.
+     * After this, no more ProgressEvent should be fired for the same event.
+     */
+    COMPLETE,
+
+    /**
+     * Used when sending message without progress.
+     */
+    NOTIFICATION
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressListener.java b/src/java/org/apache/cassandra/utils/progress/ProgressListener.java
new file mode 100644
index 0000000..48342a8
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/ProgressListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress;
+
+/**
+ * Listener interface to handle {@link org.apache.cassandra.utils.progress.ProgressEvent}
+ */
+public interface ProgressListener
+{
+    /**
+     * Called when some progress is made by progress publisher.
+     *
+     * @param tag String that identifies progress event.
+     * @param event Current progress
+     */
+    void progress(String tag, ProgressEvent event);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java b/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java
new file mode 100644
index 0000000..3461487
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress.jmx;
+
+import java.util.Map;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.remote.JMXConnectionNotification;
+
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+/**
+ * JMXNotificationProgressListener uses JMX Notification API to convert JMX Notification message to progress event
+ * and notifies its {@link ProgressListener}s.
+ *
+ * This is to be implemented in client tools side.
+ */
+public abstract class JMXNotificationProgressListener implements ProgressListener, NotificationListener
+{
+    /**
+     * @param tag tag name to be checked
+     * @return true if given tag for ProgressEvent is a target to consume. If this returns false, then
+     *         {@link #progress} is not called for that event.
+     */
+    public abstract boolean isInterestedIn(String tag);
+
+    /**
+     * Called when receiving {@link JMXConnectionNotification#NOTIFS_LOST} message.
+     */
+    public void handleNotificationLost(long timestamp, String message) {}
+
+    /**
+     * Called when JMX connection is closed.
+     * Specifically when {@link JMXConnectionNotification#CLOSED} message is received.
+     */
+    public void handleConnectionClosed(long timestamp, String message) {}
+
+    /**
+     * Called when JMX connection is failed.
+     * Specifically when {@link JMXConnectionNotification#FAILED} message is received.
+     */
+    public void handleConnectionFailed(long timestamp, String message) {}
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handleNotification(Notification notification, Object handback)
+    {
+        switch (notification.getType())
+        {
+            case "progress":
+                String tag = (String) notification.getSource();
+                if (this.isInterestedIn(tag))
+                {
+                    Map<String, Integer> progress = (Map<String, Integer>) notification.getUserData();
+                    String message = notification.getMessage();
+                    ProgressEvent event = new ProgressEvent(ProgressEventType.values()[progress.get("type")],
+                                                            progress.get("progressCount"),
+                                                            progress.get("total"),
+                                                            message);
+                    this.progress(tag, event);
+                }
+                break;
+
+            case JMXConnectionNotification.NOTIFS_LOST:
+                handleNotificationLost(notification.getTimeStamp(), notification.getMessage());
+                break;
+
+            case JMXConnectionNotification.FAILED:
+                handleConnectionFailed(notification.getTimeStamp(), notification.getMessage());
+                break;
+
+            case JMXConnectionNotification.CLOSED:
+                handleConnectionClosed(notification.getTimeStamp(), notification.getMessage());
+                break;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java b/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java
new file mode 100644
index 0000000..12efd0d
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress.jmx;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+/**
+ * ProgressListener that translates ProgressEvent to JMX Notification message.
+ */
+public class JMXProgressSupport implements ProgressListener
+{
+    private final AtomicLong notificationSerialNumber = new AtomicLong();
+
+    private final NotificationBroadcasterSupport broadcaster;
+
+    public JMXProgressSupport(NotificationBroadcasterSupport broadcaster)
+    {
+        this.broadcaster = broadcaster;
+    }
+
+    @Override
+    public void progress(String tag, ProgressEvent event)
+    {
+        Notification notification = new Notification("progress",
+                                                     tag,
+                                                     notificationSerialNumber.getAndIncrement(),
+                                                     System.currentTimeMillis(),
+                                                     event.getMessage());
+        Map<String, Integer> userData = new HashMap<>();
+        userData.put("type", event.getType().ordinal());
+        userData.put("progressCount", event.getProgressCount());
+        userData.put("total", event.getTotal());
+        notification.setUserData(userData);
+        broadcaster.sendNotification(notification);
+    }
+}