You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/18 02:15:51 UTC

[4/4] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	build.xml
	debian/changelog
	src/java/org/apache/cassandra/db/BatchlogManager.java
	src/java/org/apache/cassandra/db/ColumnFamilyStore.java
	src/java/org/apache/cassandra/db/HintedHandOffManager.java
	src/java/org/apache/cassandra/db/SystemKeyspace.java
	src/java/org/apache/cassandra/service/StorageProxy.java
	test/unit/org/apache/cassandra/db/BatchlogManagerTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66af6fed
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66af6fed
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66af6fed

Branch: refs/heads/cassandra-2.1
Commit: 66af6fedc02eed630028043f8a6f0d3014f193d5
Parents: de8a479 384de4b
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Apr 18 03:14:47 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Apr 18 03:14:47 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  11 +-
 .../apache/cassandra/db/BatchlogManager.java    | 102 +++++++++++--------
 .../apache/cassandra/db/ColumnFamilyStore.java  |   6 --
 .../cassandra/db/HintedHandOffManager.java      |  19 +---
 .../org/apache/cassandra/db/SystemKeyspace.java |  55 +++++++---
 .../db/commitlog/CommitLogReplayer.java         |  12 +--
 .../apache/cassandra/service/StorageProxy.java  |   9 +-
 .../cassandra/db/BatchlogManagerTest.java       |  84 +++++++++++++--
 .../apache/cassandra/db/HintedHandOffTest.java  |  19 ++--
 10 files changed, 214 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9f34023,ad26f6d..705f1b8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -108,6 -64,6 +108,7 @@@ Merged from 1.2
   * Schedule schema pulls on change (CASSANDRA-6971)
   * Non-droppable verbs shouldn't be dropped from OTC (CASSANDRA-6980)
   * Shutdown batchlog executor in SS#drain() (CASSANDRA-7025)
++ * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
  
  
  2.0.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 9567ef3,05f9392..ac78a73
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,46 -13,16 +13,55 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 +2.1
 +===
 +
 +New features
 +------------
 +   - SSTable data directory name is slightly changed. Each directory will
 +     have hex string appended after CF name, e.g.
 +         ks/cf-5be396077b811e3a3ab9dc4b9ac088d/
 +     This hex string part represents unique ColumnFamily ID.
 +     Note that existing directories are used as is, so only newly created
 +     directories after upgrade have new directory name format.
 +   - Saved key cache files also have ColumnFamily ID in their file name.
 +   - It is now possible to do incremental repairs, sstables that have been
 +     repaired are marked with a timestamp and not included in the next
 +     repair session. Use nodetool repair -par -inc to use this feature.
 +     A tool to manually mark/unmark sstables as repaired is available in
 +     tools/bin/sstablerepairedset.
 +
 +Upgrading
 +---------
 +   - Rolling upgrades from anything pre-2.0.7 are not supported. Furthermore
 +     pre-2.0 sstables are not supported. This means that before upgrading
 +     a node to 2.1, this node must be started on 2.0 and
 +     'nodetool upgradesstables' must be run (and this even in the case
 +     of non-rolling upgrades).
 +   - For size-tiered compaction users, Cassandra now defaults to ignoring
 +     the coldest 5% of sstables.  This can be customized with the
 +     cold_reads_to_omit compaction option; 0.0 omits nothing (the old
 +     behavior) and 1.0 omits everything.
 +   - Multithreaded compaction has been removed.
 +   - Counters implementation has been changed, replaced by a safer one with
 +     fewer caveats, but different performance characteristics. You might have
 +     to change your data model to accommodate the new implementation.
 +     (See https://issues.apache.org/jira/browse/CASSANDRA-6504 and the dev
 +     blog post at http://www.datastax.com/dev/blog/<PLACEHOLDER> for details).
 +    - (per-table) index_interval parameter has been replaced with
 +     min_index_interval and max_index_interval parameters. index_interval
 +     has been deprecated.
 +
  
+ 2.0.7
+ =====
+ 
+ Upgrading
+ ---------
+     - Nothing specific to this release, but please see 2.0.6 if you are upgrading
+       from a previous version.
+ 
+ 
  2.0.6
  =====
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 02c029d,5770994..7f86a6e
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -244,45 -247,72 +240,72 @@@ public class BatchlogManager implement
      {
          DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
          int size = in.readInt();
 -        List<RowMutation> mutations = new ArrayList<>(size);
++        List<Mutation> mutations = new ArrayList<>(size);
+ 
          for (int i = 0; i < size; i++)
-             replaySerializedMutation(Mutation.serializer.deserialize(in, version), writtenAt, version, rateLimiter);
+         {
 -            RowMutation mutation = RowMutation.serializer.deserialize(in, version);
++            Mutation mutation = Mutation.serializer.deserialize(in, version);
+ 
+             // Remove CFs that have been truncated since. writtenAt and SystemKeyspace#getTruncatedAt() both return millis.
+             // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
+             // truncated).
+             for (UUID cfId : mutation.getColumnFamilyIds())
+                 if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+                     mutation = mutation.without(cfId);
+ 
+             if (!mutation.isEmpty())
+                 mutations.add(mutation);
+         }
+ 
+         if (!mutations.isEmpty())
+             replayMutations(mutations, writtenAt, version, rateLimiter);
      }
  
      /*
       * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
       * when a replica is down or a write request times out.
       */
-     private void replaySerializedMutation(Mutation mutation, long writtenAt, int version, RateLimiter rateLimiter)
 -    private void replayMutations(List<RowMutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
++    private void replayMutations(List<Mutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
      {
-         int ttl = calculateHintTTL(mutation, writtenAt);
+         int ttl = calculateHintTTL(mutations, writtenAt);
          if (ttl <= 0)
-             return; // the mutation isn't safe to replay.
- 
-         Set<InetAddress> liveEndpoints = new HashSet<>();
-         String ks = mutation.getKeyspaceName();
-         Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
-         int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+             return; // this batchlog entry has 'expired'
  
-         for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
-                                                      StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
 -        for (RowMutation mutation : mutations)
++        for (Mutation mutation : mutations)
          {
-             rateLimiter.acquire(mutationSize);
-             if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-                 mutation.apply();
-             else if (FailureDetector.instance.isAlive(endpoint))
-                 liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
-             else
-                 StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
-         }
+             List<InetAddress> liveEndpoints = new ArrayList<>();
+             List<InetAddress> hintEndpoints = new ArrayList<>();
  
-         if (!liveEndpoints.isEmpty())
-             attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
+             String ks = mutation.getKeyspaceName();
+             Token tk = StorageService.getPartitioner().getToken(mutation.key());
 -            int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version);
++            int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+ 
+             for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+                                                          StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+             {
+                 rateLimiter.acquire(mutationSize);
+                 if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+                     mutation.apply();
+                 else if (FailureDetector.instance.isAlive(endpoint))
+                     liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+                 else
+                     hintEndpoints.add(endpoint);
+             }
+ 
+             if (!liveEndpoints.isEmpty())
+                 hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints));
+ 
+             for (InetAddress endpoint : hintEndpoints)
+                 StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
+         }
      }
  
-     private void attemptDirectDelivery(Mutation mutation, long writtenAt, Set<InetAddress> endpoints)
+     // Returns the endpoints we failed to deliver to.
 -    private Set<InetAddress> attemptDirectDelivery(RowMutation mutation, List<InetAddress> endpoints) throws IOException
++    private Set<InetAddress> attemptDirectDelivery(Mutation mutation, List<InetAddress> endpoints) throws IOException
      {
-         List<WriteResponseHandler> handlers = Lists.newArrayList();
-         final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<>(endpoints);
+         final List<WriteResponseHandler> handlers = new ArrayList<>();
+         final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>());
+ 
          for (final InetAddress ep : endpoints)
          {
              Runnable callback = new Runnable()
@@@ -310,22 -340,26 +333,21 @@@
              }
          }
  
-         if (!undelivered.isEmpty())
-         {
-             int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl
-             if (ttl > 0)
-                 for (InetAddress endpoint : undelivered)
-                     StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
-         }
+         return undelivered;
      }
  
-     // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog).
-     // this ensures that deletes aren't "undone" by an old batch replay.
-     private int calculateHintTTL(Mutation mutation, long writtenAt)
+     /*
+      * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
+      * This ensures that deletes aren't "undone" by an old batch replay.
+      */
 -    private int calculateHintTTL(List<RowMutation> mutations, long writtenAt)
++    private int calculateHintTTL(List<Mutation> mutations, long writtenAt)
      {
-         return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
+         int unadjustedTTL = Integer.MAX_VALUE;
 -        for (RowMutation mutation : mutations)
++        for (Mutation mutation : mutations)
+             unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
+         return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
      }
  
 -    private static ByteBuffer columnName(String name)
 -    {
 -        return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build();
 -    }
 -
      // force flush + compaction to reclaim space from the replayed batches
      private void cleanup() throws ExecutionException, InterruptedException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 923ea5b,36bc470..8f96765
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -2795,17 -2416,6 +2795,11 @@@ public class ColumnFamilyStore implemen
          return getDataTracker().getDroppableTombstoneRatio();
      }
  
-     public long getTruncationTime()
-     {
-         Pair<ReplayPosition, Long> truncationRecord = SystemKeyspace.getTruncationRecords().get(metadata.cfId);
-         return truncationRecord == null ? Long.MIN_VALUE : truncationRecord.right;
-     }
- 
 +    public long trueSnapshotsSize()
 +    {
 +        return directories.trueSnapshotsSize();
 +    }
 +
      @VisibleForTesting
      void resetFileIndexGenerator()
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 12404da,942707e..e83aefc
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -121,7 -119,7 +120,7 @@@ public class HintedHandOffManager imple
       * Returns a mutation representing a Hint to be sent to <code>targetId</code>
       * as soon as it becomes available again.
       */
-     public Mutation hintFor(Mutation mutation, int ttl, UUID targetId)
 -    public RowMutation hintFor(RowMutation mutation, long now, int ttl, UUID targetId)
++    public Mutation hintFor(Mutation mutation, long now, int ttl, UUID targetId)
      {
          assert ttl > 0;
  
@@@ -134,11 -132,11 +133,11 @@@
  
          UUID hintId = UUIDGen.getTimeUUID();
          // serialize the hint with id and version as a composite column name
 -        ByteBuffer name = comparator.decompose(hintId, MessagingService.current_version);
 -        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version));
 +        CellName name = CFMetaData.HintsCf.comparator.makeCellName(hintId, MessagingService.current_version);
 +        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
          ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.HINTS_CF));
-         cf.addColumn(name, value, System.currentTimeMillis(), ttl);
+         cf.addColumn(name, value, now, ttl);
 -        return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
 +        return new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
      }
  
      /*
@@@ -389,8 -387,7 +388,7 @@@
              }
  
              List<WriteResponseHandler> responseHandlers = Lists.newArrayList();
-             Map<UUID, Long> truncationTimesCache = new HashMap<UUID, Long>();
 -            for (final Column hint : hintsPage)
 +            for (final Cell hint : hintsPage)
              {
                  // check if hints delivery has been paused during the process
                  if (hintedHandOffPaused)
@@@ -427,21 -425,12 +425,12 @@@
                      throw new AssertionError(e);
                  }
  
-                 truncationTimesCache.clear();
-                 for (UUID cfId : ImmutableSet.copyOf((mutation.getColumnFamilyIds())))
 -                for (UUID cfId : rm.getColumnFamilyIds())
++                for (UUID cfId : mutation.getColumnFamilyIds())
                  {
-                     Long truncatedAt = truncationTimesCache.get(cfId);
-                     if (truncatedAt == null)
-                     {
-                         ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(cfId);
-                         truncatedAt = cfs.getTruncationTime();
-                         truncationTimesCache.put(cfId, truncatedAt);
-                     }
- 
-                     if (hint.timestamp() < truncatedAt)
 -                    if (hint.maxTimestamp() <= SystemKeyspace.getTruncatedAt(cfId))
++                    if (hint.timestamp() <= SystemKeyspace.getTruncatedAt(cfId))
                      {
                          logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId);
 -                        rm = rm.without(cfId);
 +                        mutation = mutation.without(cfId);
                      }
                  }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index a4072d1,fe8f179..b19eb1e
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -41,10 -40,8 +41,10 @@@ import org.apache.cassandra.config.Sche
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
- import org.apache.cassandra.db.commitlog.ReplayPosition;
  import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
+ import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.composites.Composites;
  import org.apache.cassandra.db.filter.QueryFilter;
  import org.apache.cassandra.db.marshal.*;
  import org.apache.cassandra.dht.Range;
@@@ -90,12 -86,8 +90,14 @@@ public class SystemKeyspac
      private static final String LOCAL_KEY = "local";
      private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local");
  
 +    public static final List<String> allSchemaCfs = Arrays.asList(SCHEMA_KEYSPACES_CF,
 +                                                                  SCHEMA_COLUMNFAMILIES_CF,
 +                                                                  SCHEMA_COLUMNS_CF,
 +                                                                  SCHEMA_TRIGGERS_CF,
 +                                                                  SCHEMA_USER_TYPES_CF);
 +
+     private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
+ 
      public enum BootstrapState
      {
          NEEDS_BOOTSTRAP,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index a9d061a,dce7256..1b137ca
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -911,7 -900,10 +911,10 @@@ public class StorageProxy implements St
          return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
      }
  
-     public static void writeHintForMutation(Mutation mutation, int ttl, InetAddress target)
+     /**
+      * @param now current time in milliseconds - relevant for hint replay handling of truncated CFs
+      */
 -    public static void writeHintForMutation(RowMutation mutation, long now, int ttl, InetAddress target)
++    public static void writeHintForMutation(Mutation mutation, long now, int ttl, InetAddress target)
      {
          assert ttl > 0;
          UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 43f96fb,954c1f2..9982be9
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@@ -28,9 -31,8 +31,10 @@@ import org.apache.cassandra.Util
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.composites.CellNameType;
+ import org.apache.cassandra.db.commitlog.ReplayPosition;
  import org.apache.cassandra.locator.TokenMetadata;
 +import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.UUIDGen;
  
@@@ -106,4 -103,66 +110,72 @@@ public class BatchlogManagerTest extend
          UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
          assertEquals(500, result.one().getLong("count"));
      }
+ 
+     @Test
+     public void testTruncatedReplay() throws InterruptedException, ExecutionException
+     {
++        CellNameType comparator2 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2").metadata.comparator;
++        CellNameType comparator3 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard3").metadata.comparator;
+         // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
+         // Each batchlog entry contains one mutation for Standard2 and one for Standard3.
+         // In the middle of the process, 'truncate' Standard2.
+         for (int i = 0; i < 1000; i++)
+         {
 -            RowMutation mutation1 = new RowMutation("Keyspace1", bytes(i));
 -            mutation1.add("Standard2", bytes(i), bytes(i), 0);
 -            RowMutation mutation2 = new RowMutation("Keyspace1", bytes(i));
 -            mutation2.add("Standard3", bytes(i), bytes(i), 0);
 -            List<RowMutation> mutations = Lists.newArrayList(mutation1, mutation2);
++            Mutation mutation1 = new Mutation("Keyspace1", bytes(i));
++            mutation1.add("Standard2", comparator2.makeCellName(bytes(i)), bytes(i), 0);
++            Mutation mutation2 = new Mutation("Keyspace1", bytes(i));
++            mutation2.add("Standard3", comparator3.makeCellName(bytes(i)), bytes(i), 0);
++            List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
+ 
+             // Make sure it's ready to be replayed, so adjust the timestamp.
+             long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2;
+ 
+             if (i == 500)
+                 SystemKeyspace.saveTruncationRecord(Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2"),
+                                                     timestamp,
+                                                     ReplayPosition.NONE);
+ 
+             // Adjust the timestamp (slightly) to make the test deterministic.
+             if (i >= 500)
+                 timestamp++;
+             else
+                 timestamp--;
+ 
 -            BatchlogManager.getBatchlogMutationFor(mutations, UUIDGen.getTimeUUID(), timestamp * 1000).apply();
++            BatchlogManager.getBatchlogMutationFor(mutations,
++                                                   UUIDGen.getTimeUUID(),
++                                                   MessagingService.current_version,
++                                                   timestamp * 1000)
++                           .apply();
+         }
+ 
+         // Flush the batchlog to disk (see CASSANDRA-6822).
+         Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF).forceFlush();
+ 
+         // Force batchlog replay.
+         BatchlogManager.instance.replayAllFailedBatches();
+ 
+         // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
+         for (int i = 0; i < 1000; i++)
+         {
+             UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i));
+             if (i >= 500)
+             {
+                 assertEquals(bytes(i), result.one().getBytes("key"));
+                 assertEquals(bytes(i), result.one().getBytes("column1"));
+                 assertEquals(bytes(i), result.one().getBytes("value"));
+             }
+             else
+             {
+                 assertTrue(result.isEmpty());
+             }
+         }
+ 
+         for (int i = 0; i < 1000; i++)
+         {
+             UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i));
+             assertEquals(bytes(i), result.one().getBytes("key"));
+             assertEquals(bytes(i), result.one().getBytes("column1"));
+             assertEquals(bytes(i), result.one().getBytes("value"));
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index c3b9367,9ffd702..622c816
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@@ -28,8 -27,9 +27,10 @@@ import java.util.concurrent.TimeUnit
  
  import org.junit.Test;
  
+ import com.google.common.collect.Iterators;
+ 
  import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
@@@ -63,10 -61,14 +62,14 @@@ public class HintedHandOffTest extends 
          hintStore.disableAutoCompaction();
  
          // insert 1 hint
 -        RowMutation rm = new RowMutation(KEYSPACE4, ByteBufferUtil.bytes(1));
 -        rm.add(STANDARD1_CF, ByteBufferUtil.bytes(String.valueOf(COLUMN1)), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
 +        Mutation rm = new Mutation(KEYSPACE4, ByteBufferUtil.bytes(1));
 +        rm.add(STANDARD1_CF, Util.cellname(COLUMN1), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
  
-         HintedHandOffManager.instance.hintFor(rm, HintedHandOffManager.calculateHintTTL(rm), UUID.randomUUID()).apply();
+         HintedHandOffManager.instance.hintFor(rm,
+                                               System.currentTimeMillis(),
+                                               HintedHandOffManager.calculateHintTTL(rm),
+                                               UUID.randomUUID())
+                                      .apply();
  
          // flush data to disk
          hintStore.forceBlockingFlush();
@@@ -102,10 -104,14 +105,14 @@@
          hintStore.clearUnsafe();
  
          // insert 1 hint
 -        RowMutation rm = new RowMutation(KEYSPACE4, ByteBufferUtil.bytes(1));
 -        rm.add(STANDARD1_CF, ByteBufferUtil.bytes(String.valueOf(COLUMN1)), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
 +        Mutation rm = new Mutation(KEYSPACE4, ByteBufferUtil.bytes(1));
 +        rm.add(STANDARD1_CF, Util.cellname(COLUMN1), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
  
-         HintedHandOffManager.instance.hintFor(rm, HintedHandOffManager.calculateHintTTL(rm), UUID.randomUUID()).apply();
+         HintedHandOffManager.instance.hintFor(rm,
+                                               System.currentTimeMillis(),
+                                               HintedHandOffManager.calculateHintTTL(rm),
+                                               UUID.randomUUID())
+                                      .apply();
  
          assert getNoOfHints() == 1;