You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/01/08 21:29:24 UTC

[3/8] git commit: Show progress on nodetool repair command; patch by yukim; reviewed by Sylvain Lebresne for CASSANDRA-4767

Show progress on nodetool repair command; patch by yukim; reviewed by Sylvain Lebresne for CASSANDRA-4767


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0906b7cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0906b7cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0906b7cc

Branch: refs/heads/cassandra-1.2
Commit: 0906b7cc5173770e04932432d40503f7c39eb61f
Parents: 55f936f
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jan 8 11:20:25 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jan 8 11:20:25 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/service/AntiEntropyService.java      |   10 ++
 .../apache/cassandra/service/StorageService.java   |   89 ++++++++++++++-
 .../cassandra/service/StorageServiceMBean.java     |   16 +++-
 src/java/org/apache/cassandra/tools/NodeCmd.java   |   10 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   74 ++++++++++++
 6 files changed, 191 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c5c3863..5e87435 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * fix specifying and altering crc_check_chance (CASSANDRA-5053)
  * Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079)
  * Pig: correctly decode row keys in widerow mode (CASSANDRA-5098)
+ * nodetool repair command now prints progress (CASSANDRA-4767)
 
 
 1.1.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 0d7c1b4..dc03122 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -104,6 +104,11 @@ public class AntiEntropyService
                                                          "internal");
     }
 
+    public static enum Status // repair progress states reported in JMX notifications by StorageService.forceRepairAsync
+    {
+        STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED // ordinals are sent over JMX and compared by clients: append new values only, never reorder
+    }
+
     /**
      * A map of active session.
      */
@@ -646,6 +651,11 @@ public class AntiEntropyService
             return sessionName;
         }
 
+        public Range<Token> getRange() // token range held by this session; used in repair progress messages
+        {
+            return range;
+        }
+
         RepairFuture getFuture()
         {
             return new RepairFuture(this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a948786..ad05ce2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -29,7 +29,10 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
 import javax.management.ObjectName;
 
 import com.google.common.base.Supplier;
@@ -78,12 +81,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
  * This class will also maintain histograms of the load information
  * of other nodes in the cluster.
  */
-public class StorageService implements IEndpointStateChangeSubscriber, StorageServiceMBean
+public class StorageService extends NotificationBroadcasterSupport implements IEndpointStateChangeSubscriber, StorageServiceMBean
 {
     private static Logger logger_ = LoggerFactory.getLogger(StorageService.class);
 
     public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
 
+    /* JMX notification serial number counter */
+    private final AtomicLong notificationSerialNumber = new AtomicLong();
+
     /* All verb handler identifiers */
     public enum Verb
     {
@@ -244,6 +250,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     private static final AtomicInteger nextRepairCommand = new AtomicInteger();
 
+    private final ObjectName jmxObjectName;
+
     public void finishBootstrapping()
     {
         isBootstrapMode = false;
@@ -265,7 +273,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=StorageService"));
+            jmxObjectName = new ObjectName("org.apache.cassandra.db:type=StorageService");
+            mbs.registerMBean(this, jmxObjectName);
         }
         catch (Exception e)
         {
@@ -1935,6 +1944,101 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     }
 
     /**
+     * Sends a JMX notification, with this MBean's ObjectName as its source, to all registered listeners.
+     *
+     * @param type Message type
+     * @param message Message itself
+     * @param userObject Arbitrary object attached as the notification's user data
+     */
+    public void sendNotification(String type, String message, Object userObject)
+    {
+        Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message);
+        jmxNotification.setUserData(userObject);
+        sendNotification(jmxNotification);
+    }
+
+    /**
+     * Starts repairing the given keyspace on a background thread and returns without waiting.
+     * Progress is published as JMX notifications of type "repair" whose user data is an int[]
+     * of {command number, AntiEntropyService.Status ordinal}; every non-zero command number
+     * returned here is eventually followed by a FINISHED notification.
+     *
+     * @return the repair command number, or 0 if there is nothing to repair (the system
+     *         keyspace, or no local ranges), in which case no notification is sent
+     */
+    public int forceRepairAsync(final String tableName, final boolean isSequential, final boolean primaryRange, final String... columnFamilies)
+    {
+        if (Table.SYSTEM_TABLE.equals(tableName))
+            return 0;
+
+        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
+        // Clients such as nodetool wait for FINISHED on any non-zero command number, so never
+        // hand one out unless a repair thread that will send FINISHED is actually started.
+        if (ranges.isEmpty())
+            return 0;
+
+        final int cmd = nextRepairCommand.incrementAndGet();
+        new Thread(new WrappedRunnable()
+        {
+            protected void runMayThrow() throws Exception
+            {
+                String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), tableName);
+                logger_.info(message);
+                sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()});
+
+                try
+                {
+                    List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
+                    for (Range<Token> range : ranges)
+                    {
+                        AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+                        if (future == null)
+                            continue;
+                        futures.add(future);
+                        // wait for a session to be done with its differencing before starting the next one
+                        try
+                        {
+                            future.session.differencingDone.await();
+                        }
+                        catch (InterruptedException e)
+                        {
+                            message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.";
+                            logger_.error(message, e);
+                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+                        }
+                    }
+                    for (AntiEntropyService.RepairFuture future : futures)
+                    {
+                        try
+                        {
+                            future.get();
+                            message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString());
+                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
+                        }
+                        catch (ExecutionException e)
+                        {
+                            message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
+                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+                        }
+                        catch (Exception e)
+                        {
+                            message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
+                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+                        }
+                    }
+                }
+                finally
+                {
+                    // Always close out the command, even if starting a session threw, so that
+                    // clients waiting for FINISHED do not hang.
+                    sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
+                }
+            }
+        }).start();
+        return cmd;
+    }
+
+    /**
      * Trigger proactive repair for a table and column families.
      * @param tableName
      * @param columnFamilies

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c4c6a1d..c34faf3 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -26,12 +26,14 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
+import javax.management.NotificationEmitter;
+
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.UnavailableException;
 
 
-public interface StorageServiceMBean
+public interface StorageServiceMBean extends NotificationEmitter
 {
     /**
      * Retrieve the list of live nodes in the cluster, where "liveness" is
@@ -242,6 +244,18 @@ public interface StorageServiceMBean
     public void forceTableFlush(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
 
     /**
+     * Invoke repair asynchronously; this method returns before the repair completes.
+     * Track progress by subscribing to the JMX notifications sent from this StorageServiceMBean.
+     * Notification format is:
+     *   type: "repair"
+     *   userData: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
+     *
+     * @return Repair command number, or 0 if nothing to repair (no notifications are sent then)
+     * @see #forceTableRepair(String, boolean, String...)
+     */
+    public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies);
+
+    /**
      * Triggers proactive repair for given column families, or all columnfamilies for the given table
      * if none are explicitly listed.
      * @param tableName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index bba96de..8d4f9a1 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -32,8 +32,6 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.cassandra.service.CacheServiceMBean;
-import org.apache.cassandra.service.StorageProxyMBean;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
@@ -42,6 +40,8 @@ import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.compaction.CompactionManagerMBean;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.service.CacheServiceMBean;
+import org.apache.cassandra.service.StorageProxyMBean;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.Pair;
@@ -1040,10 +1040,8 @@ public class NodeCmd
             {
                 case REPAIR  :
                     boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
-                    if (cmd.hasOption(PRIMARY_RANGE_OPT.left))
-                        probe.forceTableRepairPrimaryRange(keyspace, snapshot, columnFamilies);
-                    else
-                        probe.forceTableRepair(keyspace, snapshot, columnFamilies);
+                    boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
+                    probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceTableFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 3ed4524..264ea90 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -26,10 +26,12 @@ import java.lang.management.MemoryUsage;
 import java.lang.management.RuntimeMXBean;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
 import javax.management.*;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
@@ -52,6 +54,7 @@ import org.apache.cassandra.streaming.StreamingService;
 import org.apache.cassandra.streaming.StreamingServiceMBean;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.SimpleCondition;
 
 /**
  * JMX client operations for Cassandra.
@@ -204,6 +207,28 @@ public class NodeProbe
         ssProxy.forceTableRepair(tableName, isSequential, columnFamilies);
     }
 
+    public void forceRepairAsync(final PrintStream out, final String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies) throws IOException
+    {
+        // Runs the repair and prints its JMX progress notifications to out until it is done.
+        RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
+        try
+        {
+            ssProxy.addNotificationListener(runner, null, null);
+            runner.repairAndWait(ssProxy, isSequential, primaryRange);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+            throw new IOException(e);
+        }
+        catch (Exception e) { throw new IOException(e); }
+        finally
+        {
+            try { ssProxy.removeNotificationListener(runner); }
+            catch (ListenerNotFoundException ignored) {} // nothing to remove if registration failed
+        }
+    }
+
     public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
     {
         ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
@@ -770,3 +795,67 @@ class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, JMXEnab
         throw new UnsupportedOperationException();
     }
 }
+
+/**
+ * Starts one asynchronous repair over JMX and echoes the server's "repair" notifications
+ * for that command to a PrintStream until the command reports FINISHED.
+ */
+class RepairRunner implements NotificationListener
+{
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+    private final Condition condition = new SimpleCondition();
+    private final PrintStream out;
+    private final String keyspace;
+    private final String[] columnFamilies;
+    // Guarded by this object's monitor: notifications for the command may already be arriving on
+    // the JMX notification thread before forceRepairAsync has returned its number. The
+    // (non-thread-safe) SimpleDateFormat is likewise only used while holding the monitor.
+    private int cmd;
+
+    RepairRunner(PrintStream out, String keyspace, String... columnFamilies)
+    {
+        this.out = out;
+        this.keyspace = keyspace;
+        this.columnFamilies = columnFamilies;
+    }
+
+    /**
+     * Starts the repair and blocks until the server reports the command FINISHED. If the server
+     * returns command number 0 (nothing to repair), prints a message and returns immediately.
+     */
+    public void repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean primaryRangeOnly) throws InterruptedException
+    {
+        // Hold the monitor across the call so handleNotification cannot inspect cmd before it is
+        // assigned; otherwise an early notification (even FINISHED, for a very short repair) is
+        // dropped as belonging to another command and we would wait below forever.
+        synchronized (this)
+        {
+            cmd = ssProxy.forceRepairAsync(keyspace, isSequential, primaryRangeOnly, columnFamilies);
+            if (cmd <= 0)
+            {
+                String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
+                out.println(message);
+                return;
+            }
+        }
+        condition.await();
+    }
+
+    public synchronized void handleNotification(Notification notification, Object handback)
+    {
+        if ("repair".equals(notification.getType()))
+        {
+            // repair status is int array with [0] = cmd number, [1] = status
+            int[] status = (int[]) notification.getUserData();
+            assert status.length == 2;
+            // we only output what we invoked
+            if (cmd == status[0])
+            {
+                String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage());
+                out.println(message);
+                if (status[1] == AntiEntropyService.Status.FINISHED.ordinal())
+                    condition.signalAll();
+            }
+        }
+    }
+}