Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/05 01:08:09 UTC

git commit: Add ability to throttle batchlog replay

Updated Branches:
  refs/heads/cassandra-1.2 7171b7a2c -> 2a7c20ea9


Add ability to throttle batchlog replay

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6550


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2a7c20ea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2a7c20ea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2a7c20ea

Branch: refs/heads/cassandra-1.2
Commit: 2a7c20ea9111c05964400fe0a30e7b75ff719277
Parents: 7171b7a
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Jan 5 03:06:47 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Jan 5 03:06:47 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  9 ++++++++
 conf/cassandra.yaml                             |  4 ++++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++++
 .../apache/cassandra/db/BatchlogManager.java    | 23 ++++++++++++++------
 6 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64146c1..5a85977 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
  * Validate SliceRange start and finish lengths (CASSANDRA-6521)
  * fsync compression metadata (CASSANDRA-6531)
  * Validate CF existence on execution for prepared statement (CASSANDRA-6535)
+ * Add ability to throttle batchlog replay (CASSANDRA-6550)
 
 
 1.2.13

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6293448..214fd05 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -14,6 +14,15 @@ restore snapshots created with the previous major version using the
 using the provided 'sstableupgrade' tool.
 
 
+1.2.14
+======
+
+Features
+--------
+    - Batchlog replay is now throttled by default. The throttle can be tuned
+      (or disabled) via the batchlog_replay_throttle_in_kb setting in cassandra.yaml.
+
+
 1.2.13
 ======
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 712f134..d038cde 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -53,6 +53,10 @@ hinted_handoff_throttle_in_kb: 1024
 # cross-dc handoff tends to be slower
 max_hints_delivery_threads: 2
 
+# Maximum throttle in KBs per second, total. This will be
+# reduced proportionally to the number of nodes in the cluster.
+batchlog_replay_throttle_in_kb: 1024
+
 # The following setting populates the page cache on memtable flush and compaction
 # WARNING: Enable this setting only when the whole node's data fits in memory.
 # Defaults to: false
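
For reference, a minimal sketch (not part of the patch) of how this total
setting maps to a per-node replay rate, assuming the same integer arithmetic
as the BatchlogManager change below; the class and method names here are
hypothetical:

    // Hypothetical helper mirroring the arithmetic in BatchlogManager:
    // split the configured total across all known nodes, convert to bytes/s.
    public final class ReplayThrottleMath
    {
        // throttleInKB: batchlog_replay_throttle_in_kb (0 disables throttling)
        // clusterSize:  number of endpoints known to token metadata
        public static double replayRateBytesPerSecond(int throttleInKB, int clusterSize)
        {
            int perNodeKB = throttleInKB / clusterSize;
            return perNodeKB == 0 ? Double.MAX_VALUE   // 0 means effectively unthrottled
                                  : perNodeKB * 1024.0;
        }
    }

With the default of 1024 and a 4-node cluster, for example, each node replays
at roughly 256 KB/s (262144 bytes per second).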

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 292161b..1c19a85 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -151,6 +151,7 @@ public class Config
     public Double reduce_cache_sizes_at = 1.0;
     public double reduce_cache_capacity_to = 0.6;
     public int hinted_handoff_throttle_in_kb = 1024;
+    public int batchlog_replay_throttle_in_kb = 1024;
     public int max_hints_delivery_threads = 1;
     public boolean compaction_preheat_key_cache = true;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 0db2f85..3ed82f5 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1231,6 +1231,11 @@ public class DatabaseDescriptor
         return conf.hinted_handoff_throttle_in_kb;
     }
 
+    public static int getBatchlogReplayThrottleInKB()
+    {
+        return conf.batchlog_replay_throttle_in_kb;
+    }
+
     public static int getMaxHintsThread()
     {
         return conf.max_hints_delivery_threads;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 5fd55a3..1af4909 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -35,6 +35,7 @@ import javax.management.ObjectName;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +76,7 @@ public class BatchlogManager implements BatchlogManagerMBean
     private final AtomicBoolean isReplaying = new AtomicBoolean();
 
     private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
-    
+
     public void start()
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -163,11 +164,16 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         logger.debug("Started replayAllFailedBatches");
 
+        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+        // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
         try
         {
             for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
                 if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
-                    replayBatch(row.getUUID("id"));
+                    replayBatch(row.getUUID("id"), rateLimiter);
             cleanup();
         }
         finally
@@ -178,7 +184,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Finished replayAllFailedBatches");
     }
 
-    private void replayBatch(UUID id)
+    private void replayBatch(UUID id, RateLimiter rateLimiter)
     {
         logger.debug("Replaying batch {}", id);
 
@@ -188,7 +194,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         try
         {
-            replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"));
+            replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter);
         }
         catch (IOException e)
         {
@@ -200,19 +206,19 @@ public class BatchlogManager implements BatchlogManagerMBean
         totalBatchesReplayed.incrementAndGet();
     }
 
-    private void replaySerializedMutations(ByteBuffer data, long writtenAt) throws IOException
+    private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
     {
         DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
         int size = in.readInt();
         for (int i = 0; i < size; i++)
-            replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt);
+            replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter);
     }
 
     /*
      * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
      * when a replica is down or a write request times out.
      */
-    private void replaySerializedMutation(RowMutation mutation, long writtenAt) throws IOException
+    private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException
     {
         int ttl = calculateHintTTL(mutation, writtenAt);
         if (ttl <= 0)
@@ -221,9 +227,12 @@ public class BatchlogManager implements BatchlogManagerMBean
         Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
         String ks = mutation.getTable();
         Token tk = StorageService.getPartitioner().getToken(mutation.key());
+        int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+
         for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                      StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
         {
+            rateLimiter.acquire(mutationSize);
             if (endpoint.equals(FBUtilities.getBroadcastAddress()))
                 mutation.apply();
             else if (FailureDetector.instance.isAlive(endpoint))
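
For context, a minimal self-contained sketch (not part of the commit) of the
Guava RateLimiter pattern the new replay path relies on: create one limiter
per replay pass, then acquire permits sized in bytes before each delivery.
The values and names below are illustrative only:

    import com.google.common.util.concurrent.RateLimiter;

    public class ReplayThrottleSketch
    {
        public static void main(String[] args)
        {
            // 256 KB/s expressed in bytes per second
            RateLimiter limiter = RateLimiter.create(256 * 1024);

            byte[][] fakeMutations = { new byte[64 * 1024], new byte[128 * 1024] };
            for (byte[] mutation : fakeMutations)
            {
                // Blocks until enough "byte" permits are available, smoothing
                // outgoing replay traffic to the configured rate.
                limiter.acquire(mutation.length);
                // ... deliver the mutation here, or write a hint if the replica is down
            }
        }
    }

Because acquire() is called once per target endpoint per mutation (as in
replaySerializedMutation above), the configured rate bounds the node's total
outgoing replay traffic across all replicas, not the rate per replica.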