Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/01 18:49:39 UTC

[1/2] cassandra git commit: Add CASSANDRA-9195 unit tests

Repository: cassandra
Updated Branches:
  refs/heads/trunk c799a98f0 -> 6404e015f


Add CASSANDRA-9195 unit tests

patch by Branimir Lambov


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f43efaa1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f43efaa1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f43efaa1

Branch: refs/heads/trunk
Commit: f43efaa1bdeec1057a8097fdb5ab62b4c9c19e67
Parents: 33c5913
Author: Branimir Lambov <br...@datastax.com>
Authored: Fri May 1 19:42:50 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri May 1 19:44:03 2015 +0300

----------------------------------------------------------------------
 .../db/commitlog/CommitLogArchiver.java         |   2 +-
 .../db/RecoveryManagerTruncateTest.java         | 179 +++++++++++++++----
 2 files changed, 142 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f43efaa1/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 602cf94..91f3179 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -56,7 +56,7 @@ public class CommitLogArchiver
     final String archiveCommand;
     final String restoreCommand;
     final String restoreDirectories;
-    public final long restorePointInTime;
+    public long restorePointInTime;
     public final TimeUnit precision;
 
     public CommitLogArchiver()
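
The only production change in this patch is dropping final from restorePointInTime, so a test can temporarily lower the restore target and put it back afterwards. A minimal sketch of the pattern the new tests rely on (mirroring RecoveryManagerTruncateTest below):

    // Temporarily restore the commit log to a point in time; always reset the
    // target in finally, or later replays would silently drop newer mutations.
    try
    {
        CommitLog.instance.archiver.restorePointInTime = time;
        CommitLog.instance.resetUnsafe();
        CommitLog.instance.recover();
    }
    finally
    {
        CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
    }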

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f43efaa1/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 1f7d388..817b8e9 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@ -19,15 +19,14 @@
 package org.apache.cassandra.db;
 
 import static org.apache.cassandra.Util.column;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
+import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.junit.Test;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -35,46 +34,150 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public class RecoveryManagerTruncateTest extends SchemaLoader
 {
-	@Test
-	public void testTruncate() throws IOException
-	{
-		Keyspace keyspace = Keyspace.open("Keyspace1");
-		ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+    @Test
+    public void testTruncate() throws IOException
+    {
+        Keyspace keyspace = Keyspace.open("Keyspace1");
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 
-		Mutation rm;
-		ColumnFamily cf;
+        Mutation rm;
+        ColumnFamily cf;
 
-		// add a single cell
+        // add a single cell
         cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
-		cf.addColumn(column("col1", "val1", 1L));
+        cf.addColumn(column("col1", "val1", 1L));
         rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
-		rm.apply();
+        rm.apply();
+        long time = System.currentTimeMillis();
 
-		// Make sure data was written
-		assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
+        // Make sure data was written
+        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
 
-		// and now truncate it
-		cfs.truncateBlocking();
+        // and now truncate it
+        cfs.truncateBlocking();
         CommitLog.instance.resetUnsafe();
-		CommitLog.instance.recover();
-
-		// and validate truncation.
-		assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
-	}
-
-	private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
-	{
-		ColumnFamily cf;
-		ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
-		if (cfStore == null)
-		{
-			return null;
-		}
-		cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
-		if (cf == null)
-		{
-			return null;
-		}
-		return cf.getColumn(Util.cellname(columnName));
-	}
+        CommitLog.instance.recover();
+
+        // and validate truncation.
+        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
+        assertTrue(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+    }
+
+    @Test
+    public void testTruncatePointInTime() throws IOException
+    {
+        Keyspace keyspace = Keyspace.open("Keyspace1");
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+
+        Mutation rm;
+        ColumnFamily cf;
+
+        // add a single cell
+        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
+        cf.addColumn(column("col2", "val1", 1L));
+        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
+        rm.apply();
+
+        // Make sure data was written
+        long time = System.currentTimeMillis();
+        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
+
+        // and now truncate it
+        cfs.truncateBlocking();
+
+        // verify truncation
+        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
+
+        try
+        {
+            // Restore to point in time.
+            CommitLog.instance.archiver.restorePointInTime = time;
+            CommitLog.instance.resetUnsafe();
+            CommitLog.instance.recover();
+        }
+        finally
+        {
+            CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+        }
+
+        // Validate pre-truncation data was restored.
+        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
+        // And that we don't have a truncation record after restore time.
+        assertFalse(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+    }
+
+    @Test
+    public void testTruncatePointInTimeReplayList() throws IOException
+    {
+        Keyspace keyspace = Keyspace.open("Keyspace1");
+        ColumnFamilyStore cfs1 = keyspace.getColumnFamilyStore("Standard1");
+        ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("Standard2");
+
+        Mutation rm;
+        ColumnFamily cf;
+
+        // add a single cell
+        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
+        cf.addColumn(column("col3", "val1", 1L));
+        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
+        rm.apply();
+
+        // add a single cell
+        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard2");
+        cf.addColumn(column("col4", "val1", 1L));
+        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
+        rm.apply();
+
+        // Make sure data was written
+        long time = System.currentTimeMillis();
+        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
+        assertNotNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
+
+        // and now truncate it
+        cfs1.truncateBlocking();
+        cfs2.truncateBlocking();
+
+        // verify truncation
+        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
+        assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
+
+        try
+        {
+            // Restore to point in time.
+            CommitLog.instance.archiver.restorePointInTime = time;
+            System.setProperty("cassandra.replayList", "Keyspace1.Standard1");
+            CommitLog.instance.resetUnsafe();
+            CommitLog.instance.recover();
+        }
+        finally
+        {
+            CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+            System.clearProperty("cassandra.replayList");
+        }
+
+        // Validate pre-truncation data was restored.
+        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
+        // But only on the replayed table.
+        assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
+
+        // And that we have the correct truncation records.
+        assertFalse(SystemKeyspace.getTruncatedAt(cfs1.metadata.cfId) > time);
+        assertTrue(SystemKeyspace.getTruncatedAt(cfs2.metadata.cfId) > time);
+    }
+
+    private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
+    {
+        ColumnFamily cf;
+        ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
+        if (cfStore == null)
+        {
+            return null;
+        }
+        cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
+        if (cf == null)
+        {
+            return null;
+        }
+        return cf.getColumn(Util.cellname(columnName));
+    }
 }
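
A note on testTruncatePointInTimeReplayList: the cassandra.replayList system property restricts commit log replay to the named tables, with entries of the form "<keyspace>.<table>". The test scopes the property in try/finally so it cannot leak into later tests; a hedged sketch (the comma-separated multi-entry form is an assumption based on the property name — the test itself only passes a single entry):

    // Hypothetical sketch: restrict replay to two tables (multi-entry,
    // comma-separated form assumed, not verified here).
    System.setProperty("cassandra.replayList", "Keyspace1.Standard1,Keyspace1.Standard2");
    try
    {
        CommitLog.instance.resetUnsafe();
        CommitLog.instance.recover();
    }
    finally
    {
        System.clearProperty("cassandra.replayList");
    }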


[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6404e015
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6404e015
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6404e015

Branch: refs/heads/trunk
Commit: 6404e015ffe916fe24bacb3772b388e633af1261
Parents: c799a98 f43efaa
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri May 1 19:50:06 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri May 1 19:50:06 2015 +0300

----------------------------------------------------------------------
 .../db/commitlog/CommitLogArchiver.java         |   2 +-
 .../db/commitlog/CommitLogReplayer.java         |  11 +-
 .../db/commitlog/CommitLogStressTest.java       |   3 +-
 .../db/RecoveryManagerTruncateTest.java         | 181 +++++++++++++++----
 4 files changed, 149 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index f6d1cc4,57f4b90..23ee9e3
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -271,17 -264,22 +271,17 @@@ public class CommitLogReplayer
  
      public void recover(File file) throws IOException
      {
 -        logger.info("Replaying {}", file.getPath());
          CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 -        final long segmentId = desc.id;
 -        logger.info("Replaying {} (CL version {}, messaging version {})",
 -                    file.getPath(),
 -                    desc.version,
 -                    desc.getMessagingVersion());
          RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
 -
          try
          {
 -            assert reader.length() <= Integer.MAX_VALUE;
 -            int offset = getStartOffset(segmentId, desc.version);
 -            if (offset < 0)
 +            if (desc.version < CommitLogDescriptor.VERSION_21)
              {
 -                logger.debug("skipping replay of fully-flushed {}", file);
 +                if (logAndCheckIfShouldSkip(file, desc))
 +                    return;
 +                if (globalPosition.segment == desc.id)
 +                    reader.seek(globalPosition.position);
-                 replaySyncSection(reader, -1, desc, replayFilter);
++                replaySyncSection(reader, -1, desc);
                  return;
              }
  
@@@ -365,208 -411,71 +365,207 @@@
                          continue;
                      }
  
-                 if (!replaySyncSection(sectionReader, replayEnd, desc, replayFilter))
 -                    if (logger.isDebugEnabled())
 -                        logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
++                if (!replaySyncSection(sectionReader, replayEnd, desc))
 +                    break;
 +            }
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(reader);
 +            logger.info("Finished reading {}", file);
 +        }
 +    }
  
 -                    final long entryLocation = reader.getFilePointer();
 -                    Runnable runnable = new WrappedRunnable()
 -                    {
 -                        public void runMayThrow() throws IOException
 -                        {
 -                            if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
 -                                return;
 -                            if (pointInTimeExceeded(mutation))
 -                                return;
 -
 -                            final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 -
 -                            // Rebuild the mutation, omitting column families that
 -                            //    a) the user has requested that we ignore,
 -                            //    b) have already been flushed,
 -                            // or c) are part of a cf that was dropped.
 -                            // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
 -                            Mutation newMutation = null;
 -                            for (ColumnFamily columnFamily : replayFilter.filter(mutation))
 -                            {
 -                                if (Schema.instance.getCF(columnFamily.id()) == null)
 -                                    continue; // dropped
 -
 -                                ReplayPosition rp = cfPositions.get(columnFamily.id());
 -
 -                                // replay if current segment is newer than last flushed one or,
 -                                // if it is the last known segment, if we are after the replay position
 -                                if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
 -                                {
 -                                    if (newMutation == null)
 -                                        newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
 -                                    newMutation.add(columnFamily);
 -                                    replayedCount.incrementAndGet();
 -                                }
 -                            }
 -                            if (newMutation != null)
 -                            {
 -                                assert !newMutation.isEmpty();
 -                                Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
 -                                keyspacesRecovered.add(keyspace);
 -                            }
 -                        }
 -                    };
 -                    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
 -                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
 -                    {
 -                        FBUtilities.waitOnFutures(futures);
 -                        futures.clear();
 -                    }
 +    public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
 +    {
 +        logger.info("Replaying {} (CL version {}, messaging version {}, compression {})",
 +                    file.getPath(),
 +                    desc.version,
 +                    desc.getMessagingVersion(),
 +                    desc.compression);
 +
 +        if (globalPosition.segment > desc.id)
 +        {
 +            logger.debug("skipping replay of fully-flushed {}", file);
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Replays a sync section containing a list of mutations.
 +     *
 +     * @return Whether replay should continue with the next section.
 +     */
-     private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc,
-             final ReplayFilter replayFilter) throws IOException, FileNotFoundException
++    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException, FileNotFoundException
 +    {
 +         /* read the logs, populate Mutation and apply */
 +        while (reader.getFilePointer() < end && !reader.isEOF())
 +        {
 +            if (logger.isDebugEnabled())
 +                logger.debug("Reading mutation at {}", reader.getFilePointer());
 +
 +            long claimedCRC32;
 +            int serializedSize;
 +            try
 +            {
 +                // any of the reads may hit EOF
 +                serializedSize = reader.readInt();
 +                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
 +                {
 +                    logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
 +                    return false;
                  }
  
 +                // Mutation must be at LEAST 10 bytes:
 +                // 3 each for a non-empty Keyspace and Key (including the
 +                // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
 +                // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
 +                if (serializedSize < 10)
 +                    return false;
 +
 +                long claimedSizeChecksum;
                  if (desc.version < CommitLogDescriptor.VERSION_21)
 -                    break;
 +                    claimedSizeChecksum = reader.readLong();
 +                else
 +                    claimedSizeChecksum = reader.readInt() & 0xffffffffL;
 +                checksum.reset();
 +                if (desc.version < CommitLogDescriptor.VERSION_20)
 +                    checksum.update(serializedSize);
 +                else
 +                    checksum.updateInt(serializedSize);
 +
 +                if (checksum.getValue() != claimedSizeChecksum)
 +                    return false;
 +                // ok.
  
 -                offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
 -                prevEnd = end;
 +                if (serializedSize > buffer.length)
 +                    buffer = new byte[(int) (1.2 * serializedSize)];
 +                reader.readFully(buffer, 0, serializedSize);
 +                if (desc.version < CommitLogDescriptor.VERSION_21)
 +                    claimedCRC32 = reader.readLong();
 +                else
 +                    claimedCRC32 = reader.readInt() & 0xffffffffL;
 +            }
 +            catch (EOFException eof)
 +            {
 +                return false; // last CL entry didn't get completely written. that's ok.
              }
 +
 +            checksum.update(buffer, 0, serializedSize);
 +            if (claimedCRC32 != checksum.getValue())
 +            {
 +                // this entry must not have been fsynced. probably the rest is bad too,
 +                // but just in case there is no harm in trying them (since we still read on an entry boundary)
 +                continue;
 +            }
-             replayMutation(buffer, serializedSize, reader.getFilePointer(), desc, replayFilter);
++            replayMutation(buffer, serializedSize, reader.getFilePointer(), desc);
          }
 -        finally
 +        return true;
 +    }
 +
 +    /**
 +     * Deserializes and replays a commit log entry.
 +     */
 +    void replayMutation(byte[] inputBuffer, int size,
-             final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter) throws IOException,
++            final long entryLocation, final CommitLogDescriptor desc) throws IOException,
 +            FileNotFoundException
 +    {
 +        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
 +        final Mutation mutation;
 +        try
          {
 -            FileUtils.closeQuietly(reader);
 -            logger.info("Finished reading {}", file);
 +            mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
 +                                                       desc.getMessagingVersion(),
 +                                                       ColumnSerializer.Flag.LOCAL);
 +            // doublecheck that what we read is [still] valid for the current schema
 +            for (ColumnFamily cf : mutation.getColumnFamilies())
 +                for (Cell cell : cf)
 +                    cf.getComparator().validate(cell.name());
 +        }
 +        catch (UnknownColumnFamilyException ex)
 +        {
 +            if (ex.cfId == null)
 +                return;
 +            AtomicInteger i = invalidMutations.get(ex.cfId);
 +            if (i == null)
 +            {
 +                i = new AtomicInteger(1);
 +                invalidMutations.put(ex.cfId, i);
 +            }
 +            else
 +                i.incrementAndGet();
 +            return;
 +        }
 +        catch (Throwable t)
 +        {
 +            JVMStabilityInspector.inspectThrowable(t);
 +            File f = File.createTempFile("mutation", "dat");
 +            DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
 +            try
 +            {
 +                out.write(inputBuffer, 0, size);
 +            }
 +            finally
 +            {
 +                out.close();
 +            }
 +            String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
 +                                      f.getAbsolutePath());
 +            logger.error(st, t);
 +            return;
 +        }
 +
 +        if (logger.isDebugEnabled())
 +            logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
 +
 +        Runnable runnable = new WrappedRunnable()
 +        {
 +            public void runMayThrow() throws IOException
 +            {
 +                if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
 +                    return;
 +                if (pointInTimeExceeded(mutation))
 +                    return;
 +
 +                final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 +
 +                // Rebuild the mutation, omitting column families that
 +                //    a) the user has requested that we ignore,
 +                //    b) have already been flushed,
 +                // or c) are part of a cf that was dropped.
 +                // Keep in mind that the cf.name() is suspect. Do everything based on the cfid instead.
 +                Mutation newMutation = null;
 +                for (ColumnFamily columnFamily : replayFilter.filter(mutation))
 +                {
 +                    if (Schema.instance.getCF(columnFamily.id()) == null)
 +                        continue; // dropped
 +
 +                    ReplayPosition rp = cfPositions.get(columnFamily.id());
 +
 +                    // replay if current segment is newer than last flushed one or,
 +                    // if it is the last known segment, if we are after the replay position
 +                    if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
 +                    {
 +                        if (newMutation == null)
 +                            newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
 +                        newMutation.add(columnFamily);
 +                        replayedCount.incrementAndGet();
 +                    }
 +                }
 +                if (newMutation != null)
 +                {
 +                    assert !newMutation.isEmpty();
 +                    Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
 +                    keyspacesRecovered.add(keyspace);
 +                }
 +            }
 +        };
 +        futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
 +        if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
 +        {
 +            FBUtilities.waitOnFutures(futures);
 +            futures.clear();
          }
      }
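
The trunk side of this merge splits CommitLogReplayer.recover() into logAndCheckIfShouldSkip(), replaySyncSection() and replayMutation(), holding the ReplayFilter as a field rather than threading it through every signature. One payoff is that tests can override the package-private replayMutation() hook; a minimal sketch (constructor arguments assumed from CommitLogStressTest's Replayer below):

    // Hypothetical verification hook: count entries instead of applying them.
    class CountingReplayer extends CommitLogReplayer
    {
        int seen = 0;

        CountingReplayer()
        {
            super(ReplayPosition.NONE, null, ReplayFilter.create());
        }

        @Override
        void replayMutation(byte[] buf, int size, long entryLocation, CommitLogDescriptor desc)
        {
            ++seen; // inspect buf/desc here rather than deserializing and applying
        }
    }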
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 644e2c2,0000000..a8cf8fd
mode 100644,000000..100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -1,412 -1,0 +1,411 @@@
 +package org.apache.cassandra.db.commitlog;
 +/*
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + *
 + */
 +
 +
 +import java.io.DataInputStream;
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import junit.framework.Assert;
 +
 +import com.google.common.util.concurrent.Futures;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.Config.CommitLogSync;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.ParameterizedClass;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.Cell;
 +import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.db.ColumnSerializer;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.io.util.FastByteArrayInputStream;
 +
 +public class CommitLogStressTest
 +{
 +
 +    public static ByteBuffer dataSource;
 +    
 +    public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
 +
 +    public static int numCells = 1;
 +
 +    public static int cellSize = 1024;
 +    
 +    public static int rateLimit = 0;
 +    
 +    public static int runTimeMs = 10000;
 +    
 +    public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress";
 +    
 +    public static int hash(int hash, ByteBuffer bytes)
 +    {
 +        int shift = 0;
 +        for (int i=0; i<bytes.limit(); i++) {
 +            hash += (bytes.get(i) & 0xFF) << shift;
 +            shift = (shift + 8) & 0x1F;
 +        }
 +        return hash;
 +    }
 +    
 +    public static void main(String[] args) throws Exception {
 +        try {
 +            if (args.length >= 1) {
 +                NUM_THREADS = Integer.parseInt(args[0]);
 +                System.out.println("Setting num threads to: " + NUM_THREADS);
 +            }
 +    
 +            if (args.length >= 2) {
 +                numCells = Integer.parseInt(args[1]);
 +                System.out.println("Setting num cells to: " + numCells);
 +            }
 +    
 +            if (args.length >= 3) {
 +                cellSize = Integer.parseInt(args[2]);
 +                System.out.println("Setting cell size to: " + cellSize + "; be aware the source corpus may be small");
 +            }
 +    
 +            if (args.length >= 4) {
 +                rateLimit = Integer.parseInt(args[3]);
 +                System.out.println("Setting per thread rate limit to: " + rateLimit);
 +            }
 +            initialize();
 +            
 +            CommitLogStressTest tester = new CommitLogStressTest();
 +            tester.testFixedSize();
 +        }
 +        catch (Exception e)
 +        {
 +            e.printStackTrace(System.err);
 +        }
 +        finally {
 +            System.exit(0);
 +        }
 +    }
 +    
 +    boolean failed = false;
 +    volatile boolean stop = false;
 +    boolean randomSize = false;
 +    boolean discardedRun = false;
 +    ReplayPosition discardedPos;
 +    
 +    @BeforeClass
 +    static public void initialize() throws FileNotFoundException, IOException, InterruptedException
 +    {
 +        try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
 +        {
 +            dataSource = ByteBuffer.allocateDirect((int)fis.getChannel().size());
 +            while (dataSource.hasRemaining()) {
 +                fis.getChannel().read(dataSource);
 +            }
 +            dataSource.flip();
 +        }
 +
 +        SchemaLoader.loadSchema();
 +        SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
 +
 +        File dir = new File(location);
 +        if (dir.isDirectory())
 +        {
 +            File[] files = dir.listFiles();
 +    
 +            for (File f : files)
 +                if (!f.delete())
 +                    Assert.fail("Failed to delete " + f);
 +        } else {
 +            dir.mkdir();
 +        }
 +    }
 +
 +    @Test
 +    public void testRandomSize() throws Exception
 +    {
 +        randomSize = true;
 +        discardedRun = false;
 +        testAllLogConfigs();
 +    }
 +
 +    @Test
 +    public void testFixedSize() throws Exception
 +    {
 +        randomSize = false;
 +        discardedRun = false;
 +
 +        testAllLogConfigs();
 +    }
 +
 +    @Test
 +    public void testDiscardedRun() throws Exception
 +    {
 +        discardedRun = true;
 +        randomSize = true;
 +
 +        testAllLogConfigs();
 +    }
 +
 +    public void testAllLogConfigs() throws IOException, InterruptedException
 +    {
 +        failed = false;
 +        DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
 +        DatabaseDescriptor.setCommitLogSyncPeriod(30);
 +        DatabaseDescriptor.setCommitLogSegmentSize(32);
 +        for (ParameterizedClass compressor : new ParameterizedClass[] {
 +                null,
 +                new ParameterizedClass("LZ4Compressor", null),
 +                new ParameterizedClass("SnappyCompressor", null),
 +                new ParameterizedClass("DeflateCompressor", null)}) {
 +            DatabaseDescriptor.setCommitLogCompression(compressor);
 +            for (CommitLogSync sync : CommitLogSync.values())
 +            {
 +                DatabaseDescriptor.setCommitLogSync(sync);
 +                CommitLog commitLog = new CommitLog(location, CommitLog.instance.archiver);
 +                testLog(commitLog);
 +            }
 +        }
 +        assert !failed;
 +    }
 +
 +    public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
 +        System.out.format("\nTesting commit log size %dmb, compressor %s, sync %s%s%s\n",
 +                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
 +                           commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
 +                           commitLog.executor.getClass().getSimpleName(),
 +                           randomSize ? " random size" : "",
 +                           discardedRun ? " with discarded run" : "");
 +        commitLog.allocator.enableReserveSegmentCreation();
 +        
 +        final List<CommitlogExecutor> threads = new ArrayList<>();
 +        ScheduledExecutorService scheduled = startThreads(commitLog, threads);
 +
 +        discardedPos = ReplayPosition.NONE;
 +        if (discardedRun) {
 +            // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations.
 +            Thread.sleep(runTimeMs / 3);
 +            stop = true;
 +            scheduled.shutdown();
 +            scheduled.awaitTermination(2, TimeUnit.SECONDS);
 +
 +            for (CommitlogExecutor t: threads)
 +            {
 +                t.join();
 +                CommitLog.instance.discardCompletedSegments( Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, t.rp);
 +                if (t.rp.compareTo(discardedPos) > 0)
 +                    discardedPos = t.rp;
 +            }
 +            threads.clear();
 +            System.out.format("Discarded at %s\n", discardedPos);
 +
 +            scheduled = startThreads(commitLog, threads);
 +        }
 +
 +        
 +        Thread.sleep(runTimeMs);
 +        stop = true;
 +        scheduled.shutdown();
 +        scheduled.awaitTermination(2, TimeUnit.SECONDS);
 +
 +        int hash = 0;
 +        int cells = 0;
 +        for (CommitlogExecutor t: threads) {
 +            t.join();
 +            hash += t.hash;
 +            cells += t.cells;
 +        }
 +        
 +        commitLog.shutdownBlocking();
 +
 +        System.out.print("Stopped. Replaying... "); System.out.flush();
 +        Replayer repl = new Replayer();
 +        File[] files = new File(location).listFiles();
 +        repl.recover(files);
 +
 +        for (File f : files)
 +            if (!f.delete())
 +                Assert.fail("Failed to delete " + f);
 +        
 +        if (hash == repl.hash && cells == repl.cells)
 +            System.out.println("Test success.");
 +        else
 +        {
 +            System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", repl.cells, cells, repl.hash, hash);
 +            failed = true;
 +        }
 +    }
 +
 +    public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads)
 +    {
 +        stop = false;
 +        for (int ii = 0; ii < NUM_THREADS; ii++) {
 +            final CommitlogExecutor t = new CommitlogExecutor(commitLog);
 +            threads.add(t);
 +            t.start();
 +        }
 +
 +        final long start = System.currentTimeMillis();
 +        Runnable printRunnable = new Runnable() {
 +            long lastUpdate = 0;
 +
 +            public void run() {
 +              Runtime runtime = Runtime.getRuntime();
 +              long maxMemory = mb(runtime.maxMemory());
 +              long allocatedMemory = mb(runtime.totalMemory());
 +              long freeMemory = mb(runtime.freeMemory());
 +              long temp = 0;
 +              long sz = 0;
 +              for (CommitlogExecutor cle : threads) {
 +                  temp += cle.counter.get();
 +                  sz += cle.dataSize;
 +              }
 +              double time = (System.currentTimeMillis() - start) / 1000.0;
 +              double avg = (temp / time);
 +              System.out.println(String.format("second %d mem max %dmb allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb",
 +                      ((System.currentTimeMillis() - start) / 1000),
 +                      maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate, avg, mb(sz / time)));
 +              lastUpdate = temp;
 +            }
 +        };
 +        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
 +        scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS);
 +        return scheduled;
 +    }
 +
 +    private static long mb(long maxMemory) {
 +        return maxMemory / (1024 * 1024);
 +    }
 +
 +    private static double mb(double maxMemory) {
 +        return maxMemory / (1024 * 1024);
 +    }
 +
 +    public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr) {
 +        ByteBuffer slice = ByteBuffer.allocate(quantity);
 +        ByteBuffer source = dataSource.duplicate();
 +        source.position(tlr.nextInt(source.capacity() - quantity));
 +        source.limit(source.position() + quantity);
 +        slice.put(source);
 +        slice.flip();
 +        return slice;
 +    }
 +
 +    public class CommitlogExecutor extends Thread {
 +        final AtomicLong counter = new AtomicLong();
 +        int hash = 0;
 +        int cells = 0;
 +        int dataSize = 0;
 +        final CommitLog commitLog;
 +
 +        volatile ReplayPosition rp;
 +
 +        public CommitlogExecutor(CommitLog commitLog)
 +        {
 +            this.commitLog = commitLog;
 +        }
 +
 +        public void run() {
 +            RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
 +            final ThreadLocalRandom tlr = ThreadLocalRandom.current();
 +            while (!stop) {
 +                if (rl != null)
 +                    rl.acquire();
 +                String ks = "Keyspace1";
 +                ByteBuffer key = randomBytes(16, tlr);
 +                Mutation mutation = new Mutation(ks, key);
 +
 +                for (int ii = 0; ii < numCells; ii++) {
 +                    int sz = randomSize ? tlr.nextInt(cellSize) : cellSize;
 +                    ByteBuffer bytes = randomBytes(sz, tlr);
 +                    mutation.add("Standard1", Util.cellname("name" + ii), bytes,
 +                            System.currentTimeMillis());
 +                    hash = hash(hash, bytes);
 +                    ++cells;
 +                    dataSize += sz;
 +                }
 +                rp = commitLog.add(mutation);
 +                counter.incrementAndGet();
 +            }
 +        }
 +    }
 +    
 +    class Replayer extends CommitLogReplayer
 +    {
 +        Replayer()
 +        {
 +            super(discardedPos, null, ReplayFilter.create());
 +        }
 +
 +        int hash = 0;
 +        int cells = 0;
 +
 +        @Override
-         void replayMutation(byte[] inputBuffer, int size,
-                 final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter)
++        void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
 +        {
 +            if (desc.id < discardedPos.segment) {
 +                System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
 +                return;
 +            } else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
 +                // Skip over this mutation.
 +                return;
 +                
 +            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
 +            Mutation mutation;
 +            try
 +            {
 +                mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
 +                                                               desc.getMessagingVersion(),
 +                                                               ColumnSerializer.Flag.LOCAL);
 +            }
 +            catch (IOException e)
 +            {
 +                // Test fails.
 +                throw new AssertionError(e);
 +            }
 +
 +            for (ColumnFamily cf : mutation.getColumnFamilies()) {
 +                for (Cell c : cf.getSortedColumns()) {
 +                    if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name"))
 +                    {
 +                        hash = hash(hash, c.value());
 +                        ++cells;
 +                    }
 +                }
 +            }
 +        }
 +        
 +    }
 +}
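
One detail worth calling out in the stress test: hash() restarts its shift at zero on every call and only adds shifted byte values into the accumulator, so hash(h, bytes) == h + hash(0, bytes). Combined hashes are therefore additive and order-independent, which is what lets testLog() simply sum per-thread hashes and compare the total against a single replay pass, whatever order the mutations come back in. A self-contained illustration (hash() copied from the test above; run with -ea to enable the asserts):

    import java.nio.ByteBuffer;

    public class HashAdditivityDemo
    {
        // Same accumulator as CommitLogStressTest.hash(): the shift restarts
        // at 0 for each buffer, so each call just adds a buffer-local sum.
        static int hash(int hash, ByteBuffer bytes)
        {
            int shift = 0;
            for (int i = 0; i < bytes.limit(); i++)
            {
                hash += (bytes.get(i) & 0xFF) << shift;
                shift = (shift + 8) & 0x1F;
            }
            return hash;
        }

        public static void main(String[] args)
        {
            ByteBuffer a = ByteBuffer.wrap("commit".getBytes());
            ByteBuffer b = ByteBuffer.wrap("log".getBytes());
            // Chaining equals summing, in either order (int overflow wraps).
            assert hash(hash(0, a), b) == hash(0, a) + hash(0, b);
            assert hash(hash(0, a), b) == hash(hash(0, b), a);
            System.out.println("combined hash: " + hash(hash(0, a), b));
        }
    }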

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index cef5914,817b8e9..a004105
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -19,78 -19,165 +19,181 @@@
  package org.apache.cassandra.db;
  
  import static org.apache.cassandra.Util.column;
- import static org.junit.Assert.assertNotNull;
- import static org.junit.Assert.assertNull;
+ import static org.junit.Assert.*;
  
  import java.io.IOException;
++import org.junit.BeforeClass;
+ import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.locator.SimpleStrategy;
- import org.junit.BeforeClass;
- import org.junit.Test;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
  /**
   * Test for the truncate operation.
   */
 -public class RecoveryManagerTruncateTest extends SchemaLoader
 +public class RecoveryManagerTruncateTest
  {
 +    private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
 +    private static final String CF_STANDARD1 = "Standard1";
++    private static final String CF_STANDARD2 = "Standard2";
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(1),
-                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
++                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
++                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
 +    }
 +
- 	@Test
- 	public void testTruncate() throws IOException
- 	{
- 		Keyspace keyspace = Keyspace.open(KEYSPACE1);
- 		ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+     @Test
+     public void testTruncate() throws IOException
+     {
 -        Keyspace keyspace = Keyspace.open("Keyspace1");
 -        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
++        Keyspace keyspace = Keyspace.open(KEYSPACE1);
++        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
  
- 		Mutation rm;
- 		ColumnFamily cf;
+         Mutation rm;
+         ColumnFamily cf;
  
- 		// add a single cell
-         cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
- 		cf.addColumn(column("col1", "val1", 1L));
+         // add a single cell
 -        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
++        cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD1);
+         cf.addColumn(column("col1", "val1", 1L));
 -        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
 -        rm.apply();
 +        rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
- 		rm.applyUnsafe();
++        rm.applyUnsafe();
+         long time = System.currentTimeMillis();
  
- 		// Make sure data was written
- 		assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
+         // Make sure data was written
 -        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
++        assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col1"));
  
- 		// and now truncate it
- 		cfs.truncateBlocking();
+         // and now truncate it
+         cfs.truncateBlocking();
 -        CommitLog.instance.resetUnsafe();
 -        CommitLog.instance.recover();
 +        CommitLog.instance.resetUnsafe(false);
  
- 		// and validate truncation.
- 		assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
- 	}
- 
- 	private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
- 	{
- 		ColumnFamily cf;
- 		ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
- 		if (cfStore == null)
- 		{
- 			return null;
- 		}
- 		cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
- 		if (cf == null)
- 		{
- 			return null;
- 		}
- 		return cf.getColumn(Util.cellname(columnName));
- 	}
+         // and validate truncation.
 -        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
++        assertNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col1"));
+         assertTrue(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+     }
+ 
+     @Test
+     public void testTruncatePointInTime() throws IOException
+     {
 -        Keyspace keyspace = Keyspace.open("Keyspace1");
 -        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
++        Keyspace keyspace = Keyspace.open(KEYSPACE1);
++        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ 
+         Mutation rm;
+         ColumnFamily cf;
+ 
+         // add a single cell
 -        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
++        cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD1);
+         cf.addColumn(column("col2", "val1", 1L));
 -        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
++        rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
+         rm.apply();
+ 
+         // Make sure data was written
+         long time = System.currentTimeMillis();
 -        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
++        assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col2"));
+ 
+         // and now truncate it
+         cfs.truncateBlocking();
+ 
+         // verify truncation
 -        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
++        assertNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col2"));
+ 
+         try
+         {
+             // Restore to point in time.
+             CommitLog.instance.archiver.restorePointInTime = time;
 -            CommitLog.instance.resetUnsafe();
 -            CommitLog.instance.recover();
++            CommitLog.instance.resetUnsafe(false);
+         }
+         finally
+         {
+             CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+         }
+ 
+         // Validate pre-truncation data was restored.
 -        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
++        assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col2"));
+         // And that we don't have a truncation record after restore time.
+         assertFalse(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+     }
+ 
+     @Test
+     public void testTruncatePointInTimeReplayList() throws IOException
+     {
 -        Keyspace keyspace = Keyspace.open("Keyspace1");
 -        ColumnFamilyStore cfs1 = keyspace.getColumnFamilyStore("Standard1");
 -        ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("Standard2");
++        Keyspace keyspace = Keyspace.open(KEYSPACE1);
++        ColumnFamilyStore cfs1 = keyspace.getColumnFamilyStore(CF_STANDARD1);
++        ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore(CF_STANDARD2);
+ 
+         Mutation rm;
+         ColumnFamily cf;
+ 
+         // add a single cell
 -        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
++        cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD1);
+         cf.addColumn(column("col3", "val1", 1L));
 -        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
++        rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
+         rm.apply();
+ 
+         // add a single cell
 -        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard2");
++        cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD2);
+         cf.addColumn(column("col4", "val1", 1L));
 -        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
++        rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
+         rm.apply();
+ 
+         // Make sure data was written
+         long time = System.currentTimeMillis();
 -        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
 -        assertNotNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
++        assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col3"));
++        assertNotNull(getFromTable(keyspace, CF_STANDARD2, "keymulti", "col4"));
+ 
+         // and now truncate it
+         cfs1.truncateBlocking();
+         cfs2.truncateBlocking();
+ 
+         // verify truncation
 -        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
 -        assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
++        assertNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col3"));
++        assertNull(getFromTable(keyspace, CF_STANDARD2, "keymulti", "col4"));
+ 
+         try
+         {
+             // Restore to point in time.
+             CommitLog.instance.archiver.restorePointInTime = time;
 -            System.setProperty("cassandra.replayList", "Keyspace1.Standard1");
 -            CommitLog.instance.resetUnsafe();
 -            CommitLog.instance.recover();
++            System.setProperty("cassandra.replayList", KEYSPACE1 + "." + CF_STANDARD1);
++            CommitLog.instance.resetUnsafe(false);
+         }
+         finally
+         {
+             CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+             System.clearProperty("cassandra.replayList");
+         }
+ 
+         // Validate pre-truncation data was restored.
 -        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
++        assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col3"));
+         // But only on the replayed table.
 -        assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
++        assertNull(getFromTable(keyspace, CF_STANDARD2, "keymulti", "col4"));
+ 
+         // And that we have the correct truncation records.
+         assertFalse(SystemKeyspace.getTruncatedAt(cfs1.metadata.cfId) > time);
+         assertTrue(SystemKeyspace.getTruncatedAt(cfs2.metadata.cfId) > time);
+     }
+ 
+     private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
+     {
+         ColumnFamily cf;
+         ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
+         if (cfStore == null)
+         {
+             return null;
+         }
+         cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
+         if (cf == null)
+         {
+             return null;
+         }
+         return cf.getColumn(Util.cellname(columnName));
+     }
  }
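
On trunk, the CommitLog.instance.resetUnsafe()/recover() pair used on cassandra-2.1 becomes a single resetUnsafe(boolean) call. Judging by these call sites, the flag appears to control whether existing segments are discarded before the restart (false here, so the reset replays them) — an inference from this diff, not a verified API description. A sketch of the trunk-side restore pattern:

    try
    {
        CommitLog.instance.archiver.restorePointInTime = time;
        CommitLog.instance.resetUnsafe(false); // keep segments and replay them
    }
    finally
    {
        CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
    }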