You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/03/10 20:18:46 UTC
cassandra git commit: Generalize progress reporting
Repository: cassandra
Updated Branches:
refs/heads/trunk d76adf500 -> 4adb98146
Generalize progress reporting
patch by yukim; reviewed by Josh McKenzie for CASSANDRA-8901
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4adb9814
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4adb9814
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4adb9814
Branch: refs/heads/trunk
Commit: 4adb9814639d6e4225105e0c364044fac8b4b84d
Parents: d76adf5
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Mar 10 14:17:31 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Mar 10 14:17:31 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/net/OutboundTcpConnection.java | 2 +-
.../apache/cassandra/repair/RepairRunnable.java | 394 +++++++++++++++++++
.../cassandra/service/ActiveRepairService.java | 6 -
.../cassandra/service/StorageService.java | 282 +------------
.../org/apache/cassandra/tools/NodeProbe.java | 2 -
.../apache/cassandra/tools/RepairRunner.java | 99 +++--
.../apache/cassandra/tracing/TraceState.java | 45 ++-
.../cassandra/utils/progress/ProgressEvent.java | 75 ++++
.../utils/progress/ProgressEventNotifier.java | 38 ++
.../utils/progress/ProgressEventType.java | 72 ++++
.../utils/progress/ProgressListener.java | 32 ++
.../jmx/JMXNotificationProgressListener.java | 94 +++++
.../utils/progress/jmx/JMXProgressSupport.java | 58 +++
14 files changed, 865 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 29e1edf..2de6137 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -62,6 +62,7 @@
* Improve concurrency of repair (CASSANDRA-6455, 8208)
* Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
* Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
+ * Generalize progress reporting (CASSANDRA-8901)
2.1.4
* cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 5130974..6db83b4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -217,7 +217,7 @@ public class OutboundTcpConnection extends Thread
{
byte[] traceTypeBytes = qm.message.parameters.get(Tracing.TRACE_TYPE);
Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
- TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL(), null);
+ TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL());
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
new file mode 100644
index 0000000..89c0d70
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.*;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tracing.TraceKeyspace;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventNotifier;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier
+{
+ private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
+
+ private StorageService storageService;
+ private final int cmd;
+ private final RepairOption options;
+ private final String keyspace;
+
+ private final List<ProgressListener> listeners = new ArrayList<>();
+
+ public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
+ {
+ this.storageService = storageService;
+ this.cmd = cmd;
+ this.options = options;
+ this.keyspace = keyspace;
+ }
+
+ @Override
+ public void addProgressListener(ProgressListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeProgressListener(ProgressListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ protected void fireProgressEvent(String tag, ProgressEvent event)
+ {
+ for (ProgressListener listener : listeners)
+ {
+ listener.progress(tag, event);
+ }
+ }
+
+ protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message)
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
+ }
+
+ protected void runMayThrow() throws Exception
+ {
+ final TraceState traceState;
+
+ final String tag = "repair:" + cmd;
+
+ final AtomicInteger progress = new AtomicInteger();
+ final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, validation, prepare for repair + number of ranges to repair
+
+ String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
+ Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace,
+ columnFamilies);
+
+ final long startTime = System.currentTimeMillis();
+ String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace,
+ options);
+ logger.info(message);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message));
+ if (options.isTraced())
+ {
+ StringBuilder cfsb = new StringBuilder();
+ for (ColumnFamilyStore cfs : validColumnFamilies)
+ cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
+
+ UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
+ traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
+ cfsb.substring(2)));
+ Tracing.traceRepair(message);
+ traceState.enableActivityNotification(tag);
+ for (ProgressListener listener : listeners)
+ traceState.addProgressListener(listener);
+ Thread queryThread = createQueryThread(cmd, sessionId);
+ queryThread.setName("RepairTracePolling");
+ queryThread.start();
+ }
+ else
+ {
+ traceState = null;
+ }
+
+ final Set<InetAddress> allNeighbors = new HashSet<>();
+ Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
+ try
+ {
+ for (Range<Token> range : options.getRanges())
+ {
+ Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
+ options.getDataCenters(),
+ options.getHosts());
+ rangeToNeighbors.put(range, neighbors);
+ allNeighbors.addAll(neighbors);
+ }
+ progress.incrementAndGet();
+ }
+ catch (IllegalArgumentException e)
+ {
+ logger.error("Repair failed:", e);
+ fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+ return;
+ }
+
+ // Validate columnfamilies
+ List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
+ try
+ {
+ Iterables.addAll(columnFamilyStores, validColumnFamilies);
+ progress.incrementAndGet();
+ }
+ catch (IllegalArgumentException e)
+ {
+ fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
+ return;
+ }
+
+ final UUID parentSession;
+ long repairedAt;
+ try
+ {
+ parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, options, columnFamilyStores);
+ repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
+ progress.incrementAndGet();
+ }
+ catch (Throwable t)
+ {
+ fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage());
+ return;
+ }
+
+ // Set up RepairJob executor for this repair command.
+ final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("Repair#" + cmd),
+ "internal"));
+
+ List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
+ String[] cfnames = new String[columnFamilyStores.size()];
+ for (int i = 0; i < columnFamilyStores.size(); i++)
+ {
+ cfnames[i] = columnFamilyStores.get(i).name;
+ }
+ for (Range<Token> range : options.getRanges())
+ {
+ final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
+ range,
+ keyspace,
+ options.getParallelism(),
+ rangeToNeighbors.get(range),
+ repairedAt,
+ executor,
+ cfnames);
+ if (session == null)
+ continue;
+ // After repair session completes, notify client its result
+ Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
+ {
+ public void onSuccess(RepairSessionResult result)
+ {
+ String message = String.format("Repair session %s for range %s finished", session.getId(),
+ session.getRange().toString());
+ logger.info(message);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+ progress.incrementAndGet(),
+ totalProgress,
+ message));
+ }
+
+ public void onFailure(Throwable t)
+ {
+ String message = String.format("Repair session %s for range %s failed with error %s",
+ session.getId(), session.getRange().toString(), t.getMessage());
+ logger.error(message, t);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
+ progress.incrementAndGet(),
+ totalProgress,
+ message));
+ }
+ });
+ futures.add(session);
+ }
+
+ // After all repair sessions completes(successful or not),
+ // run anticompaction if necessary and send finish notice back to client
+ final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
+ Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
+ {
+ public void onSuccess(List<RepairSessionResult> result)
+ {
+ boolean hasFailure = false;
+ // filter out null(=failed) results and get successful ranges
+ Collection<Range<Token>> successfulRanges = new ArrayList<>();
+ for (RepairSessionResult sessionResult : result)
+ {
+ if (sessionResult != null)
+ {
+ successfulRanges.add(sessionResult.range);
+ }
+ else
+ {
+ hasFailure = true;
+ }
+ }
+ try
+ {
+ ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
+ }
+ catch (Exception e)
+ {
+ logger.error("Error in incremental repair", e);
+ }
+ if (hasFailure)
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
+ "Some repair failed"));
+ }
+ else
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
+ "Repair completed successfully"));
+ }
+ repairComplete();
+ }
+
+ public void onFailure(Throwable t)
+ {
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
+ repairComplete();
+ }
+
+ private void repairComplete()
+ {
+ String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
+ true, true);
+ String message = String.format("Repair command #%d finished in %s", cmd, duration);
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
+ logger.info(message);
+ if (options.isTraced() && traceState != null)
+ {
+ for (ProgressListener listener : listeners)
+ traceState.removeProgressListener(listener);
+ // Because DebuggableThreadPoolExecutor#afterExecute and this callback
+ // run in a nondeterministic order (within the same thread), the
+ // TraceState may have been nulled out at this point. The TraceState
+ // should be traceState, so just set it without bothering to check if it
+ // actually was nulled out.
+ Tracing.instance.set(traceState);
+ Tracing.traceRepair(message);
+ Tracing.instance.stopSession();
+ }
+ executor.shutdownNow();
+ }
+ });
+ }
+
+ private Thread createQueryThread(final int cmd, final UUID sessionId)
+ {
+ return new Thread(new WrappedRunnable()
+ {
+ // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
+ // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
+ public void runMayThrow() throws Exception
+ {
+ TraceState state = Tracing.instance.get(sessionId);
+ if (state == null)
+ throw new Exception("no tracestate");
+
+ String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
+ String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS);
+ SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
+
+ ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+ InetAddress source = FBUtilities.getBroadcastAddress();
+
+ HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
+ int si = 0;
+ UUID uuid;
+
+ long tlast = System.currentTimeMillis(), tcur;
+
+ TraceState.Status status;
+ long minWaitMillis = 125;
+ long maxWaitMillis = 1000 * 1024L;
+ long timeout = minWaitMillis;
+ boolean shouldDouble = false;
+
+ while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
+ {
+ if (status == TraceState.Status.IDLE)
+ {
+ timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
+ shouldDouble = !shouldDouble;
+ }
+ else
+ {
+ timeout = minWaitMillis;
+ shouldDouble = false;
+ }
+ ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
+ ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
+ QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes,
+ tminBytes,
+ tmaxBytes));
+ ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
+ UntypedResultSet result = UntypedResultSet.create(rows.result);
+
+ for (UntypedResultSet.Row r : result)
+ {
+ if (source.equals(r.getInetAddress("source")))
+ continue;
+ if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
+ seen[si].add(uuid);
+ if (seen[si == 0 ? 1 : 0].contains(uuid))
+ continue;
+ String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
+ fireProgressEvent("repair:" + cmd,
+ new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
+ }
+ tlast = tcur;
+
+ si = si == 0 ? 1 : 0;
+ seen[si].clear();
+ }
+ }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index faa32c2..7c8e1cc 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -57,7 +57,6 @@ import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -83,11 +82,6 @@ public class ActiveRepairService
public static final long UNREPAIRED_SSTABLE = 0;
- public static enum Status
- {
- STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED, RUNNING
- }
-
/**
* A map of active coordinator session.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f616710..35c67c4 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import javax.management.*;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
@@ -35,7 +34,6 @@ import com.google.common.base.Predicate;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,10 +45,6 @@ import org.apache.cassandra.auth.AuthKeyspace;
import org.apache.cassandra.auth.AuthMigrationListener;
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
@@ -77,10 +71,8 @@ import org.apache.cassandra.thrift.EndpointDetails;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.thrift.cassandraConstants;
import org.apache.cassandra.tracing.TraceKeyspace;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
import static java.nio.charset.StandardCharsets.ISO_8859_1;
@@ -96,8 +88,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
- /* JMX notification serial number counter */
- private final AtomicLong notificationSerialNumber = new AtomicLong();
+ private final JMXProgressSupport progressSupport = new JMXProgressSupport(this);
private static int getRingDelay()
{
@@ -2419,20 +2410,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- /**
- * Sends JMX notification to subscribers.
- *
- * @param type Message type
- * @param message Message itself
- * @param userObject Arbitrary object to attach to notification
- */
- public void sendNotification(String type, String message, Object userObject)
- {
- Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message);
- jmxNotification.setUserData(userObject);
- sendNotification(jmxNotification);
- }
-
public int repairAsync(String keyspace, Map<String, String> repairSpec)
{
RepairOption option = RepairOption.parse(repairSpec, getPartitioner());
@@ -2533,7 +2510,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
boolean fullRepair,
String... columnFamilies)
{
- return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, fullRepair, columnFamilies);
+ return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
+ isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
+ dataCenters, hosts, fullRepair, columnFamilies);
}
public int forceRepairRangeAsync(String beginToken,
@@ -2641,75 +2620,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return cmd;
}
- private Thread createQueryThread(final int cmd, final UUID sessionId)
- {
- return new Thread(new WrappedRunnable()
- {
- // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
- // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
- public void runMayThrow() throws Exception
- {
- TraceState state = Tracing.instance.get(sessionId);
- if (state == null)
- throw new Exception("no tracestate");
-
- String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
- String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS);
- SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
-
- ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
- InetAddress source = FBUtilities.getBroadcastAddress();
-
- HashSet<UUID>[] seen = new HashSet[] { new HashSet<UUID>(), new HashSet<UUID>() };
- int si = 0;
- UUID uuid;
-
- long tlast = System.currentTimeMillis(), tcur;
-
- TraceState.Status status;
- long minWaitMillis = 125;
- long maxWaitMillis = 1000 * 1024L;
- long timeout = minWaitMillis;
- boolean shouldDouble = false;
-
- while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
- {
- if (status == TraceState.Status.IDLE)
- {
- timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
- shouldDouble = !shouldDouble;
- }
- else
- {
- timeout = minWaitMillis;
- shouldDouble = false;
- }
- ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
- ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
- QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, tminBytes, tmaxBytes));
- ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
- UntypedResultSet result = UntypedResultSet.create(rows.result);
-
- for (UntypedResultSet.Row r : result)
- {
- if (source.equals(r.getInetAddress("source")))
- continue;
- if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
- seen[si].add(uuid);
- if (seen[si == 0 ? 1 : 0].contains(uuid))
- continue;
- String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
- sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.RUNNING.ordinal()});
- }
- tlast = tcur;
-
- si = si == 0 ? 1 : 0;
- seen[si].clear();
- }
- }
- });
- }
-
private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final RepairOption options)
{
if (!options.getDataCenters().isEmpty() && options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter()))
@@ -2717,184 +2627,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new IllegalArgumentException("the local data center must be part of the repair");
}
- return new FutureTask<>(new WrappedRunnable()
- {
- protected void runMayThrow() throws Exception
- {
- final TraceState traceState;
-
- String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
- Iterable<ColumnFamilyStore> validColumnFamilies = getValidColumnFamilies(false, false, keyspace, columnFamilies);
-
- final long startTime = System.currentTimeMillis();
- String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace, options);
- logger.info(message);
- sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
- if (options.isTraced())
- {
- StringBuilder cfsb = new StringBuilder();
- for (ColumnFamilyStore cfs : validColumnFamilies)
- cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
-
- UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
- traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", cfsb.substring(2)));
- Tracing.traceRepair(message);
- traceState.enableActivityNotification();
- traceState.setNotificationHandle(new int[]{ cmd, ActiveRepairService.Status.RUNNING.ordinal() });
- Thread queryThread = createQueryThread(cmd, sessionId);
- queryThread.setName("RepairTracePolling");
- queryThread.start();
- }
- else
- {
- traceState = null;
- }
-
- final Set<InetAddress> allNeighbors = new HashSet<>();
- Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
- for (Range<Token> range : options.getRanges())
- {
- try
- {
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, options.getDataCenters(), options.getHosts());
- rangeToNeighbors.put(range, neighbors);
- allNeighbors.addAll(neighbors);
- }
- catch (IllegalArgumentException e)
- {
- logger.error("Repair failed:", e);
- sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
- return;
- }
- }
-
- // Validate columnfamilies
- List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
- try
- {
- Iterables.addAll(columnFamilyStores, validColumnFamilies);
- }
- catch (IllegalArgumentException e)
- {
- sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
- return;
- }
-
- final UUID parentSession;
- long repairedAt;
- try
- {
- parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, options, columnFamilyStores);
- repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
- }
- catch (Throwable t)
- {
- sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
- return;
- }
-
- // Set up RepairJob executor for this repair command.
- final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("Repair#" + cmd),
- "internal"));
-
- List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
- String[] cfnames = new String[columnFamilyStores.size()];
- for (int i = 0; i < columnFamilyStores.size(); i++)
- {
- cfnames[i] = columnFamilyStores.get(i).name;
- }
- for (Range<Token> range : options.getRanges())
- {
- final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
- range,
- keyspace,
- options.getParallelism(),
- rangeToNeighbors.get(range),
- repairedAt,
- executor,
- cfnames);
- if (session == null)
- continue;
- // After repair session completes, notify client its result
- Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
- {
- public void onSuccess(RepairSessionResult result)
- {
- String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRange().toString());
- logger.info(message);
- sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal()});
- }
-
- public void onFailure(Throwable t)
- {
- String message = String.format("Repair session %s for range %s failed with error %s", session.getId(), session.getRange().toString(), t.getMessage());
- logger.error(message, t);
- sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
- }
- });
- futures.add(session);
- }
-
- // After all repair sessions completes(successful or not),
- // run anticompaction if necessary and send finish notice back to client
- final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
- Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
- {
- public void onSuccess(List<RepairSessionResult> result)
- {
- // filter out null(=failed) results and get successful ranges
- Collection<Range<Token>> successfulRanges = new ArrayList<>();
- for (RepairSessionResult sessionResult : result)
- {
- if (sessionResult != null)
- {
- successfulRanges.add(sessionResult.range);
- }
- }
- try
- {
- ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
- }
- catch (Exception e)
- {
- logger.error("Error in incremental repair", e);
- }
- repairComplete();
- }
-
- public void onFailure(Throwable t)
- {
- repairComplete();
- }
-
- private void repairComplete()
- {
- String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true);
- String message = String.format("Repair command #%d finished in %s", cmd, duration);
- sendNotification("repair", message,
- new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
- logger.info(message);
- if (options.isTraced())
- {
- traceState.setNotificationHandle(null);
- // Because DebuggableThreadPoolExecutor#afterExecute and this callback
- // run in a nondeterministic order (within the same thread), the
- // TraceState may have been nulled out at this point. The TraceState
- // should be traceState, so just set it without bothering to check if it
- // actually was nulled out.
- Tracing.instance.set(traceState);
- Tracing.traceRepair(message);
- Tracing.instance.stopSession();
- }
- executor.shutdownNow();
- }
- });
- }
- }, null);
+ RepairRunnable task = new RepairRunnable(this, cmd, options, keyspace);
+ task.addProgressListener(progressSupport);
+ return new FutureTask<>(task, null);
}
public void forceTerminateAllRepairSessions() {
@@ -4213,4 +3948,5 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB));
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 9ee05f2..5012ef5 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -258,8 +258,6 @@ public class NodeProbe implements AutoCloseable
jmxc.addConnectionNotificationListener(runner, null, null);
ssProxy.addNotificationListener(runner, null, null);
runner.run();
- if (!runner.get())
- failed = true;
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/tools/RepairRunner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java b/src/java/org/apache/cassandra/tools/RepairRunner.java
index 1898bb4..0813775 100644
--- a/src/java/org/apache/cassandra/tools/RepairRunner.java
+++ b/src/java/org/apache/cassandra/tools/RepairRunner.java
@@ -21,16 +21,15 @@ import java.io.IOException;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.Map;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.management.remote.JMXConnectionNotification;
+import java.util.concurrent.locks.Condition;
-import com.google.common.util.concurrent.AbstractFuture;
-
-import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.jmx.JMXNotificationProgressListener;
-public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, NotificationListener
+public class RepairRunner extends JMXNotificationProgressListener
{
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
@@ -38,9 +37,11 @@ public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, N
private final StorageServiceMBean ssProxy;
private final String keyspace;
private final Map<String, String> options;
+ private final Condition condition = new SimpleCondition();
- private volatile int cmd;
- private volatile boolean success;
+ private int cmd;
+ private volatile boolean hasNotificationLost;
+ private volatile Exception error;
public RepairRunner(PrintStream out, StorageServiceMBean ssProxy, String keyspace, Map<String, String> options)
{
@@ -50,52 +51,68 @@ public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, N
this.options = options;
}
- public void run()
+ public void run() throws Exception
{
cmd = ssProxy.repairAsync(keyspace, options);
if (cmd <= 0)
{
String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
out.println(message);
- set(true);
}
- }
-
- public void handleNotification(Notification notification, Object handback)
- {
- if ("repair".equals(notification.getType()))
+ else
{
- int[] status = (int[]) notification.getUserData();
- assert status.length == 2;
- if (cmd == status[0])
+ condition.await();
+ if (error != null)
+ {
+ throw error;
+ }
+ if (hasNotificationLost)
{
- String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage());
- out.println(message);
- // repair status is int array with [0] = cmd number, [1] = status
- if (status[1] == ActiveRepairService.Status.SESSION_FAILED.ordinal())
- {
- success = false;
- }
- else if (status[1] == ActiveRepairService.Status.FINISHED.ordinal())
- {
- set(success);
- }
+ out.println(String.format("There were some lost notification(s). You should check server log for repair status of keyspace %s", keyspace));
}
}
- else if (JMXConnectionNotification.NOTIFS_LOST.equals(notification.getType()))
+ }
+
+ @Override
+ public boolean isInterestedIn(String tag)
+ {
+ return tag.equals("repair:" + cmd);
+ }
+
+ @Override
+ public void handleNotificationLost(long timestamp, String message)
+ {
+ hasNotificationLost = true;
+ }
+
+ @Override
+ public void handleConnectionClosed(long timestamp, String message)
+ {
+ handleConnectionFailed(timestamp, message);
+ }
+
+ @Override
+ public void handleConnectionFailed(long timestamp, String message)
+ {
+ error = new IOException(String.format("[%s] JMX connection closed. You should check server log for repair status of keyspace %s"
+ + "(Subsequent keyspaces are not going to be repaired).",
+ format.format(timestamp), keyspace));
+ condition.signalAll();
+ }
+
+ @Override
+ public void progress(String tag, ProgressEvent event)
+ {
+ ProgressEventType type = event.getType();
+ String message = String.format("[%s] %s", format.format(System.currentTimeMillis()), event.getMessage());
+ if (type == ProgressEventType.PROGRESS)
{
- String message = String.format("[%s] Lost notification. You should check server log for repair status of keyspace %s",
- format.format(notification.getTimeStamp()),
- keyspace);
- out.println(message);
+ message = message + " (progress: " + (int)event.getProgressPercentage() + "%)";
}
- else if (JMXConnectionNotification.FAILED.equals(notification.getType())
- || JMXConnectionNotification.CLOSED.equals(notification.getType()))
+ out.println(message);
+ if (type == ProgressEventType.COMPLETE)
{
- String message = String.format("JMX connection closed. You should check server log for repair status of keyspace %s"
- + "(Subsequent keyspaces are not going to be repaired).",
- keyspace);
- setException(new IOException(message));
+ condition.signalAll();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index c67ad3e..758dceb 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.tracing;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,15 +30,17 @@ import org.slf4j.helpers.MessageFormatter;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventNotifier;
+import org.apache.cassandra.utils.progress.ProgressListener;
/**
* ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an
* operation is being traced.
*/
-public class TraceState
+public class TraceState implements ProgressEventNotifier
{
public final UUID sessionId;
public final InetAddress coordinator;
@@ -46,13 +50,14 @@ public class TraceState
public final int ttl;
private boolean notify;
- private Object notificationHandle;
+ private List<ProgressListener> listeners = new ArrayList<>();
+ private String tag;
public enum Status
{
IDLE,
ACTIVE,
- STOPPED;
+ STOPPED
}
private Status status;
@@ -80,16 +85,30 @@ public class TraceState
this.status = Status.IDLE;
}
- public void enableActivityNotification()
+ /**
+ * Activate notification with provided {@code tag} name.
+ *
+ * @param tag Tag name to add when emitting notification
+ */
+ public void enableActivityNotification(String tag)
{
assert traceType == Tracing.TraceType.REPAIR;
notify = true;
+ this.tag = tag;
+ }
+
+ @Override
+ public void addProgressListener(ProgressListener listener)
+ {
+ assert traceType == Tracing.TraceType.REPAIR;
+ listeners.add(listener);
}
- public void setNotificationHandle(Object handle)
+ @Override
+ public void removeProgressListener(ProgressListener listener)
{
assert traceType == Tracing.TraceType.REPAIR;
- notificationHandle = handle;
+ listeners.remove(listener);
}
public int elapsed()
@@ -158,16 +177,18 @@ public class TraceState
if (notify)
notifyActivity();
- TraceState.trace(sessionIdBytes, message, elapsed(), ttl, notificationHandle);
+ TraceState.mutateWithTracing(sessionIdBytes, message, elapsed(), ttl);
+
+ for (ProgressListener listener : listeners)
+ {
+ listener.progress(tag, ProgressEvent.createNotification(message));
+ }
}
- public static void trace(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl, final Object notificationHandle)
+ public static void mutateWithTracing(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl)
{
final String threadName = Thread.currentThread().getName();
- if (notificationHandle != null)
- StorageService.instance.sendNotification("repair", message, notificationHandle);
-
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
{
public void runMayThrow()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java b/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java
new file mode 100644
index 0000000..31d3120
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/ProgressEvent.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress;
+
+/**
+ * Progress event
+ */
+public class ProgressEvent
+{
+ private final ProgressEventType type;
+ private final int progressCount;
+ private final int total;
+ private final String message;
+
+ public static ProgressEvent createNotification(String message)
+ {
+ return new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message);
+ }
+
+ public ProgressEvent(ProgressEventType type, int progressCount, int total)
+ {
+ this(type, progressCount, total, null);
+ }
+
+ public ProgressEvent(ProgressEventType type, int progressCount, int total, String message)
+ {
+ this.type = type;
+ this.progressCount = progressCount;
+ this.total = total;
+ this.message = message;
+ }
+
+ public ProgressEventType getType()
+ {
+ return type;
+ }
+
+ public int getProgressCount()
+ {
+ return progressCount;
+ }
+
+ public int getTotal()
+ {
+ return total;
+ }
+
+ public double getProgressPercentage()
+ {
+ return total != 0 ? progressCount * 100 / (double) total : 0;
+ }
+
+ /**
+ * @return Message attached to this event. Can be null.
+ */
+ public String getMessage()
+ {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java b/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java
new file mode 100644
index 0000000..07a6618
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/ProgressEventNotifier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress;
+
+/**
+ * Interface for {@link ProgressEvent} publisher.
+ */
+public interface ProgressEventNotifier
+{
+ /**
+ * Register progress listener to this publisher.
+ *
+ * @param listener listener to register.
+ */
+ void addProgressListener(ProgressListener listener);
+
+ /**
+ * Remove progress listener from this publisher.
+ *
+ * @param listener listener to remove
+ */
+ void removeProgressListener(ProgressListener listener);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java b/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java
new file mode 100644
index 0000000..8d7daee
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/ProgressEventType.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress;
+
+/**
+ * Progress event type.
+ *
+ * <p>
+ * Progress starts by emitting {@link #START}, followed by emitting zero or more {@link #PROGRESS} events,
+ * then it emits either one of {@link #ERROR}/{@link #ABORT}/{@link #SUCCESS}.
+ * Progress indicates its completion by emitting {@link #COMPLETE} at the end of process.
+ * </p>
+ * <p>
+ * {@link #NOTIFICATION} event type is used to just notify message without progress.
+ * </p>
+ */
+public enum ProgressEventType
+{
+ /**
+ * Fired first when progress starts.
+ * Happens only once.
+ */
+ START,
+
+ /**
+ * Fire when progress happens.
+ * This can be zero or more time after START.
+ */
+ PROGRESS,
+
+ /**
+ * When observing process completes with error, this is sent once before COMPLETE.
+ */
+ ERROR,
+
+ /**
+ * When observing process is aborted by user, this is sent once before COMPLETE.
+ */
+ ABORT,
+
+ /**
+ * When observing process completes successfully, this is sent once before COMPLETE.
+ */
+ SUCCESS,
+
+ /**
+ * Fire when progress complete.
+ * This is fired once, after ERROR/ABORT/SUCCESS is fired.
+ * After this, no more ProgressEvent should be fired for the same event.
+ */
+ COMPLETE,
+
+ /**
+ * Used when sending message without progress.
+ */
+ NOTIFICATION
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/ProgressListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/ProgressListener.java b/src/java/org/apache/cassandra/utils/progress/ProgressListener.java
new file mode 100644
index 0000000..48342a8
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/ProgressListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress;
+
+/**
+ * Listener interface to handle {@link org.apache.cassandra.utils.progress.ProgressEvent}
+ */
+public interface ProgressListener
+{
+ /**
+ * Called when some progress is made by progress publisher.
+ *
+ * @param tag String that identifies progress event.
+ * @param event Current progress
+ */
+ void progress(String tag, ProgressEvent event);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java b/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java
new file mode 100644
index 0000000..3461487
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/jmx/JMXNotificationProgressListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress.jmx;
+
+import java.util.Map;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.remote.JMXConnectionNotification;
+
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+/**
+ * JMXNotificationProgressListener uses JMX Notification API to convert JMX Notification message to progress event
+ * and notifies its {@link ProgressListener}s.
+ *
+ * This is to be implemented in client tools side.
+ */
+public abstract class JMXNotificationProgressListener implements ProgressListener, NotificationListener
+{
+ /**
+ * @param tag tag name to be checked
+ * @return true if given tag for ProgressEvent is a target to consume. If this returns false, then
+ * {@link #progress} is not called for that event.
+ */
+ public abstract boolean isInterestedIn(String tag);
+
+ /**
+ * Called when receiving {@link JMXConnectionNotification#NOTIFS_LOST} message.
+ */
+ public void handleNotificationLost(long timestamp, String message) {}
+
+ /**
+ * Called when JMX connection is closed.
+ * Specifically when {@link JMXConnectionNotification#CLOSED} message is received.
+ */
+ public void handleConnectionClosed(long timestamp, String message) {}
+
+ /**
+ * Called when JMX connection is failed.
+ * Specifically when {@link JMXConnectionNotification#FAILED} message is received.
+ */
+ public void handleConnectionFailed(long timestamp, String message) {}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handleNotification(Notification notification, Object handback)
+ {
+ switch (notification.getType())
+ {
+ case "progress":
+ String tag = (String) notification.getSource();
+ if (this.isInterestedIn(tag))
+ {
+ Map<String, Integer> progress = (Map<String, Integer>) notification.getUserData();
+ String message = notification.getMessage();
+ ProgressEvent event = new ProgressEvent(ProgressEventType.values()[progress.get("type")],
+ progress.get("progressCount"),
+ progress.get("total"),
+ message);
+ this.progress(tag, event);
+ }
+ break;
+
+ case JMXConnectionNotification.NOTIFS_LOST:
+ handleNotificationLost(notification.getTimeStamp(), notification.getMessage());
+ break;
+
+ case JMXConnectionNotification.FAILED:
+ handleConnectionFailed(notification.getTimeStamp(), notification.getMessage());
+ break;
+
+ case JMXConnectionNotification.CLOSED:
+ handleConnectionClosed(notification.getTimeStamp(), notification.getMessage());
+ break;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4adb9814/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java b/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java
new file mode 100644
index 0000000..12efd0d
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/progress/jmx/JMXProgressSupport.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.progress.jmx;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+/**
+ * ProgressListener that translates ProgressEvent to JMX Notification message.
+ */
+public class JMXProgressSupport implements ProgressListener
+{
+ private final AtomicLong notificationSerialNumber = new AtomicLong();
+
+ private final NotificationBroadcasterSupport broadcaster;
+
+ public JMXProgressSupport(NotificationBroadcasterSupport broadcaster)
+ {
+ this.broadcaster = broadcaster;
+ }
+
+ @Override
+ public void progress(String tag, ProgressEvent event)
+ {
+ Notification notification = new Notification("progress",
+ tag,
+ notificationSerialNumber.getAndIncrement(),
+ System.currentTimeMillis(),
+ event.getMessage());
+ Map<String, Integer> userData = new HashMap<>();
+ userData.put("type", event.getType().ordinal());
+ userData.put("progressCount", event.getProgressCount());
+ userData.put("total", event.getTotal());
+ notification.setUserData(userData);
+ broadcaster.sendNotification(notification);
+ }
+}