You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/01/08 21:29:24 UTC
[3/8] git commit: Show progress on nodetool repair command;
patch by yukim; reviewed by Sylvain Lebresne for CASSANDRA-4767
Show progress on nodetool repair command; patch by yukim; reviewed by Sylvain Lebresne for CASSANDRA-4767
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0906b7cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0906b7cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0906b7cc
Branch: refs/heads/cassandra-1.2
Commit: 0906b7cc5173770e04932432d40503f7c39eb61f
Parents: 55f936f
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jan 8 11:20:25 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jan 8 11:20:25 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/AntiEntropyService.java | 10 ++
.../apache/cassandra/service/StorageService.java | 89 ++++++++++++++-
.../cassandra/service/StorageServiceMBean.java | 16 +++-
src/java/org/apache/cassandra/tools/NodeCmd.java | 10 +-
src/java/org/apache/cassandra/tools/NodeProbe.java | 74 ++++++++++++
6 files changed, 191 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c5c3863..5e87435 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
* fix specifying and altering crc_check_chance (CASSANDRA-5053)
* Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079)
* Pig: correctly decode row keys in widerow mode (CASSANDRA-5098)
+ * nodetool repair command now prints progress (CASSANDRA-4767)
1.1.8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 0d7c1b4..dc03122 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -104,6 +104,11 @@ public class AntiEntropyService
"internal");
}
+ public static enum Status
+ {
+ STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
+ }
+
/**
* A map of active session.
*/
@@ -646,6 +651,11 @@ public class AntiEntropyService
return sessionName;
}
+ public Range<Token> getRange()
+ {
+ return range;
+ }
+
RepairFuture getFuture()
{
return new RepairFuture(this);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a948786..ad05ce2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -29,7 +29,10 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
import javax.management.ObjectName;
import com.google.common.base.Supplier;
@@ -78,12 +81,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
* This class will also maintain histograms of the load information
* of other nodes in the cluster.
*/
-public class StorageService implements IEndpointStateChangeSubscriber, StorageServiceMBean
+public class StorageService extends NotificationBroadcasterSupport implements IEndpointStateChangeSubscriber, StorageServiceMBean
{
private static Logger logger_ = LoggerFactory.getLogger(StorageService.class);
public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
+ /* JMX notification serial number counter */
+ private final AtomicLong notificationSerialNumber = new AtomicLong();
+
/* All verb handler identifiers */
public enum Verb
{
@@ -244,6 +250,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
private static final AtomicInteger nextRepairCommand = new AtomicInteger();
+ private final ObjectName jmxObjectName;
+
public void finishBootstrapping()
{
isBootstrapMode = false;
@@ -265,7 +273,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=StorageService"));
+ jmxObjectName = new ObjectName("org.apache.cassandra.db:type=StorageService");
+ mbs.registerMBean(this, jmxObjectName);
}
catch (Exception e)
{
@@ -1935,6 +1944,82 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
/**
+ * Sends JMX notification to subscribers.
+ *
+ * @param type Message type
+ * @param message Message itself
+ * @param userObject Arbitrary object to attach to notification
+ */
+ public void sendNotification(String type, String message, Object userObject)
+ {
+ Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message);
+ jmxNotification.setUserData(userObject);
+ sendNotification(jmxNotification);
+ }
+
+ public int forceRepairAsync(final String tableName, final boolean isSequential, final boolean primaryRange, final String... columnFamilies)
+ {
+ if (Table.SYSTEM_TABLE.equals(tableName))
+ return 0;
+
+ final int cmd = nextRepairCommand.incrementAndGet();
+ final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
+ if (ranges.size() > 0)
+ {
+ new Thread(new WrappedRunnable()
+ {
+ protected void runMayThrow() throws Exception
+ {
+ String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), tableName);
+ logger_.info(message);
+ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()});
+
+ List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
+ for (Range<Token> range : ranges)
+ {
+ AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+ if (future == null)
+ continue;
+ futures.add(future);
+ // wait for a session to be done with its differencing before starting the next one
+ try
+ {
+ future.session.differencingDone.await();
+ }
+ catch (InterruptedException e)
+ {
+ message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.";
+ logger_.error(message, e);
+ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+ }
+ }
+ for (AntiEntropyService.RepairFuture future : futures)
+ {
+ try
+ {
+ future.get();
+ message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString());
+ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
+ }
+ catch (ExecutionException e)
+ {
+ message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
+ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+ }
+ catch (Exception e)
+ {
+ message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
+ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+ }
+ }
+ sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
+ }
+ }).start();
+ }
+ return cmd;
+ }
+
+ /**
* Trigger proactive repair for a table and column families.
* @param tableName
* @param columnFamilies
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c4c6a1d..c34faf3 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -26,12 +26,14 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import javax.management.NotificationEmitter;
+
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.UnavailableException;
-public interface StorageServiceMBean
+public interface StorageServiceMBean extends NotificationEmitter
{
/**
* Retrieve the list of live nodes in the cluster, where "liveness" is
@@ -242,6 +244,18 @@ public interface StorageServiceMBean
public void forceTableFlush(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
/**
+ * Invoke repair asynchronously.
+ * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
+ * Notification format is:
+ * type: "repair"
+ * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
+ *
+ * @return Repair command number, or 0 if nothing to repair
+ * @see #forceTableRepair(String, boolean, String...)
+ */
+ public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies);
+
+ /**
* Triggers proactive repair for given column families, or all columnfamilies for the given table
* if none are explicitly listed.
* @param tableName
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index bba96de..8d4f9a1 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -32,8 +32,6 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
-import org.apache.cassandra.service.CacheServiceMBean;
-import org.apache.cassandra.service.StorageProxyMBean;
import org.apache.commons.cli.*;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
@@ -42,6 +40,8 @@ import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.service.CacheServiceMBean;
+import org.apache.cassandra.service.StorageProxyMBean;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.Pair;
@@ -1040,10 +1040,8 @@ public class NodeCmd
{
case REPAIR :
boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
- if (cmd.hasOption(PRIMARY_RANGE_OPT.left))
- probe.forceTableRepairPrimaryRange(keyspace, snapshot, columnFamilies);
- else
- probe.forceTableRepair(keyspace, snapshot, columnFamilies);
+ boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
+ probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies);
break;
case FLUSH :
try { probe.forceTableFlush(keyspace, columnFamilies); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 3ed4524..264ea90 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -26,10 +26,12 @@ import java.lang.management.MemoryUsage;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
@@ -52,6 +54,7 @@ import org.apache.cassandra.streaming.StreamingService;
import org.apache.cassandra.streaming.StreamingServiceMBean;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.SimpleCondition;
/**
* JMX client operations for Cassandra.
@@ -204,6 +207,28 @@ public class NodeProbe
ssProxy.forceTableRepair(tableName, isSequential, columnFamilies);
}
+ public void forceRepairAsync(final PrintStream out, final String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies) throws IOException
+ {
+ RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
+ try
+ {
+ ssProxy.addNotificationListener(runner, null, null);
+ runner.repairAndWait(ssProxy, isSequential, primaryRange);
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e) ;
+ }
+ finally
+ {
+ try
+ {
+ ssProxy.removeNotificationListener(runner);
+ }
+ catch (ListenerNotFoundException ignored) {}
+ }
+ }
+
public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
{
ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
@@ -770,3 +795,52 @@ class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, JMXEnab
throw new UnsupportedOperationException();
}
}
+
+class RepairRunner implements NotificationListener
+{
+ private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ private final Condition condition = new SimpleCondition();
+ private final PrintStream out;
+ private final String keyspace;
+ private final String[] columnFamilies;
+ private int cmd;
+
+ RepairRunner(PrintStream out, String keyspace, String... columnFamilies)
+ {
+ this.out = out;
+ this.keyspace = keyspace;
+ this.columnFamilies = columnFamilies;
+ }
+
+ public void repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean primaryRangeOnly) throws InterruptedException
+ {
+ cmd = ssProxy.forceRepairAsync(keyspace, isSequential, primaryRangeOnly, columnFamilies);
+ if (cmd > 0)
+ {
+ condition.await();
+ }
+ else
+ {
+ String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
+ out.println(message);
+ }
+ }
+
+ public void handleNotification(Notification notification, Object handback)
+ {
+ if ("repair".equals(notification.getType()))
+ {
+ // repair status is int array with [0] = cmd number, [1] = status
+ int[] status = (int[]) notification.getUserData();
+ assert status.length == 2;
+ // we only output what we invoked
+ if (cmd == status[0])
+ {
+ String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage());
+ out.println(message);
+ if (status[1] == AntiEntropyService.Status.FINISHED.ordinal())
+ condition.signalAll();
+ }
+ }
+ }
+}