You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/06/24 18:51:56 UTC
cassandra git commit: Expand upgrade testing for commitlog changes
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 01115f72f -> a384faaa8
Expand upgrade testing for commitlog changes
Patch by blambov; reviewed by jmckenzie for CASSANDRA-9346
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a384faaa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a384faaa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a384faaa
Branch: refs/heads/cassandra-2.2
Commit: a384faaa8aa2c5f0f313011a30ef64e7e795ab1e
Parents: 01115f7
Author: Branimir Lambov <br...@datastax.com>
Authored: Wed Jun 24 12:47:59 2015 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Wed Jun 24 12:47:59 2015 -0400
----------------------------------------------------------------------
.../db/commitlog/CommitLogReplayer.java | 2 +-
.../2.0/CommitLog-3-1431528750790.log | Bin 0 -> 2097152 bytes
.../2.0/CommitLog-3-1431528750791.log | Bin 0 -> 2097152 bytes
.../2.0/CommitLog-3-1431528750792.log | Bin 0 -> 2097152 bytes
.../2.0/CommitLog-3-1431528750793.log | Bin 0 -> 2097152 bytes
test/data/legacy-commitlog/2.0/hash.txt | 3 +
.../2.1/CommitLog-4-1431529069529.log | Bin 0 -> 2097152 bytes
.../2.1/CommitLog-4-1431529069530.log | Bin 0 -> 2097152 bytes
test/data/legacy-commitlog/2.1/hash.txt | 3 +
.../db/commitlog/CommitLogStressTest.java | 217 +++++++++-------
.../db/commitlog/CommitLogUpgradeTest.java | 143 +++++++++++
.../db/commitlog/CommitLogUpgradeTestMaker.java | 250 +++++++++++++++++++
12 files changed, 527 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index a59e70e..176f64b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -281,7 +281,7 @@ public class CommitLogReplayer
return;
if (globalPosition.segment == desc.id)
reader.seek(globalPosition.position);
- replaySyncSection(reader, -1, desc);
+ replaySyncSection(reader, (int) reader.getPositionLimit(), desc);
return;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log
new file mode 100644
index 0000000..3301331
Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750790.log differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log
new file mode 100644
index 0000000..04314d6
Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750791.log differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log
new file mode 100644
index 0000000..a9af9e4
Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750792.log differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log
new file mode 100644
index 0000000..3301331
Binary files /dev/null and b/test/data/legacy-commitlog/2.0/CommitLog-3-1431528750793.log differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.0/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.0/hash.txt b/test/data/legacy-commitlog/2.0/hash.txt
new file mode 100644
index 0000000..4bbec02
--- /dev/null
+++ b/test/data/legacy-commitlog/2.0/hash.txt
@@ -0,0 +1,3 @@
+cfid = 4d331c44-f018-302b-91c2-2dcf94c4bfad
+cells = 9724
+hash = -682777064
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log
new file mode 100644
index 0000000..60064ee
Binary files /dev/null and b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069529.log differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log
new file mode 100644
index 0000000..fdf7071
Binary files /dev/null and b/test/data/legacy-commitlog/2.1/CommitLog-4-1431529069530.log differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/data/legacy-commitlog/2.1/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.1/hash.txt b/test/data/legacy-commitlog/2.1/hash.txt
new file mode 100644
index 0000000..f05cf97
--- /dev/null
+++ b/test/data/legacy-commitlog/2.1/hash.txt
@@ -0,0 +1,3 @@
+cfid = 6c622920-f980-11e4-b8a0-e7d448d5e26d
+cells = 5165
+hash = -1915888171
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index f5fd2cf..5897dec 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -1,4 +1,5 @@
package org.apache.cassandra.db.commitlog;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,7 +21,6 @@ package org.apache.cassandra.db.commitlog;
*
*/
-
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -60,54 +60,56 @@ import org.apache.cassandra.io.util.FastByteArrayInputStream;
public class CommitLogStressTest
{
-
public static ByteBuffer dataSource;
-
- public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
+ public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
public static int numCells = 1;
-
public static int cellSize = 1024;
-
public static int rateLimit = 0;
-
public static int runTimeMs = 10000;
-
+
public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress";
-
+
public static int hash(int hash, ByteBuffer bytes)
{
int shift = 0;
- for (int i=0; i<bytes.limit(); i++) {
+ for (int i = 0; i < bytes.limit(); i++)
+ {
hash += (bytes.get(i) & 0xFF) << shift;
shift = (shift + 8) & 0x1F;
}
return hash;
}
-
- public static void main(String[] args) throws Exception {
- try {
- if (args.length >= 1) {
+
+ public static void main(String[] args) throws Exception
+ {
+ try
+ {
+ if (args.length >= 1)
+ {
NUM_THREADS = Integer.parseInt(args[0]);
System.out.println("Setting num threads to: " + NUM_THREADS);
}
-
- if (args.length >= 2) {
+
+ if (args.length >= 2)
+ {
numCells = Integer.parseInt(args[1]);
System.out.println("Setting num cells to: " + numCells);
}
-
- if (args.length >= 3) {
+
+ if (args.length >= 3)
+ {
cellSize = Integer.parseInt(args[1]);
System.out.println("Setting cell size to: " + cellSize + " be aware the source corpus may be small");
}
-
- if (args.length >= 4) {
+
+ if (args.length >= 4)
+ {
rateLimit = Integer.parseInt(args[1]);
System.out.println("Setting per thread rate limit to: " + rateLimit);
}
initialize();
-
+
CommitLogStressTest tester = new CommitLogStressTest();
tester.testFixedSize();
}
@@ -115,24 +117,26 @@ public class CommitLogStressTest
{
e.printStackTrace(System.err);
}
- finally {
+ finally
+ {
System.exit(0);
}
}
-
+
boolean failed = false;
volatile boolean stop = false;
boolean randomSize = false;
boolean discardedRun = false;
ReplayPosition discardedPos;
-
+
@BeforeClass
- static public void initialize() throws FileNotFoundException, IOException, InterruptedException
+ static public void initialize() throws IOException
{
try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
{
- dataSource = ByteBuffer.allocateDirect((int)fis.getChannel().size());
- while (dataSource.hasRemaining()) {
+ dataSource = ByteBuffer.allocateDirect((int) fis.getChannel().size());
+ while (dataSource.hasRemaining())
+ {
fis.getChannel().read(dataSource);
}
dataSource.flip();
@@ -141,7 +145,7 @@ public class CommitLogStressTest
SchemaLoader.loadSchema();
SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
}
-
+
@Before
public void cleanDir()
{
@@ -149,11 +153,13 @@ public class CommitLogStressTest
if (dir.isDirectory())
{
File[] files = dir.listFiles();
-
+
for (File f : files)
if (!f.delete())
Assert.fail("Failed to delete " + f);
- } else {
+ }
+ else
+ {
dir.mkdir();
}
}
@@ -194,7 +200,8 @@ public class CommitLogStressTest
null,
new ParameterizedClass("LZ4Compressor", null),
new ParameterizedClass("SnappyCompressor", null),
- new ParameterizedClass("DeflateCompressor", null)}) {
+ new ParameterizedClass("DeflateCompressor", null) })
+ {
DatabaseDescriptor.setCommitLogCompression(compressor);
for (CommitLogSync sync : CommitLogSync.values())
{
@@ -206,27 +213,29 @@ public class CommitLogStressTest
assert !failed;
}
- public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
+ public void testLog(CommitLog commitLog) throws IOException, InterruptedException
+ {
System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
- mb(DatabaseDescriptor.getCommitLogSegmentSize()),
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
- commitLog.executor.getClass().getSimpleName(),
- randomSize ? " random size" : "",
- discardedRun ? " with discarded run" : "");
+ mb(DatabaseDescriptor.getCommitLogSegmentSize()),
+ commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+ commitLog.executor.getClass().getSimpleName(),
+ randomSize ? " random size" : "",
+ discardedRun ? " with discarded run" : "");
commitLog.allocator.enableReserveSegmentCreation();
-
+
final List<CommitlogExecutor> threads = new ArrayList<>();
ScheduledExecutorService scheduled = startThreads(commitLog, threads);
discardedPos = ReplayPosition.NONE;
- if (discardedRun) {
+ if (discardedRun)
+ {
// Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations.
Thread.sleep(runTimeMs / 3);
stop = true;
scheduled.shutdown();
scheduled.awaitTermination(2, TimeUnit.SECONDS);
- for (CommitlogExecutor t: threads)
+ for (CommitlogExecutor t : threads)
{
t.join();
if (t.rp.compareTo(discardedPos) > 0)
@@ -234,15 +243,15 @@ public class CommitLogStressTest
}
verifySizes(commitLog);
- commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos);
+ commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId,
+ discardedPos);
threads.clear();
System.out.format("Discarded at %s\n", discardedPos);
verifySizes(commitLog);
-
+
scheduled = startThreads(commitLog, threads);
}
-
Thread.sleep(runTimeMs);
stop = true;
scheduled.shutdown();
@@ -250,16 +259,18 @@ public class CommitLogStressTest
int hash = 0;
int cells = 0;
- for (CommitlogExecutor t: threads) {
+ for (CommitlogExecutor t : threads)
+ {
t.join();
hash += t.hash;
cells += t.cells;
}
verifySizes(commitLog);
-
+
commitLog.shutdownBlocking();
- System.out.print("Stopped. Replaying... "); System.out.flush();
+ System.out.print("Stopped. Replaying... ");
+ System.out.flush();
Replayer repl = new Replayer();
File[] files = new File(location).listFiles();
repl.recover(files);
@@ -267,12 +278,16 @@ public class CommitLogStressTest
for (File f : files)
if (!f.delete())
Assert.fail("Failed to delete " + f);
-
+
if (hash == repl.hash && cells == repl.cells)
System.out.println("Test success.");
else
{
- System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", repl.cells, cells, repl.hash, hash);
+ System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n",
+ repl.cells,
+ cells,
+ repl.hash,
+ hash);
failed = true;
}
}
@@ -287,7 +302,7 @@ public class CommitLogStressTest
commitLog.executor.requestExtraSync().awaitUninterruptibly();
// Wait for any pending deletes or segment allocations to complete.
commitLog.allocator.awaitManagementTasksCompletion();
-
+
long combinedSize = 0;
for (File f : new File(commitLog.location).listFiles())
combinedSize += f.length();
@@ -297,11 +312,11 @@ public class CommitLogStressTest
Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios();
Collection<CommitLogSegment> segments = commitLog.allocator.getActiveSegments();
- for (CommitLogSegment segment: segments)
+ for (CommitLogSegment segment : segments)
{
Assert.assertTrue(logFileNames.remove(segment.getName()));
Double ratio = ratios.remove(segment.getName());
-
+
Assert.assertEquals(segment.logFile.length(), segment.onDiskSize());
Assert.assertEquals(segment.onDiskSize() * 1.0 / segment.contentSize(), ratio, 0.01);
}
@@ -312,35 +327,47 @@ public class CommitLogStressTest
public ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogExecutor> threads)
{
stop = false;
- for (int ii = 0; ii < NUM_THREADS; ii++) {
+ for (int ii = 0; ii < NUM_THREADS; ii++)
+ {
final CommitlogExecutor t = new CommitlogExecutor(commitLog, new Random(ii));
threads.add(t);
t.start();
}
final long start = System.currentTimeMillis();
- Runnable printRunnable = new Runnable() {
+ Runnable printRunnable = new Runnable()
+ {
long lastUpdate = 0;
- public void run() {
- Runtime runtime = Runtime.getRuntime();
- long maxMemory = runtime.maxMemory();
- long allocatedMemory = runtime.totalMemory();
- long freeMemory = runtime.freeMemory();
- long temp = 0;
- long sz = 0;
- for (CommitlogExecutor cle : threads) {
- temp += cle.counter.get();
- sz += cle.dataSize;
- }
- double time = (System.currentTimeMillis() - start) / 1000.0;
- double avg = (temp / time);
- System.out.println(
- String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb",
- ((System.currentTimeMillis() - start) / 1000),
- mb(maxMemory), mb(allocatedMemory), mb(freeMemory), (temp - lastUpdate), lastUpdate, avg,
- mb(commitLog.getActiveContentSize()), mb(commitLog.getActiveOnDiskSize()), mb(sz / time)));
- lastUpdate = temp;
+ public void run()
+ {
+ Runtime runtime = Runtime.getRuntime();
+ long maxMemory = runtime.maxMemory();
+ long allocatedMemory = runtime.totalMemory();
+ long freeMemory = runtime.freeMemory();
+ long temp = 0;
+ long sz = 0;
+ for (CommitlogExecutor cle : threads)
+ {
+ temp += cle.counter.get();
+ sz += cle.dataSize;
+ }
+ double time = (System.currentTimeMillis() - start) / 1000.0;
+ double avg = (temp / time);
+ System.out
+ .println(
+ String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb",
+ ((System.currentTimeMillis() - start) / 1000),
+ mb(maxMemory),
+ mb(allocatedMemory),
+ mb(freeMemory),
+ (temp - lastUpdate),
+ lastUpdate,
+ avg,
+ mb(commitLog.getActiveContentSize()),
+ mb(commitLog.getActiveOnDiskSize()),
+ mb(sz / time)));
+ lastUpdate = temp;
}
};
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
@@ -348,15 +375,18 @@ public class CommitLogStressTest
return scheduled;
}
- private static double mb(long maxMemory) {
+ private static double mb(long maxMemory)
+ {
return maxMemory / (1024.0 * 1024);
}
- private static double mb(double maxMemory) {
+ private static double mb(double maxMemory)
+ {
return maxMemory / (1024 * 1024);
}
- public static ByteBuffer randomBytes(int quantity, Random tlr) {
+ public static ByteBuffer randomBytes(int quantity, Random tlr)
+ {
ByteBuffer slice = ByteBuffer.allocate(quantity);
ByteBuffer source = dataSource.duplicate();
source.position(tlr.nextInt(source.capacity() - quantity));
@@ -366,7 +396,8 @@ public class CommitLogStressTest
return slice;
}
- public class CommitlogExecutor extends Thread {
+ public class CommitlogExecutor extends Thread
+ {
final AtomicLong counter = new AtomicLong();
int hash = 0;
int cells = 0;
@@ -382,21 +413,23 @@ public class CommitLogStressTest
this.random = rand;
}
- public void run() {
+ public void run()
+ {
RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
final Random rand = random != null ? random : ThreadLocalRandom.current();
- while (!stop) {
+ while (!stop)
+ {
if (rl != null)
rl.acquire();
String ks = "Keyspace1";
ByteBuffer key = randomBytes(16, rand);
Mutation mutation = new Mutation(ks, key);
- for (int ii = 0; ii < numCells; ii++) {
+ for (int ii = 0; ii < numCells; ii++)
+ {
int sz = randomSize ? rand.nextInt(cellSize) : cellSize;
ByteBuffer bytes = randomBytes(sz, rand);
- mutation.add("Standard1", Util.cellname("name" + ii), bytes,
- System.currentTimeMillis());
+ mutation.add("Standard1", Util.cellname("name" + ii), bytes, System.currentTimeMillis());
hash = hash(hash, bytes);
++cells;
dataSize += sz;
@@ -406,7 +439,7 @@ public class CommitLogStressTest
}
}
}
-
+
class Replayer extends CommitLogReplayer
{
Replayer()
@@ -420,20 +453,22 @@ public class CommitLogStressTest
@Override
void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
{
- if (desc.id < discardedPos.segment) {
+ if (desc.id < discardedPos.segment)
+ {
System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
return;
- } else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
+ }
+ else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
// Skip over this mutation.
return;
-
+
FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
Mutation mutation;
try
{
mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
- desc.getMessagingVersion(),
- ColumnSerializer.Flag.LOCAL);
+ desc.getMessagingVersion(),
+ ColumnSerializer.Flag.LOCAL);
}
catch (IOException e)
{
@@ -441,8 +476,10 @@ public class CommitLogStressTest
throw new AssertionError(e);
}
- for (ColumnFamily cf : mutation.getColumnFamilies()) {
- for (Cell c : cf.getSortedColumns()) {
+ for (ColumnFamily cf : mutation.getColumnFamilies())
+ {
+ for (Cell c : cf.getSortedColumns())
+ {
if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name"))
{
hash = hash(hash, c.value());
@@ -451,6 +488,6 @@ public class CommitLogStressTest
}
}
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
new file mode 100644
index 0000000..1655078
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
@@ -0,0 +1,143 @@
+package org.apache.cassandra.db.commitlog;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.UUID;
+
+import junit.framework.Assert;
+
+import com.google.common.base.Predicate;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.Mutation;
+
+public class CommitLogUpgradeTest
+{
+ static final String DATA_DIR = "test/data/legacy-commitlog/";
+ static final String PROPERTIES_FILE = "hash.txt";
+ static final String CFID_PROPERTY = "cfid";
+ static final String CELLS_PROPERTY = "cells";
+ static final String HASH_PROPERTY = "hash";
+
+ static final String TABLE = "Standard1";
+ static final String KEYSPACE = "Keyspace1";
+ static final String CELLNAME = "name";
+
+ @Test
+ public void test20() throws Exception
+ {
+ testRestore(DATA_DIR + "2.0");
+ }
+
+ @Test
+ public void test21() throws Exception
+ {
+ testRestore(DATA_DIR + "2.1");
+ }
+
+ @BeforeClass
+ static public void initialize() throws FileNotFoundException, IOException, InterruptedException
+ {
+ SchemaLoader.loadSchema();
+ SchemaLoader.schemaDefinition("");
+ }
+
+ public void testRestore(String location) throws IOException, InterruptedException
+ {
+ Properties prop = new Properties();
+ prop.load(new FileInputStream(new File(location + File.separatorChar + PROPERTIES_FILE)));
+ int hash = Integer.parseInt(prop.getProperty(HASH_PROPERTY));
+ int cells = Integer.parseInt(prop.getProperty(CELLS_PROPERTY));
+
+ String cfidString = prop.getProperty(CFID_PROPERTY);
+ if (cfidString != null)
+ {
+ UUID cfid = UUID.fromString(cfidString);
+ if (Schema.instance.getCF(cfid) == null)
+ {
+ CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+ Schema.instance.purge(cfm);
+ Schema.instance.load(cfm.copy(cfid));
+ }
+ }
+
+ Hasher hasher = new Hasher();
+ CommitLogTestReplayer replayer = new CommitLogTestReplayer(hasher);
+ File[] files = new File(location).listFiles(new FilenameFilter()
+ {
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ return name.endsWith(".log");
+ }
+ });
+ replayer.recover(files);
+
+ Assert.assertEquals(cells, hasher.cells);
+ Assert.assertEquals(hash, hasher.hash);
+ }
+
+ public static int hash(int hash, ByteBuffer bytes)
+ {
+ int shift = 0;
+ for (int i = 0; i < bytes.limit(); i++)
+ {
+ hash += (bytes.get(i) & 0xFF) << shift;
+ shift = (shift + 8) & 0x1F;
+ }
+ return hash;
+ }
+
+ class Hasher implements Predicate<Mutation>
+ {
+ int hash = 0;
+ int cells = 0;
+
+ @Override
+ public boolean apply(Mutation mutation)
+ {
+ for (ColumnFamily cf : mutation.getColumnFamilies())
+ {
+ for (Cell c : cf.getSortedColumns())
+ {
+ if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith(CELLNAME))
+ {
+ hash = hash(hash, c.value());
+ ++cells;
+ }
+ }
+ }
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a384faaa/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
new file mode 100644
index 0000000..7b07c8e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@ -0,0 +1,250 @@
+package org.apache.cassandra.db.commitlog;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.db.commitlog.CommitLogUpgradeTest.*;
+
+public class CommitLogUpgradeTestMaker
+{
+ public static ByteBuffer dataSource;
+
+ private static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
+ public static int numCells = 1;
+ public static int cellSize = 256;
+ public static int rateLimit = 0;
+ public static int runTimeMs = 1000;
+
+ public static void main(String[] args) throws Exception
+ {
+ try
+ {
+ initialize();
+
+ CommitLogUpgradeTestMaker tester = new CommitLogUpgradeTestMaker();
+ tester.makeLog();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(System.err);
+ }
+ finally
+ {
+ System.exit(0);
+ }
+ }
+
+ volatile boolean stop = false;
+ boolean randomSize = true;
+
+ static public void initialize() throws IOException, ConfigurationException
+ {
+ try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
+ {
+ dataSource = ByteBuffer.allocateDirect((int) fis.getChannel().size());
+ while (dataSource.hasRemaining())
+ {
+ fis.getChannel().read(dataSource);
+ }
+ dataSource.flip();
+ }
+
+ SchemaLoader.loadSchema();
+ SchemaLoader.schemaDefinition("");
+ }
+
+ public void makeLog() throws IOException, InterruptedException
+ {
+ CommitLog commitLog = CommitLog.instance;
+ System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n",
+ mb(DatabaseDescriptor.getCommitLogSegmentSize()),
+ commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+ commitLog.executor.getClass().getSimpleName(),
+ randomSize ? " random size" : "");
+ final List<CommitlogExecutor> threads = new ArrayList<>();
+ ScheduledExecutorService scheduled = startThreads(commitLog, threads);
+
+ Thread.sleep(runTimeMs);
+ stop = true;
+ scheduled.shutdown();
+ scheduled.awaitTermination(2, TimeUnit.SECONDS);
+
+ int hash = 0;
+ int cells = 0;
+ for (CommitlogExecutor t : threads)
+ {
+ t.join();
+ hash += t.hash;
+ cells += t.cells;
+ }
+ commitLog.shutdownBlocking();
+
+ File dataDir = new File(CommitLogUpgradeTest.DATA_DIR + FBUtilities.getReleaseVersionString());
+ System.out.format("Data will be stored in %s\n", dataDir);
+ if (dataDir.exists())
+ FileUtils.deleteRecursive(dataDir);
+
+ dataDir.mkdirs();
+ for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
+ FileUtils.createHardLink(f, new File(dataDir, f.getName()));
+
+ Properties prop = new Properties();
+ prop.setProperty(CFID_PROPERTY, Schema.instance.getId(KEYSPACE, TABLE).toString());
+ prop.setProperty(CELLS_PROPERTY, Integer.toString(cells));
+ prop.setProperty(HASH_PROPERTY, Integer.toString(hash));
+ prop.store(new FileOutputStream(new File(dataDir, PROPERTIES_FILE)),
+ "CommitLog upgrade test, version " + FBUtilities.getReleaseVersionString());
+ System.out.println("Done");
+ }
+
+ public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads)
+ {
+ stop = false;
+ for (int ii = 0; ii < NUM_THREADS; ii++)
+ {
+ final CommitlogExecutor t = new CommitlogExecutor(commitLog);
+ threads.add(t);
+ t.start();
+ }
+
+ final long start = System.currentTimeMillis();
+ Runnable printRunnable = new Runnable()
+ {
+ long lastUpdate = 0;
+
+ public void run()
+ {
+ Runtime runtime = Runtime.getRuntime();
+ long maxMemory = mb(runtime.maxMemory());
+ long allocatedMemory = mb(runtime.totalMemory());
+ long freeMemory = mb(runtime.freeMemory());
+ long temp = 0;
+ long sz = 0;
+ for (CommitlogExecutor cle : threads)
+ {
+ temp += cle.counter.get();
+ sz += cle.dataSize;
+ }
+ double time = (System.currentTimeMillis() - start) / 1000.0;
+ double avg = (temp / time);
+ System.out.println(
+ String.format("second %d mem max %dmb allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb",
+ ((System.currentTimeMillis() - start) / 1000),
+ maxMemory,
+ allocatedMemory,
+ freeMemory,
+ (temp - lastUpdate),
+ lastUpdate,
+ avg,
+ mb(sz / time)));
+ lastUpdate = temp;
+ }
+ };
+ ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+ scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS);
+ return scheduled;
+ }
+
+ private static long mb(long maxMemory)
+ {
+ return maxMemory / (1024 * 1024);
+ }
+
+ private static double mb(double maxMemory)
+ {
+ return maxMemory / (1024 * 1024);
+ }
+
+ public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr)
+ {
+ ByteBuffer slice = ByteBuffer.allocate(quantity);
+ ByteBuffer source = dataSource.duplicate();
+ source.position(tlr.nextInt(source.capacity() - quantity));
+ source.limit(source.position() + quantity);
+ slice.put(source);
+ slice.flip();
+ return slice;
+ }
+
+ public class CommitlogExecutor extends Thread
+ {
+ final AtomicLong counter = new AtomicLong();
+ int hash = 0;
+ int cells = 0;
+ int dataSize = 0;
+ final CommitLog commitLog;
+
+ volatile ReplayPosition rp;
+
+ public CommitlogExecutor(CommitLog commitLog)
+ {
+ this.commitLog = commitLog;
+ }
+
+ public void run()
+ {
+ RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
+ final ThreadLocalRandom tlr = ThreadLocalRandom.current();
+ while (!stop)
+ {
+ if (rl != null)
+ rl.acquire();
+ String ks = KEYSPACE;
+ ByteBuffer key = randomBytes(16, tlr);
+ Mutation mutation = new Mutation(ks, key);
+
+ for (int ii = 0; ii < numCells; ii++)
+ {
+ int sz = randomSize ? tlr.nextInt(cellSize) : cellSize;
+ ByteBuffer bytes = randomBytes(sz, tlr);
+ mutation.add(TABLE, Util.cellname(CELLNAME + ii), bytes, System.currentTimeMillis());
+ hash = hash(hash, bytes);
+ ++cells;
+ dataSize += sz;
+ }
+ rp = commitLog.add(mutation);
+ counter.incrementAndGet();
+ }
+ }
+ }
+}