You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/04/11 10:45:07 UTC

svn commit: r1090979 - in /cassandra/trunk: ./ conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/tools/

Author: slebresne
Date: Mon Apr 11 08:45:07 2011
New Revision: 1090979

URL: http://svn.apache.org/viewvc?rev=1090979&view=rev
Log:
Compaction throttling
patch by stuhood; reviewed by slebresne for CASSANDRA-2156

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Apr 11 08:45:07 2011
@@ -20,7 +20,7 @@
  * push replication_factor into strategy_options (CASSANDRA-1263)
  * give snapshots the same name on each node (CASSANDRA-1791)
  * multithreaded compaction (CASSANDRA-2191)
-
+ * compaction throttling (CASSANDRA-2156)
 
 0.7.5
  * Avoid seeking when sstable2json exports the entire file (CASSANDRA-2318)

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Mon Apr 11 08:45:07 2011
@@ -250,9 +250,17 @@ column_index_size_in_kb: 64
 in_memory_compaction_limit_in_mb: 64
 
 # Enables multiple compactions to execute at once. This is highly recommended
-# for preserving read performance in a mixed read/write workload.
+# for preserving read performance in a mixed read/write workload as this
+# prevents sstables from accumulating during long-running compactions.
 compaction_multithreading: true
 
+# Throttles compaction to the given total throughput across the entire
+# system. The faster you insert data, the faster you need to compact in
+# order to keep the sstable count down, but in general, setting this to
+# 16 to 32 times the rate you are inserting data is more than sufficient.
+# Setting this to 0 disables throttling.
+compaction_throughput_mb_per_sec: 16
+
 # Track cached row keys during compaction, and re-cache their new
 # positions in the compacted sstable.  Disable if you use really large
 # key caches.

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Mon Apr 11 08:45:07 2011
@@ -83,6 +83,7 @@ public class Config
     public Integer column_index_size_in_kb = 64;
     public Integer in_memory_compaction_limit_in_mb = 256;
     public Boolean compaction_multithreading = true;
+    public Integer compaction_throughput_mb_per_sec = 16;
     
     public String[] data_file_directories;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Apr 11 08:45:07 2011
@@ -344,6 +344,9 @@ public class DatabaseDescriptor
             if (conf.compaction_multithreading == null)
                 conf.compaction_multithreading = true;
 
+            if (conf.compaction_throughput_mb_per_sec == null)
+                conf.compaction_throughput_mb_per_sec = 16;
+
             /* data file and commit log directories. they get created later, when they're needed. */
             if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
             {
@@ -731,6 +734,16 @@ public class DatabaseDescriptor
         return conf.compaction_multithreading;
     }
 
+    public static int getCompactionThroughputMbPerSec() // configured compaction throughput cap, in MB per second
+    {
+        return conf.compaction_throughput_mb_per_sec; // Integer field, unboxed here; config loading defaults a null to 16
+    }
+
+    public static void setCompactionThroughputMbPerSec(int value) // value is in MB per second; no range check is done here
+    {
+        conf.compaction_throughput_mb_per_sec = value; // overwrites the loaded setting in place
+    }
+
     public static String[] getAllDataFileLocations()
     {
         return conf.data_file_directories;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon Apr 11 08:45:07 2011
@@ -1184,6 +1184,11 @@ public class CompactionManager implement
         }
     }
 
+    public int getActiveCompactions() // number of compaction tasks the executor reports as active
+    {
+        return executor.getActiveCount(); // NOTE(review): presumably ThreadPoolExecutor's approximate count - confirm
+    }
+
     private static class CompactionExecutor extends DebuggableThreadPoolExecutor
     {
         // a synchronized identity set of running tasks to their compaction info

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Mon Apr 11 08:45:07 2011
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CompactionManager;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
@@ -59,6 +60,14 @@ implements Closeable, CompactionInfo.Hol
     private long bytesRead;
     private long row;
 
+    // the bytes that had been compacted the last time we delayed to throttle,
+    // and the time in milliseconds when we last throttled
+    private long bytesAtLastDelay;
+    private long timeAtLastDelay;
+
+    // current target bytes to compact per millisecond
+    private int targetBytesPerMS = -1;
+
     public CompactionIterator(String type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
     {
         this(type, getCollatingIterator(sstables), controller);
@@ -140,6 +149,7 @@ implements Closeable, CompactionInfo.Hol
                 {
                     bytesRead += scanner.getFilePointer();
                 }
+                throttle();
             }
         }
     }
@@ -161,6 +171,42 @@ implements Closeable, CompactionInfo.Hol
         return new PrecompactedRow(controller, rows);
     }
 
+    /**
+     * Sleeps as needed to keep this iterator within its share of the configured
+     * compaction throughput, split evenly across the active compactions.
+     */
+    private void throttle()
+    {
+        int throughputMb = DatabaseDescriptor.getCompactionThroughputMbPerSec();
+        if (throughputMb < 1)
+            return; // throttling disabled
+        // multiply as long: the int product overflows from 2048 MB/s upwards
+        long totalBytesPerMS = throughputMb * 1024L * 1024L / 1000;
+        // bytes compacted and time passed since last delay
+        long bytesSinceLast = bytesRead - bytesAtLastDelay;
+        long msSinceLast = System.currentTimeMillis() - timeAtLastDelay;
+
+        // determine the current target, clamped to fit the int field that stores it
+        int newTarget = (int) Math.min(Integer.MAX_VALUE,
+            totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions()));
+        if (newTarget != targetBytesPerMS)
+            logger.info(String.format("%s now compacting at %d bytes/ms.", this, newTarget));
+        targetBytesPerMS = newTarget;
+
+        // time the bytes should have taken minus the time they did take; dividing
+        // first keeps a very large msSinceLast from overflowing a long product
+        long timeToDelay = bytesSinceLast / Math.max(1, targetBytesPerMS) - msSinceLast;
+        if (timeToDelay > 0)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace(String.format("Compacted %d bytes in %d ms: throttling for %d ms", bytesSinceLast, msSinceLast, timeToDelay));
+            try { Thread.sleep(timeToDelay); }
+            catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new AssertionError(e); }
+        }
+        bytesAtLastDelay = bytesRead;
+        timeAtLastDelay = System.currentTimeMillis();
+    }
+
     public void close() throws IOException
     {
         FileUtils.close(getScanners());

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Apr 11 08:45:07 2011
@@ -506,6 +506,10 @@ public class StorageService implements I
         return joined;
     }
 
+    public void setCompactionThroughputMbPerSec(int value) { // value is in MB per second
+        DatabaseDescriptor.setCompactionThroughputMbPerSec(value); // passes the value through unchanged
+    }
+
     private void setMode(String m, boolean log)
     {
         operationMode = m;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Mon Apr 11 08:45:07 2011
@@ -295,4 +295,6 @@ public interface StorageServiceMBean
     // allows a node that have been started without joining the ring to join it
     public void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException;
     public boolean isJoined();
+
+    public void setCompactionThroughputMbPerSec(int value); // sets the compaction throughput cap, in MB per second
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Mon Apr 11 08:45:07 2011
@@ -79,7 +79,7 @@ public class NodeCmd
         DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, SCRUB,
         SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS,
         COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE,
-        DISABLETHRIFT, ENABLETHRIFT, JOIN
+        DISABLETHRIFT, ENABLETHRIFT, JOIN, SETCOMPACTIONTHROUGHPUT
     }
 
     
@@ -111,6 +111,7 @@ public class NodeCmd
         addCmdHelp(header, "netstats [host]", "Print network information on provided host (connecting node by default)");
         addCmdHelp(header, "move <new token>", "Move node on the token ring to a new token");
         addCmdHelp(header, "removetoken status|force|<token>", "Show status of current token removal, force completion of pending removal or remove providen token");
+        addCmdHelp(header, "setcompactionthroughput <value_in_mb>", "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling.");
 
         // Two args
         addCmdHelp(header, "snapshot [keyspaces...] -t [snapshotName]", "Take a snapshot of the specified keyspaces using optional name snapshotName");
@@ -592,6 +593,11 @@ public class NodeCmd
                 probe.joinRing();
                 break;
 
+            case SETCOMPACTIONTHROUGHPUT :
+                if (arguments.length != 2) { badUse("Missing value argument."); }
+                probe.setCompactionThroughput(Integer.valueOf(arguments[1]));
+                break;
+
             case REMOVETOKEN :
                 if (arguments.length != 1) { badUse("Missing an argument for removetoken (either status, force, or a token)"); }
                 else if (arguments[0].equals("status")) { nodeCmd.printRemovalStatus(System.out); }

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1090979&r1=1090978&r2=1090979&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Mon Apr 11 08:45:07 2011
@@ -550,6 +550,11 @@ public class NodeProbe
     {
         return ssProxy.isInitialized();
     }
+
+    public void setCompactionThroughput(int value) // value is in MB per second
+    {
+        ssProxy.setCompactionThroughputMbPerSec(value); // forwarded as-is through the storage service proxy
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>