You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/15 23:26:18 UTC
svn commit: r815497 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
db/MinorCompactionManager.java tools/NodeProbe.java
Author: jbellis
Date: Tue Sep 15 21:26:12 2009
New Revision: 815497
URL: http://svn.apache.org/viewvc?rev=815497&view=rev
Log:
add mbean to get/set compaction thresholds. patch by Sammy Yu; reviewed by jbellis for CASSANDRA-447
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=815497&r1=815496&r2=815497&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Tue Sep 15 21:26:12 2009
@@ -19,12 +19,15 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
@@ -34,14 +37,14 @@
import org.apache.log4j.Logger;
-class MinorCompactionManager
+public class MinorCompactionManager implements MinorCompactionManagerMBean
{
+ public static String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=MinorCompactionManager";
private static MinorCompactionManager instance_;
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
- private static final long intervalInMins_ = 5;
- static final int MINCOMPACTION_THRESHOLD = 4; // compact this many sstables min at a time
- static final int MAXCOMPACTION_THRESHOLD = 32; // compact this many sstables max at a time
+ private int minimumCompactionThreshold_ = 4; // compact this many sstables min at a time
+ private int maximumCompactionThreshold = 32; // compact this many sstables max at a time
public static MinorCompactionManager instance()
{
@@ -51,7 +54,15 @@
try
{
if ( instance_ == null )
+ {
instance_ = new MinorCompactionManager();
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(instance_, new ObjectName(MBEAN_OBJECT_NAME));
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
}
finally
{
@@ -157,7 +168,7 @@
*/
public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
{
- return submit(columnFamilyStore, MINCOMPACTION_THRESHOLD, MAXCOMPACTION_THRESHOLD);
+ return submit(columnFamilyStore, minimumCompactionThreshold_, maximumCompactionThreshold);
}
Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
@@ -186,4 +197,36 @@
{
compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
}
+
+ /**
+ * Gets the minimum number of sstables in queue before compaction kicks off
+ */
+ public int getMinimumCompactionThreshold()
+ {
+ return minimumCompactionThreshold_;
+ }
+
+ /**
+ * Sets the minimum number of sstables in queue before compaction kicks off
+ */
+ public void setMinimumCompactionThreshold(int threshold)
+ {
+ minimumCompactionThreshold_ = threshold;
+ }
+
+ /**
+ * Gets the maximum number of sstables in queue before compaction kicks off
+ */
+ public int getMaximumCompactionThreshold()
+ {
+ return maximumCompactionThreshold;
+ }
+
+ /**
+ * Sets the maximum number of sstables in queue before compaction kicks off
+ */
+ public void setMaximumCompactionThreshold(int threshold)
+ {
+ maximumCompactionThreshold = threshold;
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=815497&r1=815496&r2=815497&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Sep 15 21:26:12 2009
@@ -42,6 +42,8 @@
import org.apache.cassandra.concurrent.IExecutorMBean;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.MinorCompactionManager;
+import org.apache.cassandra.db.MinorCompactionManagerMBean;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageServiceMBean;
@@ -72,7 +74,8 @@
private StorageServiceMBean ssProxy;
private MemoryMXBean memProxy;
private RuntimeMXBean runtimeProxy;
-
+ private MinorCompactionManagerMBean mcmProxy;
+
static
{
options = new Options();
@@ -156,6 +159,8 @@
{
ObjectName name = new ObjectName(ssObjName);
ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
+ name = new ObjectName(MinorCompactionManager.MBEAN_OBJECT_NAME);
+ mcmProxy = JMX.newMBeanProxy(mbeanServerConn, name, MinorCompactionManagerMBean.class);
} catch (MalformedObjectNameException e)
{
throw new RuntimeException(
@@ -501,15 +506,31 @@
}
/**
- * Retrieve any non-option arguments passed on the command line.
- *
- * @return non-option command args
+ * Get the compaction threshold
+ *
+ * @param outs the stream to write to
*/
- private String[] getArgs()
+ public void getCompactionThreshold(PrintStream outs)
{
- return cmd.getArgs();
+ outs.println("Current compaction threshold: Min=" + mcmProxy.getMinimumCompactionThreshold() +
+ ", Max=" + mcmProxy.getMaximumCompactionThreshold());
}
-
+
+ /**
+ * Set the compaction threshold
+ *
+ * @param minimumCompactionThreshold minimum compaction threshold
+ * @param maximumCompactionThreshold maximum compaction threshold
+ */
+ public void setCompactionThreshold(int minimumCompactionThreshold, int maximumCompactionThreshold)
+ {
+ mcmProxy.setMinimumCompactionThreshold(minimumCompactionThreshold);
+ if (maximumCompactionThreshold > 0)
+ {
+ mcmProxy.setMaximumCompactionThreshold(maximumCompactionThreshold);
+ }
+ }
+
/**
* Parse the supplied command line arguments.
*
@@ -523,13 +544,24 @@
}
/**
+ * Retrieve any non-option arguments passed on the command line.
+ *
+ * @return non-option command args
+ */
+ private String[] getArgs()
+ {
+ return cmd.getArgs();
+ }
+
+ /**
* Prints usage information to stdout.
*/
private static void printUsage()
{
HelpFormatter hf = new HelpFormatter();
String header = String.format(
- "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats, flush_binary");
+ "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats, flush_binary, " +
+ " getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
}
@@ -631,6 +663,26 @@
}
probe.forceTableFlushBinary(probe.getArgs()[1]);
}
+ else if (cmdName.equals("getcompactionthreshold"))
+ {
+ probe.getCompactionThreshold(System.out);
+ }
+ else if (cmdName.equals("setcompactionthreshold"))
+ {
+ if (arguments.length < 2)
+ {
+ System.err.println("Missing threshold value(s)");
+ NodeProbe.printUsage();
+ System.exit(1);
+ }
+ int minthreshold = Integer.parseInt(arguments[1]);
+ int maxthreshold = 0;
+ if (arguments.length > 2)
+ {
+ maxthreshold = Integer.parseInt(arguments[2]);
+ }
+ probe.setCompactionThreshold(minthreshold, maxthreshold);
+ }
else
{
System.err.println("Unrecognized command: " + cmdName + ".");