Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/17 17:41:02 UTC

[1/2] git commit: Paginate batchlog replay

Updated Branches:
  refs/heads/cassandra-2.0 14c6f7030 -> 881462369


Paginate batchlog replay

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6569
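
[Editor's note: the gist of the change is that replay no longer selects every
batchlog row in one unbounded query; it now walks the table PAGE_SIZE rows at
a time, resuming each page from the last id seen via token(id) ordering, the
partitioner's native scan order. Below is a minimal, self-contained sketch of
that keyset-paging loop. PageSource and PagedReplaySketch are hypothetical
stand-ins for the internal process() helper, not Cassandra API.]

    import java.util.List;
    import java.util.UUID;

    interface PageSource
    {
        List<UUID> firstPage(int limit);              // hypothetical: first `limit` ids in token order
        List<UUID> pageAfter(UUID lastId, int limit); // hypothetical: ids with token(id) > token(lastId)
    }

    final class PagedReplaySketch
    {
        static final int PAGE_SIZE = 128; // same constant the patch uses

        static void replayAll(PageSource source)
        {
            List<UUID> page = source.firstPage(PAGE_SIZE);
            while (!page.isEmpty())
            {
                UUID lastId = null;
                for (UUID id : page)
                    lastId = id; // the real code replays each eligible batch here

                // A short page means the batchlog is exhausted; the next query would be empty.
                if (page.size() < PAGE_SIZE)
                    break;

                page = source.pageAfter(lastId, PAGE_SIZE);
            }
        }
    }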


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0b168f0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0b168f0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0b168f0

Branch: refs/heads/cassandra-2.0
Commit: b0b168f0690f7e1d2c0e3401ea0e74d3bbccd164
Parents: a3f7035
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jan 17 19:24:49 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jan 17 19:24:49 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    | 78 +++++++++++++++----
 .../cassandra/db/BatchlogManagerTest.java       | 82 ++++++++++++++++++++
 3 files changed, 144 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0b168f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5827d3..f550863 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
  * Avoid StackOverflow when using large IN queries (CASSANDRA-6567)
  * Nodetool upgradesstables includes secondary indexes (CASSANDRA-6589)
+ * Paginate batchlog replay (CASSANDRA-6569)
 
 
 1.2.13

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0b168f0/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 1af4909..90dfd47 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
@@ -45,6 +46,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UUIDType;
@@ -66,8 +68,8 @@ public class BatchlogManager implements BatchlogManagerMBean
 {
     private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
     private static final int VERSION = MessagingService.VERSION_12;
-    private static final long TIMEOUT = 2 * DatabaseDescriptor.getWriteRpcTimeout();
     private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
+    private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
 
     private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
     public static final BatchlogManager instance = new BatchlogManager();
@@ -124,14 +126,19 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
     {
-        long timestamp = FBUtilities.timestampMicros();
-        ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
+        return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
+    }
+
+    @VisibleForTesting
+    static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid, long now)
+    {
+        ByteBuffer writtenAt = LongType.instance.decompose(now / 1000);
         ByteBuffer data = serializeRowMutations(mutations);
 
         ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCf);
-        cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
-        cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
-        cf.addColumn(new Column(columnName("data"), data, timestamp));
+        cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now));
+        cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
+        cf.addColumn(new Column(columnName("data"), data, now));
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
         rm.add(cf);
 
@@ -157,7 +164,8 @@ public class BatchlogManager implements BatchlogManagerMBean
         return ByteBuffer.wrap(bos.toByteArray());
     }
 
-    private void replayAllFailedBatches() throws ExecutionException, InterruptedException
+    @VisibleForTesting
+    void replayAllFailedBatches() throws ExecutionException, InterruptedException
     {
         if (!isReplaying.compareAndSet(false, true))
             return;
@@ -171,9 +179,25 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         try
         {
-            for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
-                if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
-                    replayBatch(row.getUUID("id"), rateLimiter);
+            UntypedResultSet page = process("SELECT id, data, written_at FROM %s.%s LIMIT %d",
+                                            Table.SYSTEM_KS,
+                                            SystemTable.BATCHLOG_CF,
+                                            PAGE_SIZE);
+
+            while (!page.isEmpty())
+            {
+                UUID id = processBatchlogPage(page, rateLimiter);
+
+                if (page.size() < PAGE_SIZE)
+                    break; // we've exhausted the batchlog, next query would be empty.
+
+                page = process("SELECT id, data, written_at FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
+                               Table.SYSTEM_KS,
+                               SystemTable.BATCHLOG_CF,
+                               id,
+                               PAGE_SIZE);
+            }
+
             cleanup();
         }
         finally
@@ -184,28 +208,48 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Finished replayAllFailedBatches");
     }
 
-    private void replayBatch(UUID id, RateLimiter rateLimiter)
+    // returns the UUID of the last seen batch
+    private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
     {
-        logger.debug("Replaying batch {}", id);
+        UUID id = null;
+        for (UntypedResultSet.Row row : page)
+        {
+            id = row.getUUID("id");
+            long writtenAt = row.getLong("written_at");
+            // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
+            long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+            if (System.currentTimeMillis() < writtenAt + timeout)
+                continue; // not ready to replay yet, might still get a deletion.
+            replayBatch(id, row.getBytes("data"), writtenAt, rateLimiter);
+        }
+        return id;
+    }
 
-        UntypedResultSet result = process("SELECT written_at, data FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
-        if (result.isEmpty())
-            return;
+    private void replayBatch(UUID id, ByteBuffer data, long writtenAt, RateLimiter rateLimiter)
+    {
+        logger.debug("Replaying batch {}", id);
 
         try
         {
-            replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter);
+            replaySerializedMutations(data, writtenAt, rateLimiter);
         }
         catch (IOException e)
         {
             logger.warn("Skipped batch replay of {} due to {}", id, e);
         }
 
-        process("DELETE FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
+        deleteBatch(id);
 
         totalBatchesReplayed.incrementAndGet();
     }
 
+    private void deleteBatch(UUID id)
+    {
+        RowMutation mutation = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(id));
+        mutation.delete(new QueryPath(SystemTable.BATCHLOG_CF, null, null), System.currentTimeMillis());
+        mutation.apply();
+    }
+
     private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
     {
         DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
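
[Editor's note: two details in the hunk above are worth calling out. First, a
batch is only replayed once it is older than twice the write RPC timeout,
leaving room for the original write and the batchlog-removal mutation to land
through the normal path; anything younger is skipped because it may still be
deleted. A minimal sketch of that eligibility rule, with writeRpcTimeoutMillis
standing in for DatabaseDescriptor.getWriteRpcTimeout():]

    final class ReplayEligibilitySketch
    {
        // Eligible only once the entry has outlived the original write plus the
        // batchlog-removal mutation: two write RPC timeouts in total.
        static boolean eligibleForReplay(long writtenAtMillis, long writeRpcTimeoutMillis, long nowMillis)
        {
            return nowMillis >= writtenAtMillis + 2 * writeRpcTimeoutMillis;
        }
    }

[Second, deleteBatch() now drops the entry with a direct RowMutation row
deletion instead of the previous per-id CQL DELETE, presumably to skip CQL
statement processing on a path that can now run once per replayed batch.]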

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0b168f0/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
new file mode 100644
index 0000000..637815b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -0,0 +1,82 @@
+package org.apache.cassandra.db;
+
+import java.net.InetAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class BatchlogManagerTest extends SchemaLoader
+{
+    @Before
+    public void setUp() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        InetAddress localhost = InetAddress.getByName("127.0.0.1");
+        metadata.updateNormalToken(Util.token("A"), localhost);
+        metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+    }
+
+    @Test
+    public void testReplay() throws Exception
+    {
+        assertEquals(0, BatchlogManager.instance.countAllBatches());
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+
+        // Generate 1000 mutations and put them all into the batchlog.
+        // Half (500) ready to be replayed, half not.
+        for (int i = 0; i < 1000; i++)
+        {
+            RowMutation mutation = new RowMutation("Keyspace1", bytes(i));
+            mutation.add(new QueryPath("Standard1", null, bytes(i)), bytes(i), 0);
+            long timestamp = System.currentTimeMillis();
+            if (i < 500)
+                timestamp -= DatabaseDescriptor.getWriteRpcTimeout() * 2;
+            BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+        }
+
+        assertEquals(1000, BatchlogManager.instance.countAllBatches());
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+
+        // Force batchlog replay.
+        BatchlogManager.instance.replayAllFailedBatches();
+
+        // Ensure that the first half, and only the first half, got replayed.
+        assertEquals(500, BatchlogManager.instance.countAllBatches());
+        assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed());
+
+        for (int i = 0; i < 1000; i++)
+        {
+            UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
+            if (i < 500)
+            {
+                assertEquals(bytes(i), result.one().getBytes("key"));
+                assertEquals(bytes(i), result.one().getBytes("column1"));
+                assertEquals(bytes(i), result.one().getBytes("value"));
+            }
+            else
+            {
+                assertTrue(result.isEmpty());
+            }
+        }
+
+        // Ensure that no stray mutations got somehow applied.
+        UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
+        assertEquals(500, result.one().getLong("count"));
+    }
+}
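
[Editor's note: the test leans on two conventions from the patch: the
@VisibleForTesting overload of getBatchlogMutationFor() takes its timestamp in
microseconds (written_at is stored as now / 1000, i.e. milliseconds), and
replay only considers batches older than twice the write RPC timeout. A
minimal sketch of how the test backdates half the batches, with
writeRpcTimeoutMillis again a stand-in for the configured timeout:]

    final class BackdatingSketch
    {
        // Batches 0..499 are written two write-timeouts in the past (replayable);
        // batches 500..999 at "now" (skipped). The overload expects microseconds.
        static long batchTimestampMicros(int i, long writeRpcTimeoutMillis)
        {
            long millis = System.currentTimeMillis();
            if (i < 500)
                millis -= writeRpcTimeoutMillis * 2;
            return millis * 1000;
        }
    }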


[2/2] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0

Posted by al...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/db/BatchlogManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88146236
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88146236
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88146236

Branch: refs/heads/cassandra-2.0
Commit: 88146236988eb66c7396897abe66c05dd6a255f9
Parents: 14c6f70 b0b168f
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jan 17 19:40:50 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jan 17 19:40:50 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    | 78 ++++++++++++++-----
 .../cassandra/db/BatchlogManagerTest.java       | 82 ++++++++++++++++++++
 3 files changed, 143 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/88146236/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2095be7,f550863..80bc626
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,29 -11,10 +15,30 @@@ Merged from 1.2
   * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
   * Avoid StackOverflow when using large IN queries (CASSANDRA-6567)
   * Nodetool upgradesstables includes secondary indexes (CASSANDRA-6589)
+  * Paginate batchlog replay (CASSANDRA-6569)
  
  
 -1.2.13
 +2.0.4
 + * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418)
 + * add StorageService.stopDaemon() (CASSANDRA-4268)
 + * add IRE for invalid CF supplied to get_count (CASSANDRA-5701)
 + * add client encryption support to sstableloader (CASSANDRA-6378)
 + * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
 + * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
 + * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
 + * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
 + * Fix cleanup ClassCastException (CASSANDRA-6462)
 + * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
 + * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
 + * Fix divide-by-zero in PCI (CASSANDRA-6403)
 + * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
 + * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395)
 + * Expose a total memtable size metric for a CF (CASSANDRA-6391)
 + * cqlsh: handle symlinks properly (CASSANDRA-6425)
 + * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
 + * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447)
 + * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527)
 +Merged from 1.2:
   * Improved error message on bad properties in DDL queries (CASSANDRA-6453)
   * Randomize batchlog candidates selection (CASSANDRA-6481)
   * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88146236/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index cfa049a,90dfd47..23cacca
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -124,16 -126,23 +125,21 @@@ public class BatchlogManager implement
  
      public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
      {
-         long timestamp = FBUtilities.timestampMicros();
-         ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
+         return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
+     }
+ 
+     @VisibleForTesting
+     static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid, long now)
+     {
+         ByteBuffer writtenAt = LongType.instance.decompose(now / 1000);
          ByteBuffer data = serializeRowMutations(mutations);
  
 -        ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCf);
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
-         cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
-         cf.addColumn(new Column(columnName("data"), data, timestamp));
-         cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
+         cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now));
 -        cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
+         cf.addColumn(new Column(columnName("data"), data, now));
 -        RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
 -        rm.add(cf);
++        cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
  
 -        return rm;
 +        return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
      }
  
      private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations)
@@@ -169,9 -179,25 +176,25 @@@
  
          try
          {
-             for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF))
-                 if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
-                     replayBatch(row.getUUID("id"), rateLimiter);
+             UntypedResultSet page = process("SELECT id, data, written_at FROM %s.%s LIMIT %d",
 -                                            Table.SYSTEM_KS,
 -                                            SystemTable.BATCHLOG_CF,
++                                            Keyspace.SYSTEM_KS,
++                                            SystemKeyspace.BATCHLOG_CF,
+                                             PAGE_SIZE);
+ 
+             while (!page.isEmpty())
+             {
+                 UUID id = processBatchlogPage(page, rateLimiter);
+ 
+                 if (page.size() < PAGE_SIZE)
+                     break; // we've exhausted the batchlog, next query would be empty.
+ 
+                 page = process("SELECT id, data, written_at FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
 -                               Table.SYSTEM_KS,
 -                               SystemTable.BATCHLOG_CF,
++                               Keyspace.SYSTEM_KS,
++                               SystemKeyspace.BATCHLOG_CF,
+                                id,
+                                PAGE_SIZE);
+             }
+ 
              cleanup();
          }
          finally
@@@ -205,6 -243,13 +240,13 @@@
          totalBatchesReplayed.incrementAndGet();
      }
  
+     private void deleteBatch(UUID id)
+     {
 -        RowMutation mutation = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(id));
 -        mutation.delete(new QueryPath(SystemTable.BATCHLOG_CF, null, null), System.currentTimeMillis());
++        RowMutation mutation = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
++        mutation.delete(SystemKeyspace.BATCHLOG_CF, System.currentTimeMillis());
+         mutation.apply();
+     }
+ 
      private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
      {
          DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
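
[Editor's note: the merge resolution above is mostly mechanical adaptation to
the 2.0 branch's API: Table becomes Keyspace, SystemTable becomes
SystemKeyspace, ColumnFamily.create() becomes
ArrayBackedSortedColumns.factory.create(), and QueryPath is gone on 2.0, so
deleteBatch() passes the column family name straight to RowMutation.delete(),
as the resolved hunk shows. The paging logic itself is unchanged between the
two branches.]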

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88146236/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 0000000,637815b..fd1fa81
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@@ -1,0 -1,82 +1,82 @@@
+ package org.apache.cassandra.db;
+ 
+ import java.net.InetAddress;
+ import java.util.Collections;
+ 
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.cql3.UntypedResultSet;
 -import org.apache.cassandra.db.filter.QueryPath;
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.UUIDGen;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ public class BatchlogManagerTest extends SchemaLoader
+ {
+     @Before
+     public void setUp() throws Exception
+     {
+         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+         InetAddress localhost = InetAddress.getByName("127.0.0.1");
+         metadata.updateNormalToken(Util.token("A"), localhost);
+         metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+     }
+ 
+     @Test
+     public void testReplay() throws Exception
+     {
+         assertEquals(0, BatchlogManager.instance.countAllBatches());
+         assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+ 
+         // Generate 1000 mutations and put them all into the batchlog.
+         // Half (500) ready to be replayed, half not.
+         for (int i = 0; i < 1000; i++)
+         {
+             RowMutation mutation = new RowMutation("Keyspace1", bytes(i));
 -            mutation.add(new QueryPath("Standard1", null, bytes(i)), bytes(i), 0);
++            mutation.add("Standard1", bytes(i), bytes(i), System.currentTimeMillis());
++
+             long timestamp = System.currentTimeMillis();
+             if (i < 500)
+                 timestamp -= DatabaseDescriptor.getWriteRpcTimeout() * 2;
+             BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+         }
+ 
+         assertEquals(1000, BatchlogManager.instance.countAllBatches());
+         assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+ 
+         // Force batchlog replay.
+         BatchlogManager.instance.replayAllFailedBatches();
+ 
+         // Ensure that the first half, and only the first half, got replayed.
+         assertEquals(500, BatchlogManager.instance.countAllBatches());
+         assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed());
+ 
+         for (int i = 0; i < 1000; i++)
+         {
+             UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
+             if (i < 500)
+             {
+                 assertEquals(bytes(i), result.one().getBytes("key"));
+                 assertEquals(bytes(i), result.one().getBytes("column1"));
+                 assertEquals(bytes(i), result.one().getBytes("value"));
+             }
+             else
+             {
+                 assertTrue(result.isEmpty());
+             }
+         }
+ 
+         // Ensure that no stray mutations got somehow applied.
+         UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
+         assertEquals(500, result.one().getLong("count"));
+     }
+ }