You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/06/24 18:51:56 UTC

cassandra git commit: Expand upgrade testing for commitlog changes

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 01115f72f -> a384faaa8


Expand upgrade testing for commitlog changes

Patch by blambov; reviewed by jmckenzie for CASSANDRA-9346


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a384faaa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a384faaa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a384faaa

Branch: refs/heads/cassandra-2.2
Commit: a384faaa8aa2c5f0f313011a30ef64e7e795ab1e
Parents: 01115f7
Author: Branimir Lambov <br...@datastax.com>
Authored: Wed Jun 24 12:47:59 2015 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Wed Jun 24 12:47:59 2015 -0400

----------------------------------------------------------------------
 .../db/commitlog/CommitLogReplayer.java         |   2 +-
 .../2.0/CommitLog-3-1431528750790.log           | Bin 0 -> 2097152 bytes
 .../2.0/CommitLog-3-1431528750791.log           | Bin 0 -> 2097152 bytes
 .../2.0/CommitLog-3-1431528750792.log           | Bin 0 -> 2097152 bytes
 .../2.0/CommitLog-3-1431528750793.log           | Bin 0 -> 2097152 bytes
 test/data/legacy-commitlog/2.0/hash.txt         |   3 +
 .../2.1/CommitLog-4-1431529069529.log           | Bin 0 -> 2097152 bytes
 .../2.1/CommitLog-4-1431529069530.log           | Bin 0 -> 2097152 bytes
 test/data/legacy-commitlog/2.1/hash.txt         |   3 +
 .../db/commitlog/CommitLogStressTest.java       | 217 +++++++++-------
 .../db/commitlog/CommitLogUpgradeTest.java      | 143 +++++++++++
 .../db/commitlog/CommitLogUpgradeTestMaker.java | 250 +++++++++++++++++++
 12 files changed, 527 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a59e70e..176f64b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -281,7 +281,7 @@ public class CommitLogReplayer
                     return;
                 if (globalPosition.segment == desc.id)
                     reader.seek(globalPosition.position);
-                replaySyncSection(reader, -1, desc);
+                replaySyncSection(reader, (int) reader.getPositionLimit(), desc);
                 return;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log
new file mode 100644
index 0000000..3301331
Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log
new file mode 100644
index 0000000..04314d6
Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log
new file mode 100644
index 0000000..a9af9e4
Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log
new file mode 100644
index 0000000..3301331
Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/hash.txt b/test/data/legacy-commitlog/2.0/hash.txt
new file mode 100644
index 0000000..4bbec02
--- /dev/null
+++ b/test/data/legacy-commitlog/2.0/hash.txt
@@ -0,0 +1,3 @@
+cfid = 4d331c44-f018-302b-91c2-2dcf94c4bfad
+cells = 9724
+hash = -682777064

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log
new file mode 100644
index 0000000..60064ee
Binary files /dev/null and b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log
new file mode 100644
index 0000000..fdf7071
Binary files /dev/null and b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.1/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.1/hash.txt b/test/data/legacy-commitlog/2.1/hash.txt
new file mode 100644
index 0000000..f05cf97
--- /dev/null
+++ b/test/data/legacy-commitlog/2.1/hash.txt
@@ -0,0 +1,3 @@
+cfid = 6c622920-f980-11e4-b8a0-e7d448d5e26d
+cells = 5165
+hash = -1915888171

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index f5fd2cf..5897dec 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -1,4 +1,5 @@
 package org.apache.cassandra.db.commitlog;
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,7 +21,6 @@ package org.apache.cassandra.db.commitlog;
  *
  */
 
-
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -60,54 +60,56 @@ import org.apache.cassandra.io.util.FastByteArrayInputStream;
 
 public class CommitLogStressTest
 {
-
     public static ByteBuffer dataSource;
-    
-    public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
 
+    public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
     public static int numCells = 1;
-
     public static int cellSize = 1024;
-    
     public static int rateLimit = 0;
-    
     public static int runTimeMs = 10000;
-    
+
     public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress";
-    
+
     public static int hash(int hash, ByteBuffer bytes)
     {
         int shift = 0;
-        for (int i=0; i<bytes.limit(); i++) {
+        for (int i = 0; i < bytes.limit(); i++)
+        {
             hash += (bytes.get(i) & 0xFF) << shift;
             shift = (shift + 8) & 0x1F;
         }
         return hash;
     }
-    
-    public static void main(String[] args) throws Exception {
-        try {
-            if (args.length >= 1) {
+
+    public static void main(String[] args) throws Exception
+    {
+        try
+        {
+            if (args.length >= 1)
+            {
                 NUM_THREADS = Integer.parseInt(args[0]);
                 System.out.println("Setting num threads to: " + NUM_THREADS);
             }
-    
-            if (args.length >= 2) {
+
+            if (args.length >= 2)
+            {
                 numCells = Integer.parseInt(args[1]);
                 System.out.println("Setting num cells to: " + numCells);
             }
-    
-            if (args.length >= 3) {
+
+            if (args.length >= 3)
+            {
                 cellSize = Integer.parseInt(args[1]);
                 System.out.println("Setting cell size to: " + cellSize + " be aware the source corpus may be small");
             }
-    
-            if (args.length >= 4) {
+
+            if (args.length >= 4)
+            {
                 rateLimit = Integer.parseInt(args[1]);
                 System.out.println("Setting per thread rate limit to: " + rateLimit);
             }
             initialize();
-            
+
             CommitLogStressTest tester = new CommitLogStressTest();
             tester.testFixedSize();
         }
@@ -115,24 +117,26 @@ public class CommitLogStressTest
         {
             e.printStackTrace(System.err);
         }
-        finally {
+        finally
+        {
             System.exit(0);
         }
     }
-    
+
     boolean failed = false;
     volatile boolean stop = false;
     boolean randomSize = false;
     boolean discardedRun = false;
     ReplayPosition discardedPos;
-    
+
     @BeforeClass
-    static public void initialize() throws FileNotFoundException, IOException, InterruptedException
+    static public void initialize() throws IOException
     {
         try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
         {
-            dataSource = ByteBuffer.allocateDirect((int)fis.getChannel().size());
-            while (dataSource.hasRemaining()) {
+            dataSource = ByteBuffer.allocateDirect((int) fis.getChannel().size());
+            while (dataSource.hasRemaining())
+            {
                 fis.getChannel().read(dataSource);
             }
             dataSource.flip();
@@ -141,7 +145,7 @@ public class CommitLogStressTest
         SchemaLoader.loadSchema();
         SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
     }
-    
+
     @Before
     public void cleanDir()
     {
@@ -149,11 +153,13 @@ public class CommitLogStressTest
         if (dir.isDirectory())
         {
             File[] files = dir.listFiles();
-    
+
             for (File f : files)
                 if (!f.delete())
                     Assert.fail("Failed to delete " + f);
-        } else {
+        }
+        else
+        {
             dir.mkdir();
         }
     }
@@ -194,7 +200,8 @@ public class CommitLogStressTest
                 null,
                 new ParameterizedClass("LZ4Compressor", null),
                 new ParameterizedClass("SnappyCompressor", null),
-                new ParameterizedClass("DeflateCompressor", null)}) {
+                new ParameterizedClass("DeflateCompressor", null) })
+        {
             DatabaseDescriptor.setCommitLogCompression(compressor);
             for (CommitLogSync sync : CommitLogSync.values())
             {
@@ -206,27 +213,29 @@ public class CommitLogStressTest
         assert !failed;
     }
 
-    public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
+    public void testLog(CommitLog commitLog) throws IOException, InterruptedException
+    {
         System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
-                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
-                           commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                           commitLog.executor.getClass().getSimpleName(),
-                           randomSize ? " random size" : "",
-                           discardedRun ? " with discarded run" : "");
+                          mb(DatabaseDescriptor.getCommitLogSegmentSize()),
+                          commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                          commitLog.executor.getClass().getSimpleName(),
+                          randomSize ? " random size" : "",
+                          discardedRun ? " with discarded run" : "");
         commitLog.allocator.enableReserveSegmentCreation();
-        
+
         final List<CommitlogExecutor> threads = new ArrayList<>();
         ScheduledExecutorService scheduled = startThreads(commitLog, threads);
 
         discardedPos = ReplayPosition.NONE;
-        if (discardedRun) {
+        if (discardedRun)
+        {
             // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations.
             Thread.sleep(runTimeMs / 3);
             stop = true;
             scheduled.shutdown();
             scheduled.awaitTermination(2, TimeUnit.SECONDS);
 
-            for (CommitlogExecutor t: threads)
+            for (CommitlogExecutor t : threads)
             {
                 t.join();
                 if (t.rp.compareTo(discardedPos) > 0)
@@ -234,15 +243,15 @@ public class CommitLogStressTest
             }
             verifySizes(commitLog);
 
-            commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos);
+            commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId,
+                                               discardedPos);
             threads.clear();
             System.out.format("Discarded at %s\n", discardedPos);
             verifySizes(commitLog);
-            
+
             scheduled = startThreads(commitLog, threads);
         }
 
-        
         Thread.sleep(runTimeMs);
         stop = true;
         scheduled.shutdown();
@@ -250,16 +259,18 @@ public class CommitLogStressTest
 
         int hash = 0;
         int cells = 0;
-        for (CommitlogExecutor t: threads) {
+        for (CommitlogExecutor t : threads)
+        {
             t.join();
             hash += t.hash;
             cells += t.cells;
         }
         verifySizes(commitLog);
-        
+
         commitLog.shutdownBlocking();
 
-        System.out.print("Stopped. Replaying... "); System.out.flush();
+        System.out.print("Stopped. Replaying... ");
+        System.out.flush();
         Replayer repl = new Replayer();
         File[] files = new File(location).listFiles();
         repl.recover(files);
@@ -267,12 +278,16 @@ public class CommitLogStressTest
         for (File f : files)
             if (!f.delete())
                 Assert.fail("Failed to delete " + f);
-        
+
         if (hash == repl.hash && cells == repl.cells)
             System.out.println("Test success.");
         else
         {
-            System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", repl.cells, cells, repl.hash, hash);
+            System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n",
+                              repl.cells,
+                              cells,
+                              repl.hash,
+                              hash);
             failed = true;
         }
     }
@@ -287,7 +302,7 @@ public class CommitLogStressTest
         commitLog.executor.requestExtraSync().awaitUninterruptibly();
         // Wait for any pending deletes or segment allocations to complete.
         commitLog.allocator.awaitManagementTasksCompletion();
-        
+
         long combinedSize = 0;
         for (File f : new File(commitLog.location).listFiles())
             combinedSize += f.length();
@@ -297,11 +312,11 @@ public class CommitLogStressTest
         Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios();
         Collection<CommitLogSegment> segments = commitLog.allocator.getActiveSegments();
 
-        for (CommitLogSegment segment: segments)
+        for (CommitLogSegment segment : segments)
         {
             Assert.assertTrue(logFileNames.remove(segment.getName()));
             Double ratio = ratios.remove(segment.getName());
-            
+
             Assert.assertEquals(segment.logFile.length(), segment.onDiskSize());
             Assert.assertEquals(segment.onDiskSize() * 1.0 / segment.contentSize(), ratio, 0.01);
         }
@@ -312,35 +327,47 @@ public class CommitLogStressTest
     public ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogExecutor> threads)
     {
         stop = false;
-        for (int ii = 0; ii < NUM_THREADS; ii++) {
+        for (int ii = 0; ii < NUM_THREADS; ii++)
+        {
             final CommitlogExecutor t = new CommitlogExecutor(commitLog, new Random(ii));
             threads.add(t);
             t.start();
         }
 
         final long start = System.currentTimeMillis();
-        Runnable printRunnable = new Runnable() {
+        Runnable printRunnable = new Runnable()
+        {
             long lastUpdate = 0;
 
-            public void run() {
-              Runtime runtime = Runtime.getRuntime();
-              long maxMemory = runtime.maxMemory();
-              long allocatedMemory = runtime.totalMemory();
-              long freeMemory = runtime.freeMemory();
-              long temp = 0;
-              long sz = 0;
-              for (CommitlogExecutor cle : threads) {
-                  temp += cle.counter.get();
-                  sz += cle.dataSize;
-              }
-              double time = (System.currentTimeMillis() - start) / 1000.0;
-              double avg = (temp / time);
-              System.out.println(
-                      String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb",
-                      ((System.currentTimeMillis() - start) / 1000),
-                      mb(maxMemory), mb(allocatedMemory), mb(freeMemory), (temp - lastUpdate), lastUpdate, avg,
-                      mb(commitLog.getActiveContentSize()), mb(commitLog.getActiveOnDiskSize()), mb(sz / time)));
-              lastUpdate = temp;
+            public void run()
+            {
+                Runtime runtime = Runtime.getRuntime();
+                long maxMemory = runtime.maxMemory();
+                long allocatedMemory = runtime.totalMemory();
+                long freeMemory = runtime.freeMemory();
+                long temp = 0;
+                long sz = 0;
+                for (CommitlogExecutor cle : threads)
+                {
+                    temp += cle.counter.get();
+                    sz += cle.dataSize;
+                }
+                double time = (System.currentTimeMillis() - start) / 1000.0;
+                double avg = (temp / time);
+                System.out
+                        .println(
+                        String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb",
+                                      ((System.currentTimeMillis() - start) / 1000),
+                                      mb(maxMemory),
+                                      mb(allocatedMemory),
+                                      mb(freeMemory),
+                                      (temp - lastUpdate),
+                                      lastUpdate,
+                                      avg,
+                                      mb(commitLog.getActiveContentSize()),
+                                      mb(commitLog.getActiveOnDiskSize()),
+                                      mb(sz / time)));
+                lastUpdate = temp;
             }
         };
         ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
@@ -348,15 +375,18 @@ public class CommitLogStressTest
         return scheduled;
     }
 
-    private static double mb(long maxMemory) {
+    private static double mb(long maxMemory)
+    {
         return maxMemory / (1024.0 * 1024);
     }
 
-    private static double mb(double maxMemory) {
+    private static double mb(double maxMemory)
+    {
         return maxMemory / (1024 * 1024);
     }
 
-    public static ByteBuffer randomBytes(int quantity, Random tlr) {
+    public static ByteBuffer randomBytes(int quantity, Random tlr)
+    {
         ByteBuffer slice = ByteBuffer.allocate(quantity);
         ByteBuffer source = dataSource.duplicate();
         source.position(tlr.nextInt(source.capacity() - quantity));
@@ -366,7 +396,8 @@ public class CommitLogStressTest
         return slice;
     }
 
-    public class CommitlogExecutor extends Thread {
+    public class CommitlogExecutor extends Thread
+    {
         final AtomicLong counter = new AtomicLong();
         int hash = 0;
         int cells = 0;
@@ -382,21 +413,23 @@ public class CommitLogStressTest
             this.random = rand;
         }
 
-        public void run() {
+        public void run()
+        {
             RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
             final Random rand = random != null ? random : ThreadLocalRandom.current();
-            while (!stop) {
+            while (!stop)
+            {
                 if (rl != null)
                     rl.acquire();
                 String ks = "Keyspace1";
                 ByteBuffer key = randomBytes(16, rand);
                 Mutation mutation = new Mutation(ks, key);
 
-                for (int ii = 0; ii < numCells; ii++) {
+                for (int ii = 0; ii < numCells; ii++)
+                {
                     int sz = randomSize ? rand.nextInt(cellSize) : cellSize;
                     ByteBuffer bytes = randomBytes(sz, rand);
-                    mutation.add("Standard1", Util.cellname("name" + ii), bytes,
-                            System.currentTimeMillis());
+                    mutation.add("Standard1", Util.cellname("name" + ii), bytes, System.currentTimeMillis());
                     hash = hash(hash, bytes);
                     ++cells;
                     dataSize += sz;
@@ -406,7 +439,7 @@ public class CommitLogStressTest
             }
         }
     }
-    
+
     class Replayer extends CommitLogReplayer
     {
         Replayer()
@@ -420,20 +453,22 @@ public class CommitLogStressTest
         @Override
         void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
         {
-            if (desc.id < discardedPos.segment) {
+            if (desc.id < discardedPos.segment)
+            {
                 System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
                 return;
-            } else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
+            }
+            else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
                 // Skip over this mutation.
                 return;
-                
+
             FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
             Mutation mutation;
             try
             {
                 mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
-                                                               desc.getMessagingVersion(),
-                                                               ColumnSerializer.Flag.LOCAL);
+                                                           desc.getMessagingVersion(),
+                                                           ColumnSerializer.Flag.LOCAL);
             }
             catch (IOException e)
             {
@@ -441,8 +476,10 @@ public class CommitLogStressTest
                 throw new AssertionError(e);
             }
 
-            for (ColumnFamily cf : mutation.getColumnFamilies()) {
-                for (Cell c : cf.getSortedColumns()) {
+            for (ColumnFamily cf : mutation.getColumnFamilies())
+            {
+                for (Cell c : cf.getSortedColumns())
+                {
                     if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name"))
                     {
                         hash = hash(hash, c.value());
@@ -451,6 +488,6 @@ public class CommitLogStressTest
                 }
             }
         }
-        
+
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
new file mode 100644
index 0000000..1655078
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
@@ -0,0 +1,143 @@
+package org.apache.cassandra.db.commitlog;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.UUID;
+
+import junit.framework.Assert;
+
+import com.google.common.base.Predicate;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.Mutation;
+
+public class CommitLogUpgradeTest
+{
+    static final String DATA_DIR = "test/data/legacy-commitlog/";
+    static final String PROPERTIES_FILE = "hash.txt";
+    static final String CFID_PROPERTY = "cfid";
+    static final String CELLS_PROPERTY = "cells";
+    static final String HASH_PROPERTY = "hash";
+
+    static final String TABLE = "Standard1";
+    static final String KEYSPACE = "Keyspace1";
+    static final String CELLNAME = "name";
+
+    @Test
+    public void test20() throws Exception
+    {
+        testRestore(DATA_DIR + "2.0");
+    }
+
+    @Test
+    public void test21() throws Exception
+    {
+        testRestore(DATA_DIR + "2.1");
+    }
+
+    @BeforeClass
+    static public void initialize() throws FileNotFoundException, IOException, InterruptedException
+    {
+        SchemaLoader.loadSchema();
+        SchemaLoader.schemaDefinition("");
+    }
+
+    public void testRestore(String location) throws IOException, InterruptedException
+    {
+        Properties prop = new Properties();
+        prop.load(new FileInputStream(new File(location + File.separatorChar + PROPERTIES_FILE)));
+        int hash = Integer.parseInt(prop.getProperty(HASH_PROPERTY));
+        int cells = Integer.parseInt(prop.getProperty(CELLS_PROPERTY));
+
+        String cfidString = prop.getProperty(CFID_PROPERTY);
+        if (cfidString != null)
+        {
+            UUID cfid = UUID.fromString(cfidString);
+            if (Schema.instance.getCF(cfid) == null)
+            {
+                CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+                Schema.instance.purge(cfm);
+                Schema.instance.load(cfm.copy(cfid));
+            }
+        }
+
+        Hasher hasher = new Hasher();
+        CommitLogTestReplayer replayer = new CommitLogTestReplayer(hasher);
+        File[] files = new File(location).listFiles(new FilenameFilter()
+        {
+            @Override
+            public boolean accept(File dir, String name)
+            {
+                return name.endsWith(".log");
+            }
+        });
+        replayer.recover(files);
+
+        Assert.assertEquals(cells, hasher.cells);
+        Assert.assertEquals(hash, hasher.hash);
+    }
+
+    public static int hash(int hash, ByteBuffer bytes)
+    {
+        int shift = 0;
+        for (int i = 0; i < bytes.limit(); i++)
+        {
+            hash += (bytes.get(i) & 0xFF) << shift;
+            shift = (shift + 8) & 0x1F;
+        }
+        return hash;
+    }
+
+    class Hasher implements Predicate<Mutation>
+    {
+        int hash = 0;
+        int cells = 0;
+
+        @Override
+        public boolean apply(Mutation mutation)
+        {
+            for (ColumnFamily cf : mutation.getColumnFamilies())
+            {
+                for (Cell c : cf.getSortedColumns())
+                {
+                    if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith(CELLNAME))
+                    {
+                        hash = hash(hash, c.value());
+                        ++cells;
+                    }
+                }
+            }
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
new file mode 100644
index 0000000..7b07c8e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@ -0,0 +1,250 @@
+package org.apache.cassandra.db.commitlog;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.db.commitlog.CommitLogUpgradeTest.*;
+
+public class CommitLogUpgradeTestMaker
+{
+    public static ByteBuffer dataSource;
+
+    private static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
+    public static int numCells = 1;
+    public static int cellSize = 256;
+    public static int rateLimit = 0;
+    public static int runTimeMs = 1000;
+
+    public static void main(String[] args) throws Exception
+    {
+        try
+        {
+            initialize();
+
+            CommitLogUpgradeTestMaker tester = new CommitLogUpgradeTestMaker();
+            tester.makeLog();
+        }
+        catch (Throwable e)
+        {
+            e.printStackTrace(System.err);
+        }
+        finally
+        {
+            System.exit(0);
+        }
+    }
+
+    volatile boolean stop = false;
+    boolean randomSize = true;
+
+    static public void initialize() throws IOException, ConfigurationException
+    {
+        try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
+        {
+            dataSource = ByteBuffer.allocateDirect((int) fis.getChannel().size());
+            while (dataSource.hasRemaining())
+            {
+                fis.getChannel().read(dataSource);
+            }
+            dataSource.flip();
+        }
+
+        SchemaLoader.loadSchema();
+        SchemaLoader.schemaDefinition("");
+    }
+
+    public void makeLog() throws IOException, InterruptedException
+    {
+        CommitLog commitLog = CommitLog.instance;
+        System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n",
+                          mb(DatabaseDescriptor.getCommitLogSegmentSize()),
+                          commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                          commitLog.executor.getClass().getSimpleName(),
+                          randomSize ? " random size" : "");
+        final List<CommitlogExecutor> threads = new ArrayList<>();
+        ScheduledExecutorService scheduled = startThreads(commitLog, threads);
+
+        Thread.sleep(runTimeMs);
+        stop = true;
+        scheduled.shutdown();
+        scheduled.awaitTermination(2, TimeUnit.SECONDS);
+
+        int hash = 0;
+        int cells = 0;
+        for (CommitlogExecutor t : threads)
+        {
+            t.join();
+            hash += t.hash;
+            cells += t.cells;
+        }
+        commitLog.shutdownBlocking();
+
+        File dataDir = new File(CommitLogUpgradeTest.DATA_DIR + FBUtilities.getReleaseVersionString());
+        System.out.format("Data will be stored in %s\n", dataDir);
+        if (dataDir.exists())
+            FileUtils.deleteRecursive(dataDir);
+
+        dataDir.mkdirs();
+        for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
+            FileUtils.createHardLink(f, new File(dataDir, f.getName()));
+
+        Properties prop = new Properties();
+        prop.setProperty(CFID_PROPERTY, Schema.instance.getId(KEYSPACE, TABLE).toString());
+        prop.setProperty(CELLS_PROPERTY, Integer.toString(cells));
+        prop.setProperty(HASH_PROPERTY, Integer.toString(hash));
+        prop.store(new FileOutputStream(new File(dataDir, PROPERTIES_FILE)),
+                   "CommitLog upgrade test, version " + FBUtilities.getReleaseVersionString());
+        System.out.println("Done");
+    }
+
+    public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads)
+    {
+        stop = false;
+        for (int ii = 0; ii < NUM_THREADS; ii++)
+        {
+            final CommitlogExecutor t = new CommitlogExecutor(commitLog);
+            threads.add(t);
+            t.start();
+        }
+
+        final long start = System.currentTimeMillis();
+        Runnable printRunnable = new Runnable()
+        {
+            long lastUpdate = 0;
+
+            public void run()
+            {
+                Runtime runtime = Runtime.getRuntime();
+                long maxMemory = mb(runtime.maxMemory());
+                long allocatedMemory = mb(runtime.totalMemory());
+                long freeMemory = mb(runtime.freeMemory());
+                long temp = 0;
+                long sz = 0;
+                for (CommitlogExecutor cle : threads)
+                {
+                    temp += cle.counter.get();
+                    sz += cle.dataSize;
+                }
+                double time = (System.currentTimeMillis() - start) / 1000.0;
+                double avg = (temp / time);
+                System.out.println(
+                        String.format("second %d mem max %dmb allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb",
+                                      ((System.currentTimeMillis() - start) / 1000),
+                                      maxMemory,
+                                      allocatedMemory,
+                                      freeMemory,
+                                      (temp - lastUpdate),
+                                      lastUpdate,
+                                      avg,
+                                      mb(sz / time)));
+                lastUpdate = temp;
+            }
+        };
+        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+        scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS);
+        return scheduled;
+    }
+
+    private static long mb(long maxMemory)
+    {
+        return maxMemory / (1024 * 1024);
+    }
+
+    private static double mb(double maxMemory)
+    {
+        return maxMemory / (1024 * 1024);
+    }
+
+    public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr)
+    {
+        ByteBuffer slice = ByteBuffer.allocate(quantity);
+        ByteBuffer source = dataSource.duplicate();
+        source.position(tlr.nextInt(source.capacity() - quantity));
+        source.limit(source.position() + quantity);
+        slice.put(source);
+        slice.flip();
+        return slice;
+    }
+
+    public class CommitlogExecutor extends Thread
+    {
+        final AtomicLong counter = new AtomicLong();
+        int hash = 0;
+        int cells = 0;
+        int dataSize = 0;
+        final CommitLog commitLog;
+
+        volatile ReplayPosition rp;
+
+        public CommitlogExecutor(CommitLog commitLog)
+        {
+            this.commitLog = commitLog;
+        }
+
+        public void run()
+        {
+            RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
+            final ThreadLocalRandom tlr = ThreadLocalRandom.current();
+            while (!stop)
+            {
+                if (rl != null)
+                    rl.acquire();
+                String ks = KEYSPACE;
+                ByteBuffer key = randomBytes(16, tlr);
+                Mutation mutation = new Mutation(ks, key);
+
+                for (int ii = 0; ii < numCells; ii++)
+                {
+                    int sz = randomSize ? tlr.nextInt(cellSize) : cellSize;
+                    ByteBuffer bytes = randomBytes(sz, tlr);
+                    mutation.add(TABLE, Util.cellname(CELLNAME + ii), bytes, System.currentTimeMillis());
+                    hash = hash(hash, bytes);
+                    ++cells;
+                    dataSize += sz;
+                }
+                rp = commitLog.add(mutation);
+                counter.incrementAndGet();
+            }
+        }
+    }
+}