You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/18 00:38:18 UTC
git commit: Fix batchlog to account for CF truncation records
Repository: cassandra
Updated Branches:
refs/heads/cassandra-1.2 fe94e90f4 -> f46c6578c
Fix batchlog to account for CF truncation records
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6999
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f46c6578
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f46c6578
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f46c6578
Branch: refs/heads/cassandra-1.2
Commit: f46c6578c2fb905cd88681d80218d89798032e03
Parents: fe94e90
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Apr 18 01:36:08 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Apr 18 01:36:08 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/BatchlogManager.java | 102 ++++++++++++-------
.../apache/cassandra/db/ColumnFamilyStore.java | 6 --
.../cassandra/db/HintedHandOffManager.java | 16 +--
.../org/apache/cassandra/db/RowMutation.java | 6 +-
.../org/apache/cassandra/db/SystemTable.java | 53 +++++++---
.../db/commitlog/CommitLogReplayer.java | 4 +-
.../apache/cassandra/service/StorageProxy.java | 9 +-
.../cassandra/db/BatchlogManagerTest.java | 78 ++++++++++++--
.../apache/cassandra/db/HintedHandOffTest.java | 2 +-
10 files changed, 189 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 07c09cf..bb08a37 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
* Schedule schema pulls on change (CASSANDRA-6971)
* Non-droppable verbs shouldn't be dropped from OTC (CASSANDRA-6980)
* Shutdown batchlog executor in SS#drain() (CASSANDRA-7025)
+ * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
1.2.16
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index b8dbadd..ea32e9d 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -24,10 +24,7 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
@@ -36,6 +33,7 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -254,45 +252,72 @@ public class BatchlogManager implements BatchlogManagerMBean
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
int size = in.readInt();
+ List<RowMutation> mutations = new ArrayList<RowMutation>(size);
+
for (int i = 0; i < size; i++)
- replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter);
+ {
+ RowMutation mutation = RowMutation.serializer.deserialize(in, VERSION);
+
+ // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+ // We don't abort the replay entirely b/c this can be considered a succes (truncated is same as delivered then
+ // truncated.
+ for (UUID cfId : mutation.getColumnFamilyIds())
+ if (writtenAt <= SystemTable.getTruncatedAt(cfId))
+ mutation = mutation.without(cfId);
+
+ if (!mutation.isEmpty())
+ mutations.add(mutation);
+ }
+
+ if (!mutations.isEmpty())
+ replayMutations(mutations, writtenAt, rateLimiter);
}
/*
* We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
* when a replica is down or a write request times out.
*/
- private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException
+ private void replayMutations(List<RowMutation> mutations, long writtenAt, RateLimiter rateLimiter) throws IOException
{
- int ttl = calculateHintTTL(mutation, writtenAt);
+ int ttl = calculateHintTTL(mutations, writtenAt);
if (ttl <= 0)
- return; // the mutation isn't safe to replay.
-
- Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
- String ks = mutation.getTable();
- Token tk = StorageService.getPartitioner().getToken(mutation.key());
- int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+ return; // this batchlog entry has 'expired'
- for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
- StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+ for (RowMutation mutation : mutations)
{
- rateLimiter.acquire(mutationSize);
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- mutation.apply();
- else if (FailureDetector.instance.isAlive(endpoint))
- liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
- else
- StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
- }
+ Set<InetAddress> liveEndpoints = Sets.newHashSet();
+ List<InetAddress> hintEndpoints = Lists.newArrayList();
- if (!liveEndpoints.isEmpty())
- attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
+ String ks = mutation.getTable();
+ Token tk = StorageService.getPartitioner().getToken(mutation.key());
+ int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+
+ for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+ StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+ {
+ rateLimiter.acquire(mutationSize);
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ mutation.apply();
+ else if (FailureDetector.instance.isAlive(endpoint))
+ liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+ else
+ hintEndpoints.add(endpoint);
+ }
+
+ if (!liveEndpoints.isEmpty())
+ hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints));
+
+ for (InetAddress endpoint : hintEndpoints)
+ StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
+ }
}
- private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints) throws IOException
+ // Returns the endpoints we failed to deliver to.
+ private Set<InetAddress> attemptDirectDelivery(RowMutation mutation, Set<InetAddress> endpoints) throws IOException
{
- List<WriteResponseHandler> handlers = Lists.newArrayList();
- final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
+ final List<WriteResponseHandler> handlers = Lists.newArrayList();
+ final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>());
+
for (final InetAddress ep : endpoints)
{
Runnable callback = new Runnable()
@@ -320,20 +345,19 @@ public class BatchlogManager implements BatchlogManagerMBean
}
}
- if (!undelivered.isEmpty())
- {
- int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl
- if (ttl > 0)
- for (InetAddress endpoint : undelivered)
- StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
- }
+ return undelivered;
}
- // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog).
- // this ensures that deletes aren't "undone" by an old batch replay.
- private int calculateHintTTL(RowMutation mutation, long writtenAt)
+ /*
+ * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
+ * This ensures that deletes aren't "undone" by an old batch replay.
+ */
+ private int calculateHintTTL(List<RowMutation> mutations, long writtenAt)
{
- return (int) ((mutation.calculateHintTTL() * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
+ int unadjustedTTL = Integer.MAX_VALUE;
+ for (RowMutation mutation : mutations)
+ unadjustedTTL = Math.min(unadjustedTTL, mutation.calculateHintTTL());
+ return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
}
private static ByteBuffer columnName(String name)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9e6987d..1100fb9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2096,10 +2096,4 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
return getDataTracker().getDroppableTombstoneRatio();
}
-
- public long getTruncationTime()
- {
- Pair<ReplayPosition, Long> truncationRecord = SystemTable.getTruncationRecords().get(metadata.cfId);
- return truncationRecord == null ? Long.MIN_VALUE : truncationRecord.right;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index b1ccbc3..427bbf2 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -30,7 +30,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
@@ -377,20 +376,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
throw new AssertionError(e);
}
- Map<UUID, Long> truncationTimesCache = new HashMap<UUID, Long>();
- for (UUID cfId : ImmutableSet.copyOf((rm.getColumnFamilyIds())))
+ for (UUID cfId : rm.getColumnFamilyIds())
{
- Long truncatedAt = truncationTimesCache.get(cfId);
- if (truncatedAt == null)
+ if (hint.maxTimestamp() <= SystemTable.getTruncatedAt(cfId))
{
- ColumnFamilyStore cfs = Table.open(rm.getTable()).getColumnFamilyStore(cfId);
- truncatedAt = cfs.getTruncationTime();
- truncationTimesCache.put(cfId, truncatedAt);
- }
-
- if (hint.maxTimestamp() < truncatedAt)
- {
- logger.debug("Skipping delivery of hint for truncated columnfamily {}" + cfId);
+ logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId);
rm = rm.without(cfId);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 1a095c5..cdfc0ae 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -103,8 +103,10 @@ public class RowMutation implements IMutation
/**
* Returns mutation representing a Hint to be sent to <code>targetId</code>
* as soon as it becomes available. See HintedHandoffManager for more details.
+ *
+ * @param now current time in milliseconds - relevant for hint replay handling of truncated CFs
*/
- public RowMutation toHint(int ttl, UUID targetId) throws IOException
+ public RowMutation toHint(long now, int ttl, UUID targetId) throws IOException
{
assert ttl > 0;
@@ -116,7 +118,7 @@ public class RowMutation implements IMutation
HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version));
rm.add(path,
ByteBuffer.wrap(FBUtilities.serialize(this, serializer, MessagingService.current_version)),
- System.currentTimeMillis(),
+ now,
ttl);
return rm;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index fbd765f..c691e9f 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -80,6 +80,8 @@ public class SystemTable
private static final String LOCAL_KEY = "local";
private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
+ private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
+
public enum BootstrapState
{
NEEDS_BOOTSTRAP,
@@ -237,20 +239,22 @@ public class SystemTable
}
}
- public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+ public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
+ truncationRecords = null;
forceBlockingFlush(LOCAL_CF);
}
/**
* This method is used to remove information about truncation time for specified column family
*/
- public static void removeTruncationRecord(UUID cfId)
+ public static synchronized void removeTruncationRecord(UUID cfId)
{
String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
+ truncationRecords = null;
forceBlockingFlush(LOCAL_CF);
}
@@ -271,22 +275,41 @@ public class SystemTable
ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
}
- public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords()
+ public static ReplayPosition getTruncatedPosition(UUID cfId)
{
- String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
- UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
- if (rows.isEmpty())
- return Collections.emptyMap();
+ Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
+ return record == null ? null : record.left;
+ }
- UntypedResultSet.Row row = rows.one();
- Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
- if (rawMap == null)
- return Collections.emptyMap();
+ public static long getTruncatedAt(UUID cfId)
+ {
+ Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
+ return record == null ? Long.MIN_VALUE : record.right;
+ }
+
+ private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
+ {
+ if (truncationRecords == null)
+ truncationRecords = readTruncationRecords();
+ return truncationRecords.get(cfId);
+ }
+
+ private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
+ {
+ UntypedResultSet rows = processInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'",
+ LOCAL_CF,
+ LOCAL_KEY));
+
+ Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<UUID, Pair<ReplayPosition, Long>>();
+
+ if (!rows.isEmpty() && rows.one().has("truncated_at"))
+ {
+ Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance);
+ for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet())
+ records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
+ }
- Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>();
- for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
- positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
- return positions;
+ return records;
}
private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 934cb6a..46d18b2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -68,7 +68,6 @@ public class CommitLogReplayer
// compute per-CF and global replay positions
cfPositions = new HashMap<UUID, ReplayPosition>();
Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
- Map<UUID,Pair<ReplayPosition,Long>> truncationPositions = SystemTable.getTruncationRecords();
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
// it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
@@ -77,8 +76,7 @@ public class CommitLogReplayer
ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
// but, if we've truncted the cf in question, then we need to need to start replay after the truncation
- Pair<ReplayPosition, Long> truncateRecord = truncationPositions.get(cfs.metadata.cfId);
- ReplayPosition truncatedAt = truncateRecord == null ? null : truncateRecord.left;
+ ReplayPosition truncatedAt = SystemTable.getTruncatedPosition(cfs.metadata.cfId);
if (truncatedAt != null)
rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index ca82a1f..7ef3d72 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -560,7 +560,7 @@ public class StorageProxy implements StorageProxyMBean
if (ttl > 0)
{
logger.debug("Adding hint for {}", target);
- writeHintForMutation(mutation, ttl, target);
+ writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
// Notify the handler only for CL == ANY
if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
responseHandler.response(null);
@@ -582,7 +582,10 @@ public class StorageProxy implements StorageProxyMBean
return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
}
- public static void writeHintForMutation(RowMutation mutation, int ttl, InetAddress target) throws IOException
+ /**
+ * @param now current time in milliseconds - relevant for hint replay handling of truncated CFs
+ */
+ public static void writeHintForMutation(RowMutation mutation, long now, int ttl, InetAddress target) throws IOException
{
assert ttl > 0;
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
@@ -592,7 +595,7 @@ public class StorageProxy implements StorageProxyMBean
return;
}
assert hostId != null : "Missing host ID for " + target.getHostAddress();
- mutation.toHint(ttl, hostId).apply();
+ mutation.toHint(now, ttl, hostId).apply();
StorageMetrics.totalHints.inc();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 1d89f4b..fd2812f 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -19,7 +19,10 @@ package org.apache.cassandra.db;
import java.net.InetAddress;
import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
@@ -28,6 +31,7 @@ import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
@@ -52,8 +56,8 @@ public class BatchlogManagerTest extends SchemaLoader
@Test
public void testReplay() throws Exception
{
- assertEquals(0, BatchlogManager.instance.countAllBatches());
- assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+ long initialAllBatches = BatchlogManager.instance.countAllBatches();
+ long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
// Generate 1000 mutations and put them all into the batchlog.
// Half (500) ready to be replayed, half not.
@@ -70,15 +74,15 @@ public class BatchlogManagerTest extends SchemaLoader
// Flush the batchlog to disk (see CASSANDRA-6822).
Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF).forceFlush();
- assertEquals(1000, BatchlogManager.instance.countAllBatches());
- assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+ assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+ assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
// Force batchlog replay.
BatchlogManager.instance.replayAllFailedBatches();
// Ensure that the first half, and only the first half, got replayed.
- assertEquals(500, BatchlogManager.instance.countAllBatches());
- assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed());
+ assertEquals(500, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+ assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
for (int i = 0; i < 1000; i++)
{
@@ -99,4 +103,66 @@ public class BatchlogManagerTest extends SchemaLoader
UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
assertEquals(500, result.one().getLong("count"));
}
+
+ @Test
+ public void testTruncatedReplay() throws InterruptedException, ExecutionException
+ {
+ // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
+ // Each batchlog entry with a mutation for Standard2 and Standard3.
+ // In the middle of the process, 'truncate' Standard2.
+ for (int i = 0; i < 1000; i++)
+ {
+ RowMutation mutation1 = new RowMutation("Keyspace1", bytes(i));
+ mutation1.add(new QueryPath("Standard2", null, bytes(i)), bytes(i), 0);
+ RowMutation mutation2 = new RowMutation("Keyspace1", bytes(i));
+ mutation2.add(new QueryPath("Standard3", null, bytes(i)), bytes(i), 0);
+ List<RowMutation> mutations = Lists.newArrayList(mutation1, mutation2);
+
+ // Make sure it's ready to be replayed, so adjust the timestamp.
+ long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2;
+
+ if (i == 500)
+ SystemTable.saveTruncationRecord(Table.open("Keyspace1").getColumnFamilyStore("Standard2"),
+ timestamp,
+ ReplayPosition.NONE);
+
+ // Adjust the timestamp (slightly) to make the test deterministic.
+ if (i >= 500)
+ timestamp++;
+ else
+ timestamp--;
+
+ BatchlogManager.getBatchlogMutationFor(mutations, UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+ }
+
+ // Flush the batchlog to disk (see CASSANDRA-6822).
+ Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF).forceFlush();
+
+ // Force batchlog replay.
+ BatchlogManager.instance.replayAllFailedBatches();
+
+ // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i));
+ if (i >= 500)
+ {
+ assertEquals(bytes(i), result.one().getBytes("key"));
+ assertEquals(bytes(i), result.one().getBytes("column1"));
+ assertEquals(bytes(i), result.one().getBytes("value"));
+ }
+ else
+ {
+ assertTrue(result.isEmpty());
+ }
+ }
+
+ for (int i = 0; i < 1000; i++)
+ {
+ UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i));
+ assertEquals(bytes(i), result.one().getBytes("key"));
+ assertEquals(bytes(i), result.one().getBytes("column1"));
+ assertEquals(bytes(i), result.one().getBytes("value"));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f46c6578/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index a012109..7b4a736 100644
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@ -70,7 +70,7 @@ public class HintedHandOffTest extends SchemaLoader
ByteBufferUtil.EMPTY_BYTE_BUFFER,
System.currentTimeMillis());
- rm.toHint(rm.calculateHintTTL(), UUID.randomUUID()).apply();
+ rm.toHint(System.currentTimeMillis(), rm.calculateHintTTL(), UUID.randomUUID()).apply();
// flush data to disk
hintStore.forceBlockingFlush();