You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/18 00:39:23 UTC

git commit: Fix batchlog to account for CF truncation records

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-1.2 f46c6578c -> 87097066e (forced update)


Fix batchlog to account for CF truncation records

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6999


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87097066
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87097066
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87097066

Branch: refs/heads/cassandra-1.2
Commit: 87097066e7c3c133e333804c4e4b00457b6c989d
Parents: fe94e90
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Apr 18 01:36:08 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Apr 18 01:38:55 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/BatchlogManager.java    | 102 ++++++++++++-------
 .../apache/cassandra/db/ColumnFamilyStore.java  |   6 --
 .../cassandra/db/HintedHandOffManager.java      |  16 +--
 .../org/apache/cassandra/db/RowMutation.java    |   6 +-
 .../org/apache/cassandra/db/SystemTable.java    |  53 +++++++---
 .../db/commitlog/CommitLogReplayer.java         |   4 +-
 .../apache/cassandra/service/StorageProxy.java  |   9 +-
 .../cassandra/db/BatchlogManagerTest.java       |  78 ++++++++++++--
 .../apache/cassandra/db/HintedHandOffTest.java  |   2 +-
 10 files changed, 189 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 07c09cf..bb08a37 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Schedule schema pulls on change (CASSANDRA-6971)
  * Non-droppable verbs shouldn't be dropped from OTC (CASSANDRA-6980)
  * Shutdown batchlog executor in SS#drain() (CASSANDRA-7025)
+ * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
 
 
 1.2.16

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index b8dbadd..ea32e9d 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -24,10 +24,7 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
@@ -36,6 +33,7 @@ import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -254,45 +252,72 @@ public class BatchlogManager implements BatchlogManagerMBean
     {
         DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
         int size = in.readInt();
+        List<RowMutation> mutations = new ArrayList<RowMutation>(size);
+
         for (int i = 0; i < size; i++)
-            replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter);
+        {
+            RowMutation mutation = RowMutation.serializer.deserialize(in, VERSION);
+
+            // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() are both in millis.
+            // We don't abort the replay entirely because this can be considered a success (truncated is the same as
+            // delivered then truncated).
+            for (UUID cfId : mutation.getColumnFamilyIds())
+                if (writtenAt <= SystemTable.getTruncatedAt(cfId))
+                    mutation = mutation.without(cfId);
+
+            if (!mutation.isEmpty())
+                mutations.add(mutation);
+        }
+
+        if (!mutations.isEmpty())
+            replayMutations(mutations, writtenAt, rateLimiter);
     }
 
     /*
      * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
      * when a replica is down or a write request times out.
      */
-    private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException
+    private void replayMutations(List<RowMutation> mutations, long writtenAt, RateLimiter rateLimiter) throws IOException
     {
-        int ttl = calculateHintTTL(mutation, writtenAt);
+        int ttl = calculateHintTTL(mutations, writtenAt);
         if (ttl <= 0)
-            return; // the mutation isn't safe to replay.
-
-        Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
-        String ks = mutation.getTable();
-        Token tk = StorageService.getPartitioner().getToken(mutation.key());
-        int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+            return; // this batchlog entry has 'expired'
 
-        for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
-                                                     StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+        for (RowMutation mutation : mutations)
         {
-            rateLimiter.acquire(mutationSize);
-            if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-                mutation.apply();
-            else if (FailureDetector.instance.isAlive(endpoint))
-                liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
-            else
-                StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
-        }
+            Set<InetAddress> liveEndpoints = Sets.newHashSet();
+            List<InetAddress> hintEndpoints = Lists.newArrayList();
 
-        if (!liveEndpoints.isEmpty())
-            attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
+            String ks = mutation.getTable();
+            Token tk = StorageService.getPartitioner().getToken(mutation.key());
+            int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION);
+
+            for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+                                                         StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+            {
+                rateLimiter.acquire(mutationSize);
+                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+                    mutation.apply();
+                else if (FailureDetector.instance.isAlive(endpoint))
+                    liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+                else
+                    hintEndpoints.add(endpoint);
+            }
+
+            if (!liveEndpoints.isEmpty())
+                hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints));
+
+            for (InetAddress endpoint : hintEndpoints)
+                StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
+        }
     }
 
-    private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints) throws IOException
+    // Returns the endpoints we failed to deliver to.
+    private Set<InetAddress> attemptDirectDelivery(RowMutation mutation, Set<InetAddress> endpoints) throws IOException
     {
-        List<WriteResponseHandler> handlers = Lists.newArrayList();
-        final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
+        final List<WriteResponseHandler> handlers = Lists.newArrayList();
+        final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>());
+
         for (final InetAddress ep : endpoints)
         {
             Runnable callback = new Runnable()
@@ -320,20 +345,19 @@ public class BatchlogManager implements BatchlogManagerMBean
             }
         }
 
-        if (!undelivered.isEmpty())
-        {
-            int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl
-            if (ttl > 0)
-                for (InetAddress endpoint : undelivered)
-                    StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
-        }
+        return undelivered;
     }
 
-    // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog).
-    // this ensures that deletes aren't "undone" by an old batch replay.
-    private int calculateHintTTL(RowMutation mutation, long writtenAt)
+    /*
+     * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
+     * This ensures that deletes aren't "undone" by an old batch replay.
+     */
+    private int calculateHintTTL(List<RowMutation> mutations, long writtenAt)
     {
-        return (int) ((mutation.calculateHintTTL() * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
+        int unadjustedTTL = Integer.MAX_VALUE;
+        for (RowMutation mutation : mutations)
+            unadjustedTTL = Math.min(unadjustedTTL, mutation.calculateHintTTL());
+        return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
     }
 
     private static ByteBuffer columnName(String name)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9e6987d..1100fb9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2096,10 +2096,4 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         return getDataTracker().getDroppableTombstoneRatio();
     }
-
-    public long getTruncationTime()
-    {
-        Pair<ReplayPosition, Long> truncationRecord = SystemTable.getTruncationRecords().get(metadata.cfId);
-        return truncationRecord == null ? Long.MIN_VALUE : truncationRecord.right;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index b1ccbc3..427bbf2 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -30,7 +30,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
@@ -377,20 +376,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                     throw new AssertionError(e);
                 }
 
-                Map<UUID, Long> truncationTimesCache = new HashMap<UUID, Long>();
-                for (UUID cfId : ImmutableSet.copyOf((rm.getColumnFamilyIds())))
+                for (UUID cfId : rm.getColumnFamilyIds())
                 {
-                    Long truncatedAt = truncationTimesCache.get(cfId);
-                    if (truncatedAt == null)
+                    if (hint.maxTimestamp() <= SystemTable.getTruncatedAt(cfId))
                     {
-                        ColumnFamilyStore cfs = Table.open(rm.getTable()).getColumnFamilyStore(cfId);
-                        truncatedAt = cfs.getTruncationTime();
-                        truncationTimesCache.put(cfId, truncatedAt);
-                    }
-
-                    if (hint.maxTimestamp() < truncatedAt)
-                    {
-                        logger.debug("Skipping delivery of hint for truncated columnfamily {}" + cfId);
+                        logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId);
                         rm = rm.without(cfId);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 1a095c5..cdfc0ae 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -103,8 +103,10 @@ public class RowMutation implements IMutation
     /**
      * Returns mutation representing a Hint to be sent to <code>targetId</code>
      * as soon as it becomes available. See HintedHandoffManager for more details.
+     *
+     * @param now timestamp (millis) of the hint - compared against CF truncation times when the hint is replayed
      */
-    public RowMutation toHint(int ttl, UUID targetId) throws IOException
+    public RowMutation toHint(long now, int ttl, UUID targetId) throws IOException
     {
         assert ttl > 0;
 
@@ -116,7 +118,7 @@ public class RowMutation implements IMutation
                                        HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version));
         rm.add(path,
                ByteBuffer.wrap(FBUtilities.serialize(this, serializer, MessagingService.current_version)),
-               System.currentTimeMillis(),
+               now,
                ttl);
 
         return rm;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index fbd765f..c691e9f 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -80,6 +80,8 @@ public class SystemTable
     private static final String LOCAL_KEY = "local";
     private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
 
+    private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
+
     public enum BootstrapState
     {
         NEEDS_BOOTSTRAP,
@@ -237,20 +239,22 @@ public class SystemTable
         }
     }
 
-    public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+    public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
     {
         String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
         processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
+        truncationRecords = null;
         forceBlockingFlush(LOCAL_CF);
     }
 
     /**
      * This method is used to remove information about truncation time for specified column family
      */
-    public static void removeTruncationRecord(UUID cfId)
+    public static synchronized void removeTruncationRecord(UUID cfId)
     {
         String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'";
         processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
+        truncationRecords = null;
         forceBlockingFlush(LOCAL_CF);
     }
 
@@ -271,22 +275,41 @@ public class SystemTable
                              ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength())));
     }
 
-    public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords()
+    public static ReplayPosition getTruncatedPosition(UUID cfId)
     {
-        String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'";
-        UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
-        if (rows.isEmpty())
-            return Collections.emptyMap();
+        Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
+        return record == null ? null : record.left;
+    }
 
-        UntypedResultSet.Row row = rows.one();
-        Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance);
-        if (rawMap == null)
-            return Collections.emptyMap();
+    public static long getTruncatedAt(UUID cfId)
+    {
+        Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
+        return record == null ? Long.MIN_VALUE : record.right;
+    }
+
+    private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
+    {
+        if (truncationRecords == null)
+            truncationRecords = readTruncationRecords();
+        return truncationRecords.get(cfId);
+    }
+
+    private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
+    {
+        UntypedResultSet rows = processInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'",
+                                                              LOCAL_CF,
+                                                              LOCAL_KEY));
+
+        Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<UUID, Pair<ReplayPosition, Long>>();
+
+        if (!rows.isEmpty() && rows.one().has("truncated_at"))
+        {
+            Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance);
+            for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet())
+                records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
+        }
 
-        Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>();
-        for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet())
-            positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
-        return positions;
+        return records;
     }
 
     private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 934cb6a..46d18b2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -68,7 +68,6 @@ public class CommitLogReplayer
         // compute per-CF and global replay positions
         cfPositions = new HashMap<UUID, ReplayPosition>();
         Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
-        Map<UUID,Pair<ReplayPosition,Long>> truncationPositions = SystemTable.getTruncationRecords();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
             // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
@@ -77,8 +76,7 @@ public class CommitLogReplayer
             ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
 
             // but, if we've truncted the cf in question, then we need to need to start replay after the truncation
-            Pair<ReplayPosition, Long> truncateRecord = truncationPositions.get(cfs.metadata.cfId);
-            ReplayPosition truncatedAt = truncateRecord == null ? null : truncateRecord.left;
+            ReplayPosition truncatedAt = SystemTable.getTruncatedPosition(cfs.metadata.cfId);
             if (truncatedAt != null)
                 rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index ca82a1f..7ef3d72 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -560,7 +560,7 @@ public class StorageProxy implements StorageProxyMBean
                 if (ttl > 0)
                 {
                     logger.debug("Adding hint for {}", target);
-                    writeHintForMutation(mutation, ttl, target);
+                    writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
                     // Notify the handler only for CL == ANY
                     if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
                         responseHandler.response(null);
@@ -582,7 +582,10 @@ public class StorageProxy implements StorageProxyMBean
         return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
     }
 
-    public static void writeHintForMutation(RowMutation mutation, int ttl, InetAddress target) throws IOException
+    /**
+     * @param now timestamp (millis) for the hint, e.g. current time or the batch's writtenAt - used on replay vs truncated CFs
+     */
+    public static void writeHintForMutation(RowMutation mutation, long now, int ttl, InetAddress target) throws IOException
     {
         assert ttl > 0;
         UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
@@ -592,7 +595,7 @@ public class StorageProxy implements StorageProxyMBean
             return;
         }
         assert hostId != null : "Missing host ID for " + target.getHostAddress();
-        mutation.toHint(ttl, hostId).apply();
+        mutation.toHint(now, ttl, hostId).apply();
         StorageMetrics.totalHints.inc();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 1d89f4b..fd2812f 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -19,7 +19,10 @@ package org.apache.cassandra.db;
 
 import java.net.InetAddress;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
 
+import com.google.common.collect.Lists;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -28,6 +31,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
@@ -52,8 +56,8 @@ public class BatchlogManagerTest extends SchemaLoader
     @Test
     public void testReplay() throws Exception
     {
-        assertEquals(0, BatchlogManager.instance.countAllBatches());
-        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
 
         // Generate 1000 mutations and put them all into the batchlog.
         // Half (500) ready to be replayed, half not.
@@ -70,15 +74,15 @@ public class BatchlogManagerTest extends SchemaLoader
         // Flush the batchlog to disk (see CASSANDRA-6822).
         Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF).forceFlush();
 
-        assertEquals(1000, BatchlogManager.instance.countAllBatches());
-        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+        assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
 
         // Force batchlog replay.
         BatchlogManager.instance.replayAllFailedBatches();
 
         // Ensure that the first half, and only the first half, got replayed.
-        assertEquals(500, BatchlogManager.instance.countAllBatches());
-        assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed());
+        assertEquals(500, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
 
         for (int i = 0; i < 1000; i++)
         {
@@ -99,4 +103,66 @@ public class BatchlogManagerTest extends SchemaLoader
         UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
         assertEquals(500, result.one().getLong("count"));
     }
+
+    @Test
+    public void testTruncatedReplay() throws InterruptedException, ExecutionException
+    {
+        // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
+        // Each batchlog entry contains one mutation for Standard2 and one for Standard3.
+        // In the middle of the process, 'truncate' Standard2.
+        for (int i = 0; i < 1000; i++)
+        {
+            RowMutation mutation1 = new RowMutation("Keyspace1", bytes(i));
+            mutation1.add(new QueryPath("Standard2", null, bytes(i)), bytes(i), 0);
+            RowMutation mutation2 = new RowMutation("Keyspace1", bytes(i));
+            mutation2.add(new QueryPath("Standard3", null, bytes(i)), bytes(i), 0);
+            List<RowMutation> mutations = Lists.newArrayList(mutation1, mutation2);
+
+            // Make sure it's ready to be replayed, so adjust the timestamp.
+            long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2;
+
+            if (i == 500)
+                SystemTable.saveTruncationRecord(Table.open("Keyspace1").getColumnFamilyStore("Standard2"),
+                                                 timestamp,
+                                                 ReplayPosition.NONE);
+
+            // Adjust the timestamp (slightly) to make the test deterministic.
+            if (i >= 500)
+                timestamp++;
+            else
+                timestamp--;
+
+            BatchlogManager.getBatchlogMutationFor(mutations, UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+        }
+
+        // Flush the batchlog to disk (see CASSANDRA-6822).
+        Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF).forceFlush();
+
+        // Force batchlog replay.
+        BatchlogManager.instance.replayAllFailedBatches();
+
+        // We should see the half of the Standard2-targeted mutations written after the truncation, and all of the Standard3 mutations, applied.
+        for (int i = 0; i < 1000; i++)
+        {
+            UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i));
+            if (i >= 500)
+            {
+                assertEquals(bytes(i), result.one().getBytes("key"));
+                assertEquals(bytes(i), result.one().getBytes("column1"));
+                assertEquals(bytes(i), result.one().getBytes("value"));
+            }
+            else
+            {
+                assertTrue(result.isEmpty());
+            }
+        }
+
+        for (int i = 0; i < 1000; i++)
+        {
+            UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i));
+            assertEquals(bytes(i), result.one().getBytes("key"));
+            assertEquals(bytes(i), result.one().getBytes("column1"));
+            assertEquals(bytes(i), result.one().getBytes("value"));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index a012109..7b4a736 100644
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@ -70,7 +70,7 @@ public class HintedHandOffTest extends SchemaLoader
                ByteBufferUtil.EMPTY_BYTE_BUFFER,
                System.currentTimeMillis());
 
-        rm.toHint(rm.calculateHintTTL(), UUID.randomUUID()).apply();
+        rm.toHint(System.currentTimeMillis(), rm.calculateHintTTL(), UUID.randomUUID()).apply();
 
         // flush data to disk
         hintStore.forceBlockingFlush();