Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/05 01:26:32 UTC

[1/3] git commit: Add ability to throttle batchlog replay

Updated Branches:
  refs/heads/trunk 25777e1f4 -> 59c996212


Add ability to throttle batchlog replay

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6550


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2a7c20ea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2a7c20ea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2a7c20ea

Branch: refs/heads/trunk
Commit: 2a7c20ea9111c05964400fe0a30e7b75ff719277
Parents: 7171b7a
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Jan 5 03:06:47 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Jan 5 03:06:47 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  9 ++++++++
 conf/cassandra.yaml                             |  4 ++++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 +++++
 .../apache/cassandra/db/BatchlogManager.java    | 23 ++++++++++++++------
 6 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64146c1..5a85977 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
  * Validate SliceRange start and finish lengths (CASSANDRA-6521)
  * fsync compression metadata (CASSANDRA-6531)
  * Validate CF existence on execution for prepared statement (CASSANDRA-6535)
+ * Add ability to throttle batchlog replay (CASSANDRA-6550)
 
 
 1.2.13

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6293448..214fd05 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -14,6 +14,15 @@ restore snapshots created with the previous major version using the
 using the provided 'sstableupgrade' tool.
 
 
+1.2.14
+======
+
+Features
+--------
+    - Batchlog replay can now be throttled, and is by default.
+      See the batchlog_replay_throttle_in_kb setting in cassandra.yaml.
+
+
 1.2.13
 ======
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 712f134..d038cde 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -53,6 +53,10 @@ hinted_handoff_throttle_in_kb: 1024
 # cross-dc handoff tends to be slower
 max_hints_delivery_threads: 2
 
+# Maximum throttle in KBs per second, total. This will be
+# reduced proportionally to the number of nodes in the cluster.
+batchlog_replay_throttle_in_kb: 1024
+
 # The following setting populates the page cache on memtable flush and compaction
 # WARNING: Enable this setting only when the whole node's data fits in memory.
 # Defaults to: false
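
As a concrete illustration of the scaling described in the new comment above (the cluster size here is hypothetical): with the default of 1024 KB/s and a four-node cluster, each node throttles its own batchlog replay to 1024 / 4 = 256 KB/s, keeping cluster-wide replay traffic near the configured total. Setting the value to 0 disables the throttle entirely (the code below substitutes Double.MAX_VALUE as the rate).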

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 292161b..1c19a85 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -151,6 +151,7 @@ public class Config
     public Double reduce_cache_sizes_at = 1.0;
     public double reduce_cache_capacity_to = 0.6;
     public int hinted_handoff_throttle_in_kb = 1024;
+    public int batchlog_replay_throttle_in_kb = 1024;
     public int max_hints_delivery_threads = 1;
     public boolean compaction_preheat_key_cache = true;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 0db2f85..3ed82f5 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1231,6 +1231,11 @@ public class DatabaseDescriptor
         return conf.hinted_handoff_throttle_in_kb;
     }
 
+    public static int getBatchlogReplayThrottleInKB()
+    {
+        return conf.batchlog_replay_throttle_in_kb;
+    }
+
     public static int getMaxHintsThread()
     {
         return conf.max_hints_delivery_threads;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a7c20ea/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 5fd55a3..1af4909 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -35,6 +35,7 @@ import javax.management.ObjectName;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +76,7 @@ public class BatchlogManager implements BatchlogManagerMBean
     private final AtomicBoolean isReplaying = new AtomicBoolean();
 
     private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
-    
+
     public void start()
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -163,11 +164,16 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         logger.debug("Started replayAllFailedBatches");
 
+        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+        // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
         try
         {
             for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
                 if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
-                    replayBatch(row.getUUID("id"));
+                    replayBatch(row.getUUID("id"), rateLimiter);
             cleanup();
         }
         finally
@@ -178,7 +184,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Finished replayAllFailedBatches");
     }
 
-    private void replayBatch(UUID id)
+    private void replayBatch(UUID id, RateLimiter rateLimiter)
     {
         logger.debug("Replaying batch {}", id);
 
@@ -188,7 +194,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         try
         {
-            replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"));
+            replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter);
         }
         catch (IOException e)
         {
@@ -200,19 +206,19 @@ public class BatchlogManager implements BatchlogManagerMBean
         totalBatchesReplayed.incrementAndGet();
     }
 
-    private void replaySerializedMutations(ByteBuffer data, long writtenAt) throws IOException
+    private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
     {
         DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
         int size = in.readInt();
         for (int i = 0; i < size; i++)
-            replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt);
+            replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter);
     }
 
     /*
      * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
      * when a replica is down or a write request times out.
      */
-    private void replaySerializedMutation(RowMutation mutation, long writtenAt) throws IOException
+    private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException
     {
         int ttl = calculateHintTTL(mutation, writtenAt);
         if (ttl <= 0)
@@ -221,9 +227,12 @@ public class BatchlogManager implements BatchlogManagerMBean
         Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
         String ks = mutation.getTable();
         Token tk = StorageService.getPartitioner().getToken(mutation.key());
+        int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+
         for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                      StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
         {
+            rateLimiter.acquire(mutationSize);
             if (endpoint.equals(FBUtilities.getBroadcastAddress()))
                 mutation.apply();
             else if (FailureDetector.instance.isAlive(endpoint))
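
To make the new throttling logic easier to follow outside the diff, here is a minimal, self-contained sketch of the same pattern using Guava's RateLimiter. The class name, cluster size, and mutation size are hypothetical stand-ins; the patch derives the real values from cassandra.yaml, the token metadata, and the mutation serializer.

import com.google.common.util.concurrent.RateLimiter;

public class BatchlogThrottleSketch
{
    public static void main(String[] args)
    {
        int throttleInKB = 1024; // batchlog_replay_throttle_in_kb (default)
        int clusterSize = 4;     // hypothetical; the patch uses getAllEndpoints().size()

        // Same math as replayAllFailedBatches(): scale the total rate down by the
        // number of nodes (integer division), and treat 0 as "unthrottled".
        int perNodeKB = throttleInKB / clusterSize;
        RateLimiter rateLimiter = RateLimiter.create(perNodeKB == 0 ? Double.MAX_VALUE : perNodeKB * 1024);

        // acquire(n) blocks until n permits (bytes, here) are available: replaying
        // three 128KB mutations at 256KB/s takes roughly one second in total,
        // since the first acquire is granted immediately.
        int mutationSize = 128 * 1024;
        long start = System.nanoTime();
        for (int i = 0; i < 3; i++)
            rateLimiter.acquire(mutationSize);
        System.out.printf("acquired 3 x %d bytes in %.2fs%n",
                          mutationSize, (System.nanoTime() - start) / 1e9);
    }
}

One consequence of the integer division is worth noting: on a cluster with more nodes than the configured KB/s value, the per-node rate floors to 0, which this code path treats the same as "disabled".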


[2/3] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0

Posted by al...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	NEWS.txt
	src/java/org/apache/cassandra/db/BatchlogManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95f1b5f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95f1b5f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95f1b5f2

Branch: refs/heads/trunk
Commit: 95f1b5f29822fb2893dc7a100fb026729030a70e
Parents: be9a70e 2a7c20e
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Jan 5 03:18:49 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Jan 5 03:18:49 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        | 10 +++++++
 conf/cassandra.yaml                             |  4 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 ++++
 .../apache/cassandra/db/BatchlogManager.java    | 29 +++++++++++++-------
 6 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7946927,5a85977..df564dd
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,16 +1,32 @@@
 -1.2.14
 - * Allow executing CREATE statements multiple times (CASSANDRA-6471)
 - * Don't send confusing info with timeouts (CASSANDRA-6491)
 - * Don't resubmit counter mutation runnables internally (CASSANDRA-6427)
 - * Don't drop local mutations without a trace (CASSANDRA-6510)
 - * Don't allow null max_hint_window_in_ms (CASSANDRA-6419)
 - * Validate SliceRange start and finish lengths (CASSANDRA-6521)
 +2.0.5
 +* Delete unfinished compaction incrementally (CASSANDRA-6086)
 +Merged from 1.2:
   * fsync compression metadata (CASSANDRA-6531)
   * Validate CF existence on execution for prepared statement (CASSANDRA-6535)
+  * Add ability to throttle batchlog replay (CASSANDRA-6550)
  
  
 -1.2.13
 +2.0.4
 + * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418)
 + * add StorageService.stopDaemon() (CASSANDRA-4268)
 + * add IRE for invalid CF supplied to get_count (CASSANDRA-5701)
 + * add client encryption support to sstableloader (CASSANDRA-6378)
 + * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
 + * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
 + * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
 + * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
 + * Fix cleanup ClassCastException (CASSANDRA-6462)
 + * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
 + * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
 + * Fix divide-by-zero in PCI (CASSANDRA-6403)
 + * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
 + * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395)
 + * Expose a total memtable size metric for a CF (CASSANDRA-6391)
 + * cqlsh: handle symlinks properly (CASSANDRA-6425)
 + * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
 + * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447)
 + * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527)
 +Merged from 1.2:
   * Improved error message on bad properties in DDL queries (CASSANDRA-6453)
   * Randomize batchlog candidates selection (CASSANDRA-6481)
   * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index fdc29ad,214fd05..2e40e9c
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,13 -13,27 +13,23 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
+ 
 -1.2.14
 -======
++2.0.5
++=====
+ 
 -Features
++New features
+ ------------
+     - Batchlog replay can now be throttled, and is by default.
+       See the batchlog_replay_throttle_in_kb setting in cassandra.yaml.
+ 
+ 
 -1.2.13
 -======
 -
 -Upgrading
 ----------
 -    - Nothing specific to this release, but please see 1.2.12 if you are upgrading
 -      from a previous version.
 +2.0.3
 +=====
  
 -
 -1.2.12
 -======
 +New features
 +------------
 +    - It's now possible to configure the maximum allowed size of the native
 +      protocol frames (native_transport_max_frame_size_in_mb in the yaml file).
  
  Upgrading
  ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index 3a7407e,1c19a85..a4e4e92
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -146,10 -145,13 +146,11 @@@ public class Confi
  
      public InternodeCompression internode_compression = InternodeCompression.none;
  
 -    public Integer index_interval = 128;
 +    @Deprecated
 +    public Integer index_interval = null;
  
 -    public Double flush_largest_memtables_at = 1.0;
 -    public Double reduce_cache_sizes_at = 1.0;
 -    public double reduce_cache_capacity_to = 0.6;
      public int hinted_handoff_throttle_in_kb = 1024;
+     public int batchlog_replay_throttle_in_kb = 1024;
      public int max_hints_delivery_threads = 1;
      public boolean compaction_preheat_key_cache = true;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/95f1b5f2/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 0968df2,1af4909..cfa049a
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -161,11 -164,16 +162,16 @@@ public class BatchlogManager implement
  
          logger.debug("Started replayAllFailedBatches");
  
+         // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+         // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+         int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+ 
          try
          {
 -            for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
 +            for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF))
                  if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
-                     replayBatch(row.getUUID("id"));
+                     replayBatch(row.getUUID("id"), rateLimiter);
              cleanup();
          }
          finally
@@@ -186,8 -194,7 +192,8 @@@
  
          try
          {
 -            replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter);
 +            UntypedResultSet.Row batch = result.one();
-             replaySerializedMutations(batch.getBytes("data"), batch.getLong("written_at"));
++            replaySerializedMutations(batch.getBytes("data"), batch.getLong("written_at"), rateLimiter);
          }
          catch (IOException e)
          {
@@@ -211,15 -218,17 +217,17 @@@
       * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
       * when a replica is down or a write request times out.
       */
-     private void replaySerializedMutation(RowMutation mutation, long writtenAt)
 -    private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException
++    private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter)
      {
          int ttl = calculateHintTTL(mutation, writtenAt);
          if (ttl <= 0)
              return; // the mutation isn't safe to replay.
  
--        Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
 -        String ks = mutation.getTable();
 -        Token tk = StorageService.getPartitioner().getToken(mutation.key());
++        Set<InetAddress> liveEndpoints = new HashSet<>();
 +        String ks = mutation.getKeyspaceName();
 +        Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
+         int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+ 
          for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                       StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
          {
@@@ -235,10 -245,10 +244,10 @@@
              attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
      }
  
 -    private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints) throws IOException
 +    private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints)
      {
          List<WriteResponseHandler> handlers = Lists.newArrayList();
--        final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
++        final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<>(endpoints);
          for (final InetAddress ep : endpoints)
          {
              Runnable callback = new Runnable()
@@@ -290,9 -300,9 +299,9 @@@
      // force flush + compaction to reclaim space from the replayed batches
      private void cleanup() throws ExecutionException, InterruptedException
      {
 -        ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
 +        ColumnFamilyStore cfs = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF);
          cfs.forceBlockingFlush();
--        Collection<Descriptor> descriptors = new ArrayList<Descriptor>();
++        Collection<Descriptor> descriptors = new ArrayList<>();
          for (SSTableReader sstr : cfs.getSSTables())
              descriptors.add(sstr.descriptor);
          if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
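
A detail that is easy to miss in both versions of the diff: rateLimiter.acquire(mutationSize) is called once per destination endpoint, inside the loop over natural and pending endpoints. A mutation headed for three replicas therefore charges three times its serialized size against the per-node budget, so the limiter bounds the total bytes replayed per second, not the number of distinct mutations. A toy illustration of that loop shape (endpoint names and sizes are hypothetical):

import com.google.common.util.concurrent.RateLimiter;
import java.util.Arrays;
import java.util.List;

public class PerEndpointAcquireSketch
{
    public static void main(String[] args)
    {
        RateLimiter rateLimiter = RateLimiter.create(256 * 1024); // 256 KB/s per node
        int mutationSize = 64 * 1024;                             // hypothetical serialized size
        List<String> endpoints = Arrays.asList("replica1", "replica2", "replica3");

        // Mirrors the patch's loop: one acquire() per endpoint, so this single
        // mutation consumes 3 x 64 KB of the per-node throttle budget.
        for (String endpoint : endpoints)
        {
            rateLimiter.acquire(mutationSize);
            System.out.println("delivering " + mutationSize + " bytes to " + endpoint);
        }
    }
}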


[3/3] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into trunk

Conflicts:
	NEWS.txt
	src/java/org/apache/cassandra/db/BatchlogManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59c99621
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59c99621
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59c99621

Branch: refs/heads/trunk
Commit: 59c996212c90d1d5ae8f8140a78623dcb5ad6c80
Parents: 25777e1 95f1b5f
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Jan 5 03:26:20 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Jan 5 03:26:20 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  9 ++++++
 conf/cassandra.yaml                             |  4 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 ++++
 .../apache/cassandra/db/BatchlogManager.java    | 29 +++++++++++++-------
 6 files changed, 39 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59c99621/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59c99621/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index b1ec355,2e40e9c..14276d1
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,23 -13,16 +13,32 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 +2.1
 +===
 +
 +Upgrading
 +---------
 +   - Rolling upgrades from anything pre-2.0 are not supported.
 +   - For leveled compaction users, 2.0 must be started at least once
 +     before upgrading to 2.1, because the old JSON leveled manifest is
 +     migrated into the sstable metadata files on 2.0 startup and that
 +     migration code is gone from 2.1.
 +   - For size-tiered compaction users, Cassandra now defaults to ignoring
 +     the coldest 5% of sstables.  This can be customized with the
 +     cold_reads_to_omit compaction option; 0.0 omits nothing (the old
 +     behavior) and 1.0 omits everything.
 +   - Multithreaded compaction has been removed.
 +
  
+ 2.0.5
+ =====
+ 
+ New features
+ ------------
+     - Batchlog replay can now be throttled, and is by default.
+       See the batchlog_replay_throttle_in_kb setting in cassandra.yaml.
+ 
+ 
  2.0.3
  =====
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59c99621/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59c99621/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59c99621/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59c99621/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index b103b69,cfa049a..4ce7f41
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -204,22 -210,24 +210,24 @@@ public class BatchlogManager implement
          DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
          int size = in.readInt();
          for (int i = 0; i < size; i++)
-             replaySerializedMutation(Mutation.serializer.deserialize(in, VERSION), writtenAt);
 -            replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter);
++            replaySerializedMutation(Mutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter);
      }
  
      /*
       * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
       * when a replica is down or a write request times out.
       */
-     private void replaySerializedMutation(Mutation mutation, long writtenAt)
 -    private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter)
++    private void replaySerializedMutation(Mutation mutation, long writtenAt, RateLimiter rateLimiter)
      {
          int ttl = calculateHintTTL(mutation, writtenAt);
          if (ttl <= 0)
              return; // the mutation isn't safe to replay.
  
-         Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
+         Set<InetAddress> liveEndpoints = new HashSet<>();
          String ks = mutation.getKeyspaceName();
          Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
 -        int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
++        int mutationSize = (int) Mutation.serializer.serializedSize(mutation, VERSION);
+ 
          for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                       StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
          {
@@@ -235,10 -244,10 +244,10 @@@
              attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
      }
  
 -    private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints)
 +    private void attemptDirectDelivery(Mutation mutation, long writtenAt, Set<InetAddress> endpoints)
      {
          List<WriteResponseHandler> handlers = Lists.newArrayList();
-         final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
+         final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<>(endpoints);
          for (final InetAddress ep : endpoints)
          {
              Runnable callback = new Runnable()