You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/04/11 10:45:07 UTC
svn commit: r1090979 - in /cassandra/trunk: ./ conf/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/tools/
Author: slebresne
Date: Mon Apr 11 08:45:07 2011
New Revision: 1090979
URL: http://svn.apache.org/viewvc?rev=1090979&view=rev
Log:
Compaction throttling
patch by stuhood; reviewed by slebresne for CASSANDRA-2156
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Apr 11 08:45:07 2011
@@ -20,7 +20,7 @@
* push replication_factor into strategy_options (CASSANDRA-1263)
* give snapshots the same name on each node (CASSANDRA-1791)
* multithreaded compaction (CASSANDRA-2191)
-
+ * compaction throttling (CASSANDRA-2156)
0.7.5
* Avoid seeking when sstable2json exports the entire file (CASSANDRA-2318)
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Mon Apr 11 08:45:07 2011
@@ -250,9 +250,17 @@ column_index_size_in_kb: 64
in_memory_compaction_limit_in_mb: 64
# Enables multiple compactions to execute at once. This is highly recommended
-# for preserving read performance in a mixed read/write workload.
+# for preserving read performance in a mixed read/write workload as this
+# avoids sstables from accumulating during long running compactions.
compaction_multithreading: true
+# Throttles compaction to the given total throughput across the entire
+# system. The faster you insert data, the faster you need to compact in
+# order to keep the sstable count down, but in general, setting this to
+# 16 to 32 times the rate you are inserting data is more than sufficient.
+# Setting this to 0 disables throttling.
+compaction_throughput_mb_per_sec: 16
+
# Track cached row keys during compaction, and re-cache their new
# positions in the compacted sstable. Disable if you use really large
# key caches.
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Mon Apr 11 08:45:07 2011
@@ -83,6 +83,7 @@ public class Config
public Integer column_index_size_in_kb = 64;
public Integer in_memory_compaction_limit_in_mb = 256;
public Boolean compaction_multithreading = true;
+ public Integer compaction_throughput_mb_per_sec = 16;
public String[] data_file_directories;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Apr 11 08:45:07 2011
@@ -344,6 +344,9 @@ public class DatabaseDescriptor
if (conf.compaction_multithreading == null)
conf.compaction_multithreading = true;
+ if (conf.compaction_throughput_mb_per_sec == null)
+ conf.compaction_throughput_mb_per_sec = 16;
+
/* data file and commit log directories. they get created later, when they're needed. */
if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
{
@@ -731,6 +734,16 @@ public class DatabaseDescriptor
return conf.compaction_multithreading;
}
+ public static int getCompactionThroughputMbPerSec()
+ {
+ return conf.compaction_throughput_mb_per_sec;
+ }
+
+ public static void setCompactionThroughputMbPerSec(int value)
+ {
+ conf.compaction_throughput_mb_per_sec = value;
+ }
+
public static String[] getAllDataFileLocations()
{
return conf.data_file_directories;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon Apr 11 08:45:07 2011
@@ -1184,6 +1184,11 @@ public class CompactionManager implement
}
}
+ public int getActiveCompactions()
+ {
+ return executor.getActiveCount();
+ }
+
private static class CompactionExecutor extends DebuggableThreadPoolExecutor
{
// a synchronized identity set of running tasks to their compaction info
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Mon Apr 11 08:45:07 2011
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
@@ -59,6 +60,14 @@ implements Closeable, CompactionInfo.Hol
private long bytesRead;
private long row;
+ // the bytes that had been compacted the last time we delayed to throttle,
+ // and the time in milliseconds when we last throttled
+ private long bytesAtLastDelay;
+ private long timeAtLastDelay;
+
+ // current target bytes to compact per millisecond
+ private int targetBytesPerMS = -1;
+
public CompactionIterator(String type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
{
this(type, getCollatingIterator(sstables), controller);
@@ -140,6 +149,7 @@ implements Closeable, CompactionInfo.Hol
{
bytesRead += scanner.getFilePointer();
}
+ throttle();
}
}
}
@@ -161,6 +171,42 @@ implements Closeable, CompactionInfo.Hol
return new PrecompactedRow(controller, rows);
}
+ private void throttle()
+ {
+ if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1)
+ // throttling disabled
+ return;
+ int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
+
+ // bytes compacted and time passed since last delay
+ long bytesSinceLast = bytesRead - bytesAtLastDelay;
+ long msSinceLast = System.currentTimeMillis() - timeAtLastDelay;
+
+ // determine the current target
+ int newTarget = totalBytesPerMS /
+ Math.max(1, CompactionManager.instance.getActiveCompactions());
+ if (newTarget != targetBytesPerMS)
+ logger.info(String.format("%s now compacting at %d bytes/ms.",
+ this,
+ newTarget));
+ targetBytesPerMS = newTarget;
+
+ // the excess bytes that were compacted in this period
+ long excessBytes = bytesSinceLast - msSinceLast * targetBytesPerMS;
+
+ // the time to delay to recap the deficit
+ long timeToDelay = excessBytes / Math.max(1, targetBytesPerMS);
+ if (timeToDelay > 0)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace(String.format("Compacted %d bytes in %d ms: throttling for %d ms",
+ bytesSinceLast, msSinceLast, timeToDelay));
+ try { Thread.sleep(timeToDelay); } catch (InterruptedException e) { throw new AssertionError(e); }
+ }
+ bytesAtLastDelay = bytesRead;
+ timeAtLastDelay = System.currentTimeMillis();
+ }
+
public void close() throws IOException
{
FileUtils.close(getScanners());
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Apr 11 08:45:07 2011
@@ -506,6 +506,10 @@ public class StorageService implements I
return joined;
}
+ public void setCompactionThroughputMbPerSec(int value) {
+ DatabaseDescriptor.setCompactionThroughputMbPerSec(value);
+ }
+
private void setMode(String m, boolean log)
{
operationMode = m;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Mon Apr 11 08:45:07 2011
@@ -295,4 +295,6 @@ public interface StorageServiceMBean
// allows a node that have been started without joining the ring to join it
public void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException;
public boolean isJoined();
+
+ public void setCompactionThroughputMbPerSec(int value);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Mon Apr 11 08:45:07 2011
@@ -79,7 +79,7 @@ public class NodeCmd
DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, SCRUB,
SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS,
COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE,
- DISABLETHRIFT, ENABLETHRIFT, JOIN
+ DISABLETHRIFT, ENABLETHRIFT, JOIN, SETCOMPACTIONTHROUGHPUT
}
@@ -111,6 +111,7 @@ public class NodeCmd
addCmdHelp(header, "netstats [host]", "Print network information on provided host (connecting node by default)");
addCmdHelp(header, "move <new token>", "Move node on the token ring to a new token");
addCmdHelp(header, "removetoken status|force|<token>", "Show status of current token removal, force completion of pending removal or remove providen token");
+ addCmdHelp(header, "setcompactionthroughput <value_in_mb>", "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling.");
// Two args
addCmdHelp(header, "snapshot [keyspaces...] -t [snapshotName]", "Take a snapshot of the specified keyspaces using optional name snapshotName");
@@ -592,6 +593,11 @@ public class NodeCmd
probe.joinRing();
break;
+ case SETCOMPACTIONTHROUGHPUT :
+ if (arguments.length != 2) { badUse("Missing value argument."); }
+ probe.setCompactionThroughput(Integer.valueOf(arguments[1]));
+ break;
+
case REMOVETOKEN :
if (arguments.length != 1) { badUse("Missing an argument for removetoken (either status, force, or a token)"); }
else if (arguments[0].equals("status")) { nodeCmd.printRemovalStatus(System.out); }
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Mon Apr 11 08:45:07 2011
@@ -550,6 +550,11 @@ public class NodeProbe
{
return ssProxy.isInitialized();
}
+
+ public void setCompactionThroughput(int value)
+ {
+ ssProxy.setCompactionThroughputMbPerSec(value);
+ }
}
class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>