You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/15 23:26:18 UTC

svn commit: r815497 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/MinorCompactionManager.java tools/NodeProbe.java

Author: jbellis
Date: Tue Sep 15 21:26:12 2009
New Revision: 815497

URL: http://svn.apache.org/viewvc?rev=815497&view=rev
Log:
add mbean to get/set compaction thresholds.  patch by Sammy Yu; reviewed by jbellis for CASSANDRA-447

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=815497&r1=815496&r2=815497&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Tue Sep 15 21:26:12 2009
@@ -19,12 +19,15 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
@@ -34,14 +37,14 @@
 
 import org.apache.log4j.Logger;
 
-class MinorCompactionManager
+public class MinorCompactionManager implements MinorCompactionManagerMBean
 {
+    public static String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=MinorCompactionManager";
     private static MinorCompactionManager instance_;
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
-    private static final long intervalInMins_ = 5;
-    static final int MINCOMPACTION_THRESHOLD = 4; // compact this many sstables min at a time
-    static final int MAXCOMPACTION_THRESHOLD = 32; // compact this many sstables max at a time
+    private int minimumCompactionThreshold_ = 4; // compact this many sstables min at a time
+    private int maximumCompactionThreshold = 32; // compact this many sstables max at a time
 
     public static MinorCompactionManager instance()
     {
@@ -51,7 +54,15 @@
             try
             {
                 if ( instance_ == null )
+                {
                     instance_ = new MinorCompactionManager();
+                    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+                    mbs.registerMBean(instance_, new ObjectName(MBEAN_OBJECT_NAME));
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
             }
             finally
             {
@@ -157,7 +168,7 @@
      */
     public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
     {
-        return submit(columnFamilyStore, MINCOMPACTION_THRESHOLD, MAXCOMPACTION_THRESHOLD);
+        return submit(columnFamilyStore, minimumCompactionThreshold_, maximumCompactionThreshold);
     }
 
     Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
@@ -186,4 +197,36 @@
     {
         compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
     }
+
+    /**
+     * Gets the minimum number of sstables in queue before compaction kicks off
+     */
+    public int getMinimumCompactionThreshold()
+    {
+        return minimumCompactionThreshold_;
+    }
+
+    /**
+     * Sets the minimum number of sstables in queue before compaction kicks off
+     */
+    public void setMinimumCompactionThreshold(int threshold)
+    {
+        minimumCompactionThreshold_ = threshold;
+    }
+
+    /**
+     * Gets the maximum number of sstables that are compacted at a time
+     */
+    public int getMaximumCompactionThreshold()
+    {
+        return maximumCompactionThreshold;
+    }
+
+    /**
+     * Sets the maximum number of sstables that are compacted at a time
+     */
+    public void setMaximumCompactionThreshold(int threshold)
+    {
+        maximumCompactionThreshold = threshold;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=815497&r1=815496&r2=815497&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Sep 15 21:26:12 2009
@@ -42,6 +42,8 @@
 
 import org.apache.cassandra.concurrent.IExecutorMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.MinorCompactionManager;
+import org.apache.cassandra.db.MinorCompactionManagerMBean;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.service.StorageServiceMBean;
@@ -72,7 +74,8 @@
     private StorageServiceMBean ssProxy;
     private MemoryMXBean memProxy;
     private RuntimeMXBean runtimeProxy;
-    
+    private MinorCompactionManagerMBean mcmProxy;
+
     static
     {
         options = new Options();
@@ -156,6 +159,8 @@
         {
             ObjectName name = new ObjectName(ssObjName);
             ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
+            name = new ObjectName(MinorCompactionManager.MBEAN_OBJECT_NAME);
+            mcmProxy = JMX.newMBeanProxy(mbeanServerConn, name, MinorCompactionManagerMBean.class);
         } catch (MalformedObjectNameException e)
         {
             throw new RuntimeException(
@@ -501,15 +506,31 @@
     }
 
     /**
-     * Retrieve any non-option arguments passed on the command line.
-     * 
-     * @return non-option command args
+     * Print the current minimum and maximum compaction thresholds
+     *
+     * @param outs the stream to write to
      */
-    private String[] getArgs()
+    public void getCompactionThreshold(PrintStream outs)
     {
-        return cmd.getArgs();
+        outs.println("Current compaction threshold: Min=" +  mcmProxy.getMinimumCompactionThreshold() +
+            ", Max=" +  mcmProxy.getMaximumCompactionThreshold());
     }
-    
+
+    /**
+     * Set the compaction threshold
+     *
+     * @param minimumCompactionThreshold minimum compaction threshold
+     * @param maximumCompactionThreshold maximum compaction threshold; only applied when greater than 0
+     */
+    public void setCompactionThreshold(int minimumCompactionThreshold, int maximumCompactionThreshold)
+    {
+        mcmProxy.setMinimumCompactionThreshold(minimumCompactionThreshold);
+        if (maximumCompactionThreshold > 0)
+        {
+             mcmProxy.setMaximumCompactionThreshold(maximumCompactionThreshold);
+        }
+    }
+
     /**
      * Parse the supplied command line arguments.
      * 
@@ -523,13 +544,24 @@
     }
     
     /**
+     * Retrieve any non-option arguments passed on the command line.
+     * 
+     * @return non-option command args
+     */
+    private String[] getArgs()
+    {
+        return cmd.getArgs();
+    }
+    
+    /**
      * Prints usage information to stdout.
      */
     private static void printUsage()
     {
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
-                "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats, flush_binary");
+                "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats, flush_binary, " +
+                " getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
         String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
         hf.printHelp(usage, "", options, header);
     }
@@ -631,6 +663,26 @@
             }
             probe.forceTableFlushBinary(probe.getArgs()[1]);
         }
+        else if (cmdName.equals("getcompactionthreshold"))
+        {   
+            probe.getCompactionThreshold(System.out);
+        }
+        else if (cmdName.equals("setcompactionthreshold"))
+        {
+            if (arguments.length < 2)
+            {
+                System.err.println("Missing threshold value(s)");
+                NodeProbe.printUsage();
+                System.exit(1);
+            }
+            int minthreshold = Integer.parseInt(arguments[1]);
+            int maxthreshold = 0;
+            if (arguments.length > 2)
+            {   
+                maxthreshold = Integer.parseInt(arguments[2]);
+            }
+            probe.setCompactionThreshold(minthreshold, maxthreshold);
+        }
         else
         {
             System.err.println("Unrecognized command: " + cmdName + ".");