You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/05 01:08:09 UTC
git commit: Add ability to throttle batchlog replay
Updated Branches:
refs/heads/cassandra-1.2 7171b7a2c -> 2a7c20ea9
Add ability to throttle batchlog replay
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6550
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2a7c20ea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2a7c20ea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2a7c20ea
Branch: refs/heads/cassandra-1.2
Commit: 2a7c20ea9111c05964400fe0a30e7b75ff719277
Parents: 7171b7a
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Jan 5 03:06:47 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Jan 5 03:06:47 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 9 ++++++++
conf/cassandra.yaml | 4 ++++
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 5 +++++
.../apache/cassandra/db/BatchlogManager.java | 23 ++++++++++++++------
6 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64146c1..5a85977 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
* Validate SliceRange start and finish lengths (CASSANDRA-6521)
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
+ * Add ability to throttle batchlog replay (CASSANDRA-6550)
1.2.13
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6293448..214fd05 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -14,6 +14,15 @@ restore snapshots created with the previous major version using the
using the provided 'sstableupgrade' tool.
+1.2.14
+======
+
+Features
+--------
+ - Batchlog replay can now be throttled, and is throttled by default.
+ See batchlog_replay_throttle_in_kb setting in cassandra.yaml.
+
+
1.2.13
======
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 712f134..d038cde 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -53,6 +53,10 @@ hinted_handoff_throttle_in_kb: 1024
# cross-dc handoff tends to be slower
max_hints_delivery_threads: 2
+# Maximum batchlog replay throttle in KB per second, total. The per-node rate is
+# this value divided by the number of nodes in the cluster; 0 disables throttling.
+batchlog_replay_throttle_in_kb: 1024
+
# The following setting populates the page cache on memtable flush and compaction
# WARNING: Enable this setting only when the whole node's data fits in memory.
# Defaults to: false
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 292161b..1c19a85 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -151,6 +151,7 @@ public class Config
public Double reduce_cache_sizes_at = 1.0;
public double reduce_cache_capacity_to = 0.6;
public int hinted_handoff_throttle_in_kb = 1024;
+ public int batchlog_replay_throttle_in_kb = 1024;
public int max_hints_delivery_threads = 1;
public boolean compaction_preheat_key_cache = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 0db2f85..3ed82f5 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1231,6 +1231,11 @@ public class DatabaseDescriptor
return conf.hinted_handoff_throttle_in_kb;
}
+ public static int getBatchlogReplayThrottleInKB()
+ {
+ return conf.batchlog_replay_throttle_in_kb;
+ }
+
public static int getMaxHintsThread()
{
return conf.max_hints_delivery_threads;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 5fd55a3..1af4909 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -35,6 +35,7 @@ import javax.management.ObjectName;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +76,7 @@ public class BatchlogManager implements BatchlogManagerMBean
private final AtomicBoolean isReplaying = new AtomicBoolean();
private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
-
+
public void start()
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -163,11 +164,16 @@ public class BatchlogManager implements BatchlogManagerMBean
logger.debug("Started replayAllFailedBatches");
+ // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+ // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+ int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+ RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
try
{
for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
- replayBatch(row.getUUID("id"));
+ replayBatch(row.getUUID("id"), rateLimiter);
cleanup();
}
finally
@@ -178,7 +184,7 @@ public class BatchlogManager implements BatchlogManagerMBean
logger.debug("Finished replayAllFailedBatches");
}
- private void replayBatch(UUID id)
+ private void replayBatch(UUID id, RateLimiter rateLimiter)
{
logger.debug("Replaying batch {}", id);
@@ -188,7 +194,7 @@ public class BatchlogManager implements BatchlogManagerMBean
try
{
- replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"));
+ replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter);
}
catch (IOException e)
{
@@ -200,19 +206,19 @@ public class BatchlogManager implements BatchlogManagerMBean
totalBatchesReplayed.incrementAndGet();
}
- private void replaySerializedMutations(ByteBuffer data, long writtenAt) throws IOException
+ private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
int size = in.readInt();
for (int i = 0; i < size; i++)
- replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt);
+ replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter);
}
/*
* We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
* when a replica is down or a write request times out.
*/
- private void replaySerializedMutation(RowMutation mutation, long writtenAt) throws IOException
+ private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException
{
int ttl = calculateHintTTL(mutation, writtenAt);
if (ttl <= 0)
@@ -221,9 +227,12 @@ public class BatchlogManager implements BatchlogManagerMBean
Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
String ks = mutation.getTable();
Token tk = StorageService.getPartitioner().getToken(mutation.key());
+ int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+
for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
{
+ rateLimiter.acquire(mutationSize);
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
mutation.apply();
else if (FailureDetector.instance.isAlive(endpoint))