You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/17 17:41:02 UTC
[1/2] git commit: Paginate batchlog replay
Updated Branches:
refs/heads/cassandra-2.0 14c6f7030 -> 881462369
Paginate batchlog replay
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6569
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0b168f0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0b168f0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0b168f0
Branch: refs/heads/cassandra-2.0
Commit: b0b168f0690f7e1d2c0e3401ea0e74d3bbccd164
Parents: a3f7035
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jan 17 19:24:49 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jan 17 19:24:49 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/BatchlogManager.java | 78 +++++++++++++++----
.../cassandra/db/BatchlogManagerTest.java | 82 ++++++++++++++++++++
3 files changed, 144 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0b168f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5827d3..f550863 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
* Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
* Avoid StackOverflow when using large IN queries (CASSANDRA-6567)
* Nodetool upgradesstables includes secondary indexes (CASSANDRA-6589)
+ * Paginate batchlog replay (CASSANDRA-6569)
1.2.13
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0b168f0/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 1af4909..90dfd47 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
@@ -45,6 +46,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
@@ -66,8 +68,8 @@ public class BatchlogManager implements BatchlogManagerMBean
{
private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
private static final int VERSION = MessagingService.VERSION_12;
- private static final long TIMEOUT = 2 * DatabaseDescriptor.getWriteRpcTimeout();
private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
+ private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
public static final BatchlogManager instance = new BatchlogManager();
@@ -124,14 +126,19 @@ public class BatchlogManager implements BatchlogManagerMBean
public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
{
- long timestamp = FBUtilities.timestampMicros();
- ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
+ return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
+ }
+
+ @VisibleForTesting
+ static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid, long now)
+ {
+ ByteBuffer writtenAt = LongType.instance.decompose(now / 1000);
ByteBuffer data = serializeRowMutations(mutations);
ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCf);
- cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
- cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
- cf.addColumn(new Column(columnName("data"), data, timestamp));
+ cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now));
+ cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
+ cf.addColumn(new Column(columnName("data"), data, now));
RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
rm.add(cf);
@@ -157,7 +164,8 @@ public class BatchlogManager implements BatchlogManagerMBean
return ByteBuffer.wrap(bos.toByteArray());
}
- private void replayAllFailedBatches() throws ExecutionException, InterruptedException
+ @VisibleForTesting
+ void replayAllFailedBatches() throws ExecutionException, InterruptedException
{
if (!isReplaying.compareAndSet(false, true))
return;
@@ -171,9 +179,25 @@ public class BatchlogManager implements BatchlogManagerMBean
try
{
- for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
- if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
- replayBatch(row.getUUID("id"), rateLimiter);
+ UntypedResultSet page = process("SELECT id, data, written_at FROM %s.%s LIMIT %d",
+ Table.SYSTEM_KS,
+ SystemTable.BATCHLOG_CF,
+ PAGE_SIZE);
+
+ while (!page.isEmpty())
+ {
+ UUID id = processBatchlogPage(page, rateLimiter);
+
+ if (page.size() < PAGE_SIZE)
+ break; // we've exhausted the batchlog, next query would be empty.
+
+ page = process("SELECT id, data, written_at FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
+ Table.SYSTEM_KS,
+ SystemTable.BATCHLOG_CF,
+ id,
+ PAGE_SIZE);
+ }
+
cleanup();
}
finally
@@ -184,28 +208,48 @@ public class BatchlogManager implements BatchlogManagerMBean
logger.debug("Finished replayAllFailedBatches");
}
- private void replayBatch(UUID id, RateLimiter rateLimiter)
+ // returns the UUID of the last seen batch
+ private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
{
- logger.debug("Replaying batch {}", id);
+ UUID id = null;
+ for (UntypedResultSet.Row row : page)
+ {
+ id = row.getUUID("id");
+ long writtenAt = row.getLong("written_at");
+ // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
+ long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+ if (System.currentTimeMillis() < writtenAt + timeout)
+ continue; // not ready to replay yet, might still get a deletion.
+ replayBatch(id, row.getBytes("data"), writtenAt, rateLimiter);
+ }
+ return id;
+ }
- UntypedResultSet result = process("SELECT written_at, data FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
- if (result.isEmpty())
- return;
+ private void replayBatch(UUID id, ByteBuffer data, long writtenAt, RateLimiter rateLimiter)
+ {
+ logger.debug("Replaying batch {}", id);
try
{
- replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter);
+ replaySerializedMutations(data, writtenAt, rateLimiter);
}
catch (IOException e)
{
logger.warn("Skipped batch replay of {} due to {}", id, e);
}
- process("DELETE FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
+ deleteBatch(id);
totalBatchesReplayed.incrementAndGet();
}
+ private void deleteBatch(UUID id)
+ {
+ RowMutation mutation = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(id));
+ mutation.delete(new QueryPath(SystemTable.BATCHLOG_CF, null, null), System.currentTimeMillis());
+ mutation.apply();
+ }
+
private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0b168f0/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
new file mode 100644
index 0000000..637815b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -0,0 +1,82 @@
+package org.apache.cassandra.db;
+
+import java.net.InetAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class BatchlogManagerTest extends SchemaLoader
+{
+ @Before
+ public void setUp() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ InetAddress localhost = InetAddress.getByName("127.0.0.1");
+ metadata.updateNormalToken(Util.token("A"), localhost);
+ metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+ }
+
+ @Test
+ public void testReplay() throws Exception
+ {
+ assertEquals(0, BatchlogManager.instance.countAllBatches());
+ assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+
+ // Generate 1000 mutations and put them all into the batchlog.
+ // Half (500) ready to be replayed, half not.
+ for (int i = 0; i < 1000; i++)
+ {
+ RowMutation mutation = new RowMutation("Keyspace1", bytes(i));
+ mutation.add(new QueryPath("Standard1", null, bytes(i)), bytes(i), 0);
+ long timestamp = System.currentTimeMillis();
+ if (i < 500)
+ timestamp -= DatabaseDescriptor.getWriteRpcTimeout() * 2;
+ BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+ }
+
+ assertEquals(1000, BatchlogManager.instance.countAllBatches());
+ assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+
+ // Force batchlog replay.
+ BatchlogManager.instance.replayAllFailedBatches();
+
+ // Ensure that the first half, and only the first half, got replayed.
+ assertEquals(500, BatchlogManager.instance.countAllBatches());
+ assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed());
+
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
+ if (i < 500)
+ {
+ assertEquals(bytes(i), result.one().getBytes("key"));
+ assertEquals(bytes(i), result.one().getBytes("column1"));
+ assertEquals(bytes(i), result.one().getBytes("value"));
+ }
+ else
+ {
+ assertTrue(result.isEmpty());
+ }
+ }
+
+ // Ensure that no stray mutations got somehow applied.
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
+ assertEquals(500, result.one().getLong("count"));
+ }
+}
[2/2] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by al...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/db/BatchlogManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88146236
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88146236
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88146236
Branch: refs/heads/cassandra-2.0
Commit: 88146236988eb66c7396897abe66c05dd6a255f9
Parents: 14c6f70 b0b168f
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jan 17 19:40:50 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jan 17 19:40:50 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/BatchlogManager.java | 78 ++++++++++++++-----
.../cassandra/db/BatchlogManagerTest.java | 82 ++++++++++++++++++++
3 files changed, 143 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88146236/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2095be7,f550863..80bc626
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,29 -11,10 +15,30 @@@ Merged from 1.2
* Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
* Avoid StackOverflow when using large IN queries (CASSANDRA-6567)
* Nodetool upgradesstables includes secondary indexes (CASSANDRA-6589)
+ * Paginate batchlog replay (CASSANDRA-6569)
-1.2.13
+2.0.4
+ * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418)
+ * add StorageService.stopDaemon() (CASSANDRA-4268)
+ * add IRE for invalid CF supplied to get_count (CASSANDRA-5701)
+ * add client encryption support to sstableloader (CASSANDRA-6378)
+ * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
+ * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
+ * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
+ * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
+ * Fix cleanup ClassCastException (CASSANDRA-6462)
+ * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
+ * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
+ * Fix divide-by-zero in PCI (CASSANDRA-6403)
+ * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
+ * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395)
+ * Expose a total memtable size metric for a CF (CASSANDRA-6391)
+ * cqlsh: handle symlinks properly (CASSANDRA-6425)
+ * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
+ * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447)
+ * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527)
+Merged from 1.2:
* Improved error message on bad properties in DDL queries (CASSANDRA-6453)
* Randomize batchlog candidates selection (CASSANDRA-6481)
* Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88146236/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index cfa049a,90dfd47..23cacca
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -124,16 -126,23 +125,21 @@@ public class BatchlogManager implement
public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
{
- long timestamp = FBUtilities.timestampMicros();
- ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
+ return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
+ }
+
+ @VisibleForTesting
+ static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid, long now)
+ {
+ ByteBuffer writtenAt = LongType.instance.decompose(now / 1000);
ByteBuffer data = serializeRowMutations(mutations);
- ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCf);
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
- cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
- cf.addColumn(new Column(columnName("data"), data, timestamp));
- cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
+ cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now));
- cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
+ cf.addColumn(new Column(columnName("data"), data, now));
- RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
- rm.add(cf);
++ cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
- return rm;
+ return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
}
private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations)
@@@ -169,9 -179,25 +176,25 @@@
try
{
- for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF))
- if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
- replayBatch(row.getUUID("id"), rateLimiter);
+ UntypedResultSet page = process("SELECT id, data, written_at FROM %s.%s LIMIT %d",
- Table.SYSTEM_KS,
- SystemTable.BATCHLOG_CF,
++ Keyspace.SYSTEM_KS,
++ SystemKeyspace.BATCHLOG_CF,
+ PAGE_SIZE);
+
+ while (!page.isEmpty())
+ {
+ UUID id = processBatchlogPage(page, rateLimiter);
+
+ if (page.size() < PAGE_SIZE)
+ break; // we've exhausted the batchlog, next query would be empty.
+
+ page = process("SELECT id, data, written_at FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
- Table.SYSTEM_KS,
- SystemTable.BATCHLOG_CF,
++ Keyspace.SYSTEM_KS,
++ SystemKeyspace.BATCHLOG_CF,
+ id,
+ PAGE_SIZE);
+ }
+
cleanup();
}
finally
@@@ -205,6 -243,13 +240,13 @@@
totalBatchesReplayed.incrementAndGet();
}
+ private void deleteBatch(UUID id)
+ {
- RowMutation mutation = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(id));
- mutation.delete(new QueryPath(SystemTable.BATCHLOG_CF, null, null), System.currentTimeMillis());
++ RowMutation mutation = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
++ mutation.delete(SystemKeyspace.BATCHLOG_CF, System.currentTimeMillis());
+ mutation.apply();
+ }
+
private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88146236/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 0000000,637815b..fd1fa81
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@@ -1,0 -1,82 +1,82 @@@
+ package org.apache.cassandra.db;
+
+ import java.net.InetAddress;
+ import java.util.Collections;
+
+ import org.junit.Before;
+ import org.junit.Test;
+
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.filter.QueryPath;
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.UUIDGen;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+ public class BatchlogManagerTest extends SchemaLoader
+ {
+ @Before
+ public void setUp() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ InetAddress localhost = InetAddress.getByName("127.0.0.1");
+ metadata.updateNormalToken(Util.token("A"), localhost);
+ metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+ }
+
+ @Test
+ public void testReplay() throws Exception
+ {
+ assertEquals(0, BatchlogManager.instance.countAllBatches());
+ assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+
+ // Generate 1000 mutations and put them all into the batchlog.
+ // Half (500) ready to be replayed, half not.
+ for (int i = 0; i < 1000; i++)
+ {
+ RowMutation mutation = new RowMutation("Keyspace1", bytes(i));
- mutation.add(new QueryPath("Standard1", null, bytes(i)), bytes(i), 0);
++ mutation.add("Standard1", bytes(i), bytes(i), System.currentTimeMillis());
++
+ long timestamp = System.currentTimeMillis();
+ if (i < 500)
+ timestamp -= DatabaseDescriptor.getWriteRpcTimeout() * 2;
+ BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+ }
+
+ assertEquals(1000, BatchlogManager.instance.countAllBatches());
+ assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+
+ // Force batchlog replay.
+ BatchlogManager.instance.replayAllFailedBatches();
+
+ // Ensure that the first half, and only the first half, got replayed.
+ assertEquals(500, BatchlogManager.instance.countAllBatches());
+ assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed());
+
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
+ if (i < 500)
+ {
+ assertEquals(bytes(i), result.one().getBytes("key"));
+ assertEquals(bytes(i), result.one().getBytes("column1"));
+ assertEquals(bytes(i), result.one().getBytes("value"));
+ }
+ else
+ {
+ assertTrue(result.isEmpty());
+ }
+ }
+
+ // Ensure that no stray mutations got somehow applied.
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
+ assertEquals(500, result.one().getLong("count"));
+ }
+ }