Posted to commits@cassandra.apache.org by jb...@apache.org on 2015/08/19 03:16:58 UTC
[7/8] cassandra git commit: Honors commit log policy when replaying; treats errors in the last segment (section for compressed files) as permissible due to incomplete flush. Reviewed by aweisberg for CASSANDRA-9749
Honors commit log policy when replaying; treats errors in the last segment (section for compressed files) as permissible due to incomplete flush.
Reviewed by aweisberg for CASSANDRA-9749
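The core of the change is a new handleReplayError(permissible, ...) helper in CommitLogReplayer: errors that can plausibly result from an incomplete flush (the tail of the last replayed segment, or the last compressed section) are logged and skipped, while any other replay error is handed to the configured commit failure policy and stops startup unless the operator passes -Dcassandra.commitlog.ignorereplayerrors=true. Below is a minimal standalone sketch of that decision, assuming only the property name taken from the patch; the real helper additionally consults CommitLog.handleCommitError before throwing.

import java.io.IOException;

public class ReplayErrorPolicySketch
{
    // Property name as introduced in CommitLogReplayer below.
    static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";

    // Simplified stand-in for CommitLogReplayer.handleReplayError: the patched version
    // also gives CommitLog.handleCommitError (the commit failure policy) a chance to
    // swallow the error before throwing.
    static void handleReplayError(boolean permissible, String message, Object... args) throws IOException
    {
        String msg = String.format(message, args);
        if (permissible)
            System.err.println("Ignoring commit log replay error likely due to incomplete flush to disk: " + msg);
        else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY))
            System.err.println("Ignoring commit log replay error: " + msg);
        else
            throw new IOException(msg);
    }

    public static void main(String[] args)
    {
        try
        {
            // Truncation at the end of the last segment is excused as an incomplete flush...
            handleReplayError(true, "Unexpected end of segment in %s", "CommitLog-5-1438186885380.log");
            // ...while corruption elsewhere stops replay unless the override property is set.
            handleReplayError(false, "Mutation checksum failure at %d in %s", 4096, "CommitLog-5-1438186885380.log");
        }
        catch (IOException e)
        {
            System.err.println("Replay stopped: " + e.getMessage());
        }
    }
}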
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d12d2d49
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d12d2d49
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d12d2d49
Branch: refs/heads/cassandra-3.0
Commit: d12d2d496540c698f30e9b528b66e8f6636842d3
Parents: 7eed4b6
Author: Branimir Lambov <br...@datastax.com>
Authored: Thu Jul 30 20:59:16 2015 +0300
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Aug 18 20:16:31 2015 -0500
----------------------------------------------------------------------
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogReplayer.java | 117 +++-
.../db/commitlog/CommitLogSegmentManager.java | 8 +-
.../CommitLog-5-1438186885380.log | Bin 0 -> 839051 bytes
.../legacy-commitlog/2.2-lz4-bitrot/hash.txt | 6 +
.../CommitLog-5-1438186885380.log | Bin 0 -> 839051 bytes
.../legacy-commitlog/2.2-lz4-bitrot2/hash.txt | 6 +
.../CommitLog-5-1438186885380.log | Bin 0 -> 839001 bytes
.../legacy-commitlog/2.2-lz4-truncated/hash.txt | 5 +
.../db/CommitLogFailurePolicyTest.java | 141 -----
.../org/apache/cassandra/db/CommitLogTest.java | 467 ---------------
.../commitlog/CommitLogFailurePolicyTest.java | 141 +++++
.../cassandra/db/commitlog/CommitLogTest.java | 578 +++++++++++++++++++
.../db/commitlog/CommitLogUpgradeTest.java | 65 +++
14 files changed, 899 insertions(+), 639 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 8d74677..ff27225 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -193,7 +193,9 @@ public class CommitLog implements CommitLogMBean
*/
public void recover(String path) throws IOException
{
- recover(new File(path));
+ CommitLogReplayer recovery = CommitLogReplayer.construct(this);
+ recovery.recover(new File(path), false);
+ recovery.blockForWrites();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 358661a..93c3026 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -64,6 +64,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
public class CommitLogReplayer
{
+ static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
@@ -144,14 +145,15 @@ public class CommitLogReplayer
public void recover(File[] clogs) throws IOException
{
- for (final File file : clogs)
- recover(file);
+ int i;
+ for (i = 0; i < clogs.length; ++i)
+ recover(clogs[i], i + 1 == clogs.length);
}
public int blockForWrites()
{
for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet())
- logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
+ logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
// wait for all the writes to finish on the mutation stage
FBUtilities.waitOnFutures(futures);
@@ -165,7 +167,7 @@ public class CommitLogReplayer
return replayedCount.get();
}
- private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
+ private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
{
if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
{
@@ -183,13 +185,17 @@ public class CommitLogReplayer
{
if (end != 0 || filecrc != 0)
{
- logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+ handleReplayError(false,
+ "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
+ "The end of segment marker should be zero.",
+ offset, reader.getPath());
}
return -1;
}
else if (end < offset || end > reader.length())
{
- logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
+ handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
+ offset, reader.getPath());
return -1;
}
return end;
@@ -270,8 +276,7 @@ public class CommitLogReplayer
}
}
- @SuppressWarnings("resource")
- public void recover(File file) throws IOException
+ public void recover(File file, boolean tolerateTruncation) throws IOException
{
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
@@ -283,7 +288,7 @@ public class CommitLogReplayer
return;
if (globalPosition.segment == desc.id)
reader.seek(globalPosition.position);
- replaySyncSection(reader, (int) reader.getPositionLimit(), desc);
+ replaySyncSection(reader, (int) reader.getPositionLimit(), desc, desc.fileName(), tolerateTruncation);
return;
}
@@ -297,10 +302,15 @@ public class CommitLogReplayer
desc = null;
}
if (desc == null) {
- logger.warn("Could not read commit log descriptor in file {}", file);
+ handleReplayError(false, "Could not read commit log descriptor in file %s", file);
return;
}
- assert segmentId == desc.id;
+ if (segmentId != desc.id)
+ {
+ handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
+ // continue processing if ignored.
+ }
+
if (logAndCheckIfShouldSkip(file, desc))
return;
@@ -313,7 +323,7 @@ public class CommitLogReplayer
}
catch (ConfigurationException e)
{
- logger.warn("Unknown compression: {}", e.getMessage());
+ handleReplayError(false, "Unknown compression: %s", e.getMessage());
return;
}
}
@@ -322,7 +332,7 @@ public class CommitLogReplayer
int end = (int) reader.getFilePointer();
int replayEnd = end;
- while ((end = readSyncMarker(desc, end, reader)) >= 0)
+ while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0)
{
int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
@@ -343,11 +353,17 @@ public class CommitLogReplayer
continue;
FileDataInput sectionReader = reader;
+ String errorContext = desc.fileName();
+ // In the uncompressed case the last non-fully-flushed section can be anywhere in the file.
+ boolean tolerateErrorsInSection = tolerateTruncation;
if (compressor != null)
{
+ // In the compressed case we know if this is the last section.
+ tolerateErrorsInSection &= end == reader.length() || end < 0;
+
+ int start = (int) reader.getFilePointer();
try
{
- int start = (int) reader.getFilePointer();
int compressedLength = end - start;
if (logger.isDebugEnabled())
logger.trace("Decompressing {} between replay positions {} and {}",
@@ -362,15 +378,18 @@ public class CommitLogReplayer
uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0);
+ errorContext = "compressed section at " + start + " in " + errorContext;
}
- catch (IOException e)
+ catch (IOException | ArrayIndexOutOfBoundsException e)
{
- logger.error("Unexpected exception decompressing section {}", e);
+ handleReplayError(tolerateErrorsInSection,
+ "Unexpected exception decompressing section at %d: %s",
+ start, e);
continue;
}
}
- if (!replaySyncSection(sectionReader, replayEnd, desc))
+ if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
break;
}
}
@@ -402,13 +421,14 @@ public class CommitLogReplayer
*
* @return Whether replay should continue with the next section.
*/
- private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException
+ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException
{
/* read the logs populate Mutation and apply */
while (reader.getFilePointer() < end && !reader.isEOF())
{
+ long mutationStart = reader.getFilePointer();
if (logger.isDebugEnabled())
- logger.trace("Reading mutation at {}", reader.getFilePointer());
+ logger.trace("Reading mutation at {}", mutationStart);
long claimedCRC32;
int serializedSize;
@@ -427,7 +447,12 @@ public class CommitLogReplayer
// 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
// This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
if (serializedSize < 10)
+ {
+ handleReplayError(tolerateErrors,
+ "Invalid mutation size %d at %d in %s",
+ serializedSize, mutationStart, errorContext);
return false;
+ }
long claimedSizeChecksum;
if (desc.version < CommitLogDescriptor.VERSION_21)
@@ -441,7 +466,12 @@ public class CommitLogReplayer
updateChecksumInt(checksum, serializedSize);
if (checksum.getValue() != claimedSizeChecksum)
+ {
+ handleReplayError(tolerateErrors,
+ "Mutation size checksum failure at %d in %s",
+ mutationStart, errorContext);
return false;
+ }
// ok.
if (serializedSize > buffer.length)
@@ -454,14 +484,18 @@ public class CommitLogReplayer
}
catch (EOFException eof)
{
+ handleReplayError(tolerateErrors,
+ "Unexpected end of segment",
+ mutationStart, errorContext);
return false; // last CL entry didn't get completely written. that's ok.
}
checksum.update(buffer, 0, serializedSize);
if (claimedCRC32 != checksum.getValue())
{
- // this entry must not have been fsynced. probably the rest is bad too,
- // but just in case there is no harm in trying them (since we still read on an entry boundary)
+ handleReplayError(tolerateErrors,
+ "Mutation checksum failure at %d in %s",
+ mutationStart, errorContext);
continue;
}
replayMutation(buffer, serializedSize, reader.getFilePointer(), desc);
@@ -510,9 +544,13 @@ public class CommitLogReplayer
out.write(inputBuffer, 0, size);
}
- String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
- f.getAbsolutePath());
- logger.error(st, t);
+ // Checksum passed so this error can't be permissible.
+ handleReplayError(false,
+ "Unexpected error deserializing mutation; saved to %s. " +
+ "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " +
+ "Exception follows: %s",
+ f.getAbsolutePath(),
+ t);
return;
}
@@ -580,4 +618,35 @@ public class CommitLogReplayer
}
return false;
}
+
+ static void handleReplayError(boolean permissible, String message, Object... messageArgs) throws IOException
+ {
+ String msg = String.format(message, messageArgs);
+ IOException e = new CommitLogReplayException(msg);
+ if (permissible)
+ logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e);
+ else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY))
+ logger.error("Ignoring commit log replay error", e);
+ else if (!CommitLog.handleCommitError("Failed commit log replay", e))
+ {
+ logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " +
+ "commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " +
+ "on the command line");
+ throw e;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class CommitLogReplayException extends IOException
+ {
+ public CommitLogReplayException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public CommitLogReplayException(String message)
+ {
+ super(message);
+ }
+ }
}
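For readers skimming the hunks above: tolerateTruncation is true only for the last segment handed to recover(File[]), and false for the single-file recover(String) path. For an uncompressed log the not-fully-flushed section can sit anywhere in that file, so every section inherits the flag; for a compressed log the replayer knows which section is last and narrows the flag to it. A hedged condensation of that condition (identifiers borrowed from the patch, behaviour unchanged):

public class SectionToleranceSketch
{
    // Mirrors the assignment in recover(File, boolean) above:
    //   boolean tolerateErrorsInSection = tolerateTruncation;
    //   if (compressor != null) tolerateErrorsInSection &= end == reader.length() || end < 0;
    static boolean tolerateErrorsInSection(boolean tolerateTruncation, boolean compressed, int sectionEnd, long fileLength)
    {
        boolean tolerate = tolerateTruncation;                           // only set for the last replayed segment
        if (compressed)
            tolerate &= sectionEnd == fileLength || sectionEnd < 0;      // last section of the compressed file
        return tolerate;
    }
}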
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 19b850f..9849350 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -34,10 +34,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.*;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,14 +94,12 @@ public class CommitLogSegmentManager
private volatile boolean run = true;
private final CommitLog commitLog;
- @VisibleForTesting
- public CommitLogSegmentManager(final CommitLog commitLog)
+ CommitLogSegmentManager(final CommitLog commitLog)
{
this.commitLog = commitLog;
}
- @VisibleForTesting
- public void start()
+ void start()
{
// The run loop for the manager thread
Runnable runnable = new WrappedRunnable()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log
new file mode 100644
index 0000000..d248d59
Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt
new file mode 100644
index 0000000..c4d8fe7
--- /dev/null
+++ b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt
@@ -0,0 +1,6 @@
+#CommitLog bitrot test, version 2.2.0-SNAPSHOT
+#This is a copy of 2.2-lz4 with some overwritten bytes.
+#Replaying this should result in an error which can be overridden.
+cells=6051
+hash=-170208326
+cfid=dc32ce20-360d-11e5-826c-afadad37221d
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log
new file mode 100644
index 0000000..083d65c
Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt
new file mode 100644
index 0000000..c49dda0
--- /dev/null
+++ b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt
@@ -0,0 +1,6 @@
+#CommitLog upgrade test, version 2.2.0-SNAPSHOT
+#This is a copy of 2.2-lz4 with some overwritten bytes.
+#Replaying this should result in an error which can be overridden.
+cells=6037
+hash=-1312748407
+cfid=dc32ce20-360d-11e5-826c-afadad37221d
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log
new file mode 100644
index 0000000..939d408
Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt
new file mode 100644
index 0000000..ce7f600
--- /dev/null
+++ b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt
@@ -0,0 +1,5 @@
+#Truncated CommitLog test.
+#This is a copy of 2.2-lz4 with the last 50 bytes deleted.
+cells=6037
+hash=-889057729
+cfid=dc32ce20-360d-11e5-826c-afadad37221d
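Together, the three fixtures above cover both sides of the new behaviour: 2.2-lz4-truncated drops the last 50 bytes of the segment and should replay despite the missing tail, while the two bitrot copies corrupt bytes further in and should fail replay unless the error is explicitly overridden. The override used by the new tests (see testRecoveryWithGarbageLog_ignoredByProperty further down) is the same system property operators can pass at startup; a minimal sketch of the pattern:

try
{
    // Assumed to wrap a replay of one of the corrupted fixtures above;
    // with the property set, replay errors are logged and skipped.
    System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
    // ... CommitLog.instance.recover(...) on the fixture goes here ...
}
finally
{
    System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
}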
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java
deleted file mode 100644
index 0ecab3c..0000000
--- a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.cassandra.db;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
-
-public class CommitLogFailurePolicyTest
-{
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- SchemaLoader.prepareServer();
- System.setProperty("cassandra.commitlog.stop_on_errors", "true");
- }
-
- @Test
- public void testCommitFailurePolicy_stop() throws ConfigurationException
- {
- CassandraDaemon daemon = new CassandraDaemon();
- daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
- StorageService.instance.registerDaemon(daemon);
-
- // Need storage service active so stop policy can shutdown gossip
- StorageService.instance.initServer();
- Assert.assertTrue(Gossiper.instance.isEnabled());
-
- Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
- try
- {
- DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
- CommitLog.handleCommitError("Test stop error", new Throwable());
- Assert.assertFalse(Gossiper.instance.isEnabled());
- }
- finally
- {
- DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
- }
- }
-
- @Test
- public void testCommitFailurePolicy_die()
- {
- CassandraDaemon daemon = new CassandraDaemon();
- daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
- StorageService.instance.registerDaemon(daemon);
-
- KillerForTests killerForTests = new KillerForTests();
- JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
- Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
- try
- {
- DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
- CommitLog.handleCommitError("Testing die policy", new Throwable());
- Assert.assertTrue(killerForTests.wasKilled());
- Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure
- }
- finally
- {
- DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
- JVMStabilityInspector.replaceKiller(originalKiller);
- }
- }
-
- @Test
- public void testCommitFailurePolicy_ignore_beforeStartup()
- {
- //startup was not completed successfuly (since method completeSetup() was not called)
- CassandraDaemon daemon = new CassandraDaemon();
- StorageService.instance.registerDaemon(daemon);
-
- KillerForTests killerForTests = new KillerForTests();
- JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
- Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
- try
- {
- DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
- CommitLog.handleCommitError("Testing ignore policy", new Throwable());
- //even though policy is ignore, JVM must die because Daemon has not finished initializing
- Assert.assertTrue(killerForTests.wasKilled());
- Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure
- }
- finally
- {
- DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
- JVMStabilityInspector.replaceKiller(originalKiller);
- }
- }
-
- @Test
- public void testCommitFailurePolicy_ignore_afterStartup() throws Exception
- {
- CassandraDaemon daemon = new CassandraDaemon();
- daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
- StorageService.instance.registerDaemon(daemon);
-
- KillerForTests killerForTests = new KillerForTests();
- JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
- Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
- try
- {
- DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
- CommitLog.handleCommitError("Testing ignore policy", new Throwable());
- //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup
- Assert.assertFalse(killerForTests.wasKilled());
- }
- finally
- {
- DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
- JVMStabilityInspector.replaceKiller(originalKiller);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
deleted file mode 100644
index 6db29a8..0000000
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ /dev/null
@@ -1,467 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.db;
-
-import static junit.framework.Assert.assertTrue;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.zip.CRC32;
-import java.util.zip.Checksum;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLogSegmentManager;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.vint.VIntCoding;
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
-
-public class CommitLogTest
-{
- private static final Logger logger = LoggerFactory.getLogger(CommitLogTest.class);
-
- private static final String KEYSPACE1 = "CommitLogTest";
- private static final String KEYSPACE2 = "CommitLogTestNonDurable";
- private static final String STANDARD1 = "Standard1";
- private static final String STANDARD2 = "Standard2";
-
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
- SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
- SchemaLoader.createKeyspace(KEYSPACE2,
- KeyspaceParams.simpleTransient(1),
- SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
- SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
- System.setProperty("cassandra.commitlog.stop_on_errors", "true");
- CompactionManager.instance.disableAutoCompaction();
- }
-
- @Test
- public void testRecoveryWithEmptyLog() throws Exception
- {
- CommitLog.instance.recover(new File[]{ tmpFile() });
- }
-
- @Test
- public void testRecoveryWithShortLog() throws Exception
- {
- // force EOF while reading log
- testRecoveryWithBadSizeArgument(100, 10);
- }
-
- @Test
- public void testRecoveryWithShortSize() throws Exception
- {
- testRecovery(new byte[2]);
- }
-
- @Test
- public void testRecoveryWithShortCheckSum() throws Exception
- {
- testRecovery(new byte[6]);
- }
-
- @Test
- public void testRecoveryWithGarbageLog() throws Exception
- {
- byte[] garbage = new byte[100];
- (new java.util.Random()).nextBytes(garbage);
- testRecovery(garbage);
- }
-
- @Test
- public void testRecoveryWithBadSizeChecksum() throws Exception
- {
- Checksum checksum = new CRC32();
- checksum.update(100);
- testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
- }
-
- @Test
- public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
- {
- // many different combinations of 4 bytes (garbage) will be read as zero by readInt()
- testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
- }
-
- @Test
- public void testRecoveryWithNegativeSizeArgument() throws Exception
- {
- // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
- testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
- }
-
- @Test
- public void testDontDeleteIfDirty() throws Exception
- {
- CommitLog.instance.resetUnsafe(true);
- ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
- ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
-
- // Roughly 32 MB mutation
- Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
- .clustering("bytes")
- .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4))
- .build();
-
- // Adding it 5 times
- CommitLog.instance.add(m);
- CommitLog.instance.add(m);
- CommitLog.instance.add(m);
- CommitLog.instance.add(m);
- CommitLog.instance.add(m);
-
- // Adding new mutation on another CF
- Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
- .clustering("bytes")
- .add("val", ByteBuffer.allocate(4))
- .build();
- CommitLog.instance.add(m2);
-
- assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
-
- UUID cfid2 = m2.getColumnFamilyIds().iterator().next();
- CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
-
- // Assert we still have both our segment
- assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
- }
-
- @Test
- public void testDeleteIfNotDirty() throws Exception
- {
- DatabaseDescriptor.getCommitLogSegmentSize();
- CommitLog.instance.resetUnsafe(true);
- ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
- ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
-
- // Roughly 32 MB mutation
- Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
- .clustering("bytes")
- .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
- .build();
-
- // Adding it twice (won't change segment)
- CommitLog.instance.add(rm);
- CommitLog.instance.add(rm);
-
- assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
-
- // "Flush": this won't delete anything
- UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
- CommitLog.instance.sync(true);
- CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
-
- assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
-
- // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
- Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
- .clustering("bytes")
- .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200))
- .build();
- CommitLog.instance.add(rm2);
- // also forces a new segment, since each entry-with-overhead is just under half the CL size
- CommitLog.instance.add(rm2);
- CommitLog.instance.add(rm2);
-
- assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
-
-
- // "Flush" second cf: The first segment should be deleted since we
- // didn't write anything on cf1 since last flush (and we flush cf2)
-
- UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
- CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
-
- // Assert we still have both our segment
- assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
- }
-
- private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName)
- {
- ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(cfName);
- // We don't want to allocate a size of 0 as this is optimized under the hood and our computation would
- // break testEqualRecordLimit
- int allocSize = 1;
- Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, key)
- .clustering(colName)
- .add("val", ByteBuffer.allocate(allocSize)).build();
-
- int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
- max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
-
- // Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size
- int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
- max -= mutationOverhead;
-
- // Now, max is the max for both the value and it's size. But we want to know how much we can allocate, i.e. the size of the value.
- int sizeOfMax = VIntCoding.computeVIntSize(max);
- max -= sizeOfMax;
- assert VIntCoding.computeVIntSize(max) == sizeOfMax; // sanity check that we're still encoded with the size we though we would
- return max;
- }
-
- private static int getMaxRecordDataSize()
- {
- return getMaxRecordDataSize(KEYSPACE1, bytes("k"), STANDARD1, "bytes");
- }
-
- // CASSANDRA-3615
- @Test
- public void testEqualRecordLimit() throws Exception
- {
- CommitLog.instance.resetUnsafe(true);
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
- Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
- .clustering("bytes")
- .add("val", ByteBuffer.allocate(getMaxRecordDataSize()))
- .build();
- CommitLog.instance.add(rm);
- }
-
- @Test
- public void testExceedRecordLimit() throws Exception
- {
- CommitLog.instance.resetUnsafe(true);
- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
- try
- {
- Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
- .clustering("bytes")
- .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
- .build();
- CommitLog.instance.add(rm);
- throw new AssertionError("mutation larger than limit was accepted");
- }
- catch (IllegalArgumentException e)
- {
- // IAE is thrown on too-large mutations
- }
- }
-
- @Test
- public void testVersions()
- {
- Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
- Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
-
- assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
-
- assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
- String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
- assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
- }
-
- @Test
- public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
- {
- boolean originalState = DatabaseDescriptor.isAutoSnapshot();
- try
- {
- CommitLog.instance.resetUnsafe(true);
- boolean prev = DatabaseDescriptor.isAutoSnapshot();
- DatabaseDescriptor.setAutoSnapshot(false);
- ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
- ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
-
- new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe();
- cfs1.truncateBlocking();
- DatabaseDescriptor.setAutoSnapshot(prev);
- Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
- .clustering("bytes")
- .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
- .build();
-
- for (int i = 0 ; i < 5 ; i++)
- CommitLog.instance.add(m2);
-
- assertEquals(2, CommitLog.instance.activeSegments());
- ReplayPosition position = CommitLog.instance.getContext();
- for (Keyspace ks : Keyspace.system())
- for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
- CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
- CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
- assertEquals(1, CommitLog.instance.activeSegments());
- }
- finally
- {
- DatabaseDescriptor.setAutoSnapshot(originalState);
- }
- }
-
- @Test
- public void testTruncateWithoutSnapshotNonDurable() throws IOException
- {
- CommitLog.instance.resetUnsafe(true);
- boolean originalState = DatabaseDescriptor.getAutoSnapshot();
- try
- {
- DatabaseDescriptor.setAutoSnapshot(false);
- Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
- Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
-
- ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
- new RowUpdateBuilder(cfs.metadata, 0, "key1")
- .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
- .build()
- .applyUnsafe();
-
- assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
- .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
-
- cfs.truncateBlocking();
-
- Util.assertEmpty(Util.cmd(cfs).columns("val").build());
- }
- finally
- {
- DatabaseDescriptor.setAutoSnapshot(originalState);
- }
- }
-
- private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
- {
- ByteBuffer buf = ByteBuffer.allocate(1024);
- CommitLogDescriptor.writeHeader(buf, desc);
- long length = buf.position();
- // Put some extra data in the stream.
- buf.putDouble(0.1);
- buf.flip();
- FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
- CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
- Assert.assertEquals("Descriptor length", length, input.getFilePointer());
- Assert.assertEquals("Descriptors", desc, read);
- }
-
- @Test
- public void testDescriptorPersistence() throws IOException
- {
- testDescriptorPersistence(new CommitLogDescriptor(11, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
- new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
- }
-
- @Test
- public void testDescriptorInvalidParametersSize() throws IOException
- {
- Map<String, String> params = new HashMap<>();
- for (int i=0; i<65535; ++i)
- params.put("key"+i, Integer.toString(i, 16));
- try {
- CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
- 21,
- new ParameterizedClass("LZ4Compressor", params));
- ByteBuffer buf = ByteBuffer.allocate(1024000);
- CommitLogDescriptor.writeHeader(buf, desc);
- Assert.fail("Parameter object too long should fail on writing descriptor.");
- } catch (ConfigurationException e)
- {
- // correct path
- }
- }
-
- protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
- {
- Checksum checksum = new CRC32();
- checksum.update(size);
- testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
- }
-
- protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
- {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- DataOutputStream dout = new DataOutputStream(out);
- dout.writeInt(size);
- dout.writeLong(checksum);
- dout.write(new byte[dataSize]);
- dout.close();
- testRecovery(out.toByteArray());
- }
-
- protected File tmpFile() throws IOException
- {
- File logFile = File.createTempFile("CommitLog-" + CommitLogDescriptor.current_version + "-", ".log");
- logFile.deleteOnExit();
- assert logFile.length() == 0;
- return logFile;
- }
-
- protected void testRecovery(byte[] logData) throws Exception
- {
- File logFile = tmpFile();
- try (OutputStream lout = new FileOutputStream(logFile))
- {
- lout.write(logData);
- //statics make it annoying to test things correctly
- CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java
new file mode 100644
index 0000000..79f83fe
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.db.commitlog;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+
+public class CommitLogFailurePolicyTest
+{
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ System.setProperty("cassandra.commitlog.stop_on_errors", "true");
+ }
+
+ @Test
+ public void testCommitFailurePolicy_stop() throws ConfigurationException
+ {
+ CassandraDaemon daemon = new CassandraDaemon();
+ daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+ StorageService.instance.registerDaemon(daemon);
+
+ // Need storage service active so stop policy can shutdown gossip
+ StorageService.instance.initServer();
+ Assert.assertTrue(Gossiper.instance.isEnabled());
+
+ Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+ try
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+ CommitLog.handleCommitError("Test stop error", new Throwable());
+ Assert.assertFalse(Gossiper.instance.isEnabled());
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+ }
+ }
+
+ @Test
+ public void testCommitFailurePolicy_die()
+ {
+ CassandraDaemon daemon = new CassandraDaemon();
+ daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+ StorageService.instance.registerDaemon(daemon);
+
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+ Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+ try
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
+ CommitLog.handleCommitError("Testing die policy", new Throwable());
+ Assert.assertTrue(killerForTests.wasKilled());
+ Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ }
+ }
+
+ @Test
+ public void testCommitFailurePolicy_ignore_beforeStartup()
+ {
+ //startup was not completed successfully (since method completeSetup() was not called)
+ CassandraDaemon daemon = new CassandraDaemon();
+ StorageService.instance.registerDaemon(daemon);
+
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+ Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+ try
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+ CommitLog.handleCommitError("Testing ignore policy", new Throwable());
+ //even though policy is ignore, JVM must die because Daemon has not finished initializing
+ Assert.assertTrue(killerForTests.wasKilled());
+ Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ }
+ }
+
+ @Test
+ public void testCommitFailurePolicy_ignore_afterStartup() throws Exception
+ {
+ CassandraDaemon daemon = new CassandraDaemon();
+ daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+ StorageService.instance.registerDaemon(daemon);
+
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+ Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+ try
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+ CommitLog.handleCommitError("Testing ignore policy", new Throwable());
+ //error policy is set to IGNORE, so JVM must not be killed if error occurs after startup
+ Assert.assertFalse(killerForTests.wasKilled());
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
new file mode 100644
index 0000000..b41b7b3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -0,0 +1,578 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.commitlog;
+
+import static junit.framework.Assert.assertTrue;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+
+import org.junit.*;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.ByteBufferDataInput;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+import org.apache.cassandra.utils.vint.VIntCoding;
+
+public class CommitLogTest
+{
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String KEYSPACE2 = "CommitLogTestNonDurable";
+ private static final String STANDARD1 = "Standard1";
+ private static final String STANDARD2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
+ SchemaLoader.createKeyspace(KEYSPACE2,
+ KeyspaceParams.simpleTransient(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
+ CompactionManager.instance.disableAutoCompaction();
+ }
+
+ @Test
+ public void testRecoveryWithEmptyLog() throws Exception
+ {
+ runExpecting(() -> {
+ CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version) });
+ return null;
+ }, CommitLogReplayException.class);
+ }
+
+ @Test
+ public void testRecoveryWithEmptyLog20() throws Exception
+ {
+ CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.VERSION_20) });
+ }
+
+ @Test
+ public void testRecoveryWithZeroLog() throws Exception
+ {
+ testRecovery(new byte[10], null);
+ }
+
+ @Test
+ public void testRecoveryWithShortLog() throws Exception
+ {
+ // force EOF while reading log
+ testRecoveryWithBadSizeArgument(100, 10);
+ }
+
+ @Test
+ public void testRecoveryWithShortSize() throws Exception
+ {
+ runExpecting(() -> {
+ testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+ return null;
+ }, CommitLogReplayException.class);
+ }
+
+ @Test
+ public void testRecoveryWithShortCheckSum() throws Exception
+ {
+ byte[] data = new byte[8];
+ data[3] = 10; // make sure this is not a legacy end marker.
+ testRecovery(data, CommitLogReplayException.class);
+ }
+
+ @Test
+ public void testRecoveryWithShortMutationSize() throws Exception
+ {
+ testRecoveryWithBadSizeArgument(9, 10);
+ }
+
+ private void testRecoveryWithGarbageLog() throws Exception
+ {
+ byte[] garbage = new byte[100];
+ (new java.util.Random()).nextBytes(garbage);
+ testRecovery(garbage, CommitLogDescriptor.current_version);
+ }
+
+ @Test
+ public void testRecoveryWithGarbageLog_fail() throws Exception
+ {
+ runExpecting(() -> {
+ testRecoveryWithGarbageLog();
+ return null;
+ }, CommitLogReplayException.class);
+ }
+
+ @Test
+ public void testRecoveryWithGarbageLog_ignoredByProperty() throws Exception
+ {
+ try {
+ System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
+ testRecoveryWithGarbageLog();
+ } finally {
+ System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
+ }
+ }
+
+ @Test
+ public void testRecoveryWithBadSizeChecksum() throws Exception
+ {
+ Checksum checksum = new CRC32();
+ checksum.update(100);
+ testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
+ }
+
+ @Test
+ public void testRecoveryWithNegativeSizeArgument() throws Exception
+ {
+ // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
+ testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
+ }
+
+ @Test
+ public void testDontDeleteIfDirty() throws Exception
+ {
+ CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
+ // Roughly 32 MB mutation
+ Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4))
+ .build();
+
+ // Adding it 5 times
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+
+ // Adding new mutation on another CF
+ Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(4))
+ .build();
+ CommitLog.instance.add(m2);
+
+ assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+
+ UUID cfid2 = m2.getColumnFamilyIds().iterator().next();
+ CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
+
+ // Assert we still have both our segment
+ assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+ }
+
+ @Test
+ public void testDeleteIfNotDirty() throws Exception
+ {
+ DatabaseDescriptor.getCommitLogSegmentSize();
+ CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
+ // Mutation just under a quarter of the commit log segment size
+ Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
+ .build();
+
+ // Adding it twice (won't change segment)
+ CommitLog.instance.add(rm);
+ CommitLog.instance.add(rm);
+
+ assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+
+ // "Flush": this won't delete anything
+ UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
+ CommitLog.instance.sync(true);
+ CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
+
+ assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+
+ // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
+ Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200))
+ .build();
+ CommitLog.instance.add(rm2);
+ // also forces a new segment, since each entry-with-overhead is just under half the CL size
+ CommitLog.instance.add(rm2);
+ CommitLog.instance.add(rm2);
+
+ assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
+
+
+ // "Flush" second cf: The first segment should be deleted since we
+ // didn't write anything on cf1 since last flush (and we flush cf2)
+
+ UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
+ CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
+
+ // Assert the stale segments were discarded and only the active segment remains
+ assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+ }
+
+ private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName)
+ {
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(cfName);
+ // We don't want to allocate a size of 0 as this is optimized under the hood and our computation would
+ // break testEqualRecordLimit
+ int allocSize = 1;
+ Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, key)
+ .clustering(colName)
+ .add("val", ByteBuffer.allocate(allocSize)).build();
+
+ int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
+ max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
+
+ // Note that the size of the value is vint-encoded, so we first compute the overhead of the mutation without the value and its size
+ int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
+ max -= mutationOverhead;
+
+ // Now, max is the budget for both the value and its size. But we want to know how much we can allocate, i.e. the size of the value.
+ int sizeOfMax = VIntCoding.computeVIntSize(max);
+ max -= sizeOfMax;
+ assert VIntCoding.computeVIntSize(max) == sizeOfMax; // sanity check that the size still encodes in the number of vint bytes we assumed
+ return max;
+ }
+
+ private static int getMaxRecordDataSize()
+ {
+ return getMaxRecordDataSize(KEYSPACE1, bytes("k"), STANDARD1, "bytes");
+ }
+
+ // CASSANDRA-3615
+ @Test
+ public void testEqualRecordLimit() throws Exception
+ {
+ CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(getMaxRecordDataSize()))
+ .build();
+ CommitLog.instance.add(rm);
+ }
+
+ @Test
+ public void testExceedRecordLimit() throws Exception
+ {
+ CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ try
+ {
+ Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
+ .build();
+ CommitLog.instance.add(rm);
+ throw new AssertionError("mutation larger than limit was accepted");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // IAE is thrown on too-large mutations
+ }
+ }
+
+ protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
+ {
+ Checksum checksum = new CRC32();
+ checksum.update(size);
+ testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
+ }
+
+ protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dout = new DataOutputStream(out);
+ dout.writeInt(size);
+ dout.writeLong(checksum);
+ dout.write(new byte[dataSize]);
+ dout.close();
+ testRecovery(out.toByteArray(), CommitLogReplayException.class);
+ }
+
+ protected File tmpFile(int version) throws IOException
+ {
+ File logFile = File.createTempFile("CommitLog-" + version + "-", ".log");
+ logFile.deleteOnExit();
+ assert logFile.length() == 0;
+ return logFile;
+ }
+
+ protected Void testRecovery(byte[] logData, int version) throws Exception
+ {
+ File logFile = tmpFile(version);
+ try (OutputStream lout = new FileOutputStream(logFile))
+ {
+ lout.write(logData);
+ // statics make it annoying to test things correctly
+ CommitLog.instance.recover(logFile.getPath()); // CASSANDRA-1119 / CASSANDRA-1179 throw on failure
+ }
+ return null;
+ }
+
+ protected Void testRecovery(CommitLogDescriptor desc, byte[] logData) throws Exception
+ {
+ File logFile = tmpFile(desc.version);
+ CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName());
+ // Change id to match file.
+ desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression);
+ ByteBuffer buf = ByteBuffer.allocate(1024);
+ CommitLogDescriptor.writeHeader(buf, desc);
+ try (OutputStream lout = new FileOutputStream(logFile))
+ {
+ lout.write(buf.array(), 0, buf.position());
+ lout.write(logData);
+ // statics make it annoying to test things correctly
+ CommitLog.instance.recover(logFile.getPath()); // CASSANDRA-1119 / CASSANDRA-1179 throw on failure
+ }
+ return null;
+ }
+
+ @Test
+ public void testRecoveryWithIdMismatch() throws Exception
+ {
+ CommitLogDescriptor desc = new CommitLogDescriptor(4, null);
+ File logFile = tmpFile(desc.version);
+ ByteBuffer buf = ByteBuffer.allocate(1024);
+ CommitLogDescriptor.writeHeader(buf, desc);
+ try (OutputStream lout = new FileOutputStream(logFile))
+ {
+ lout.write(buf.array(), 0, buf.position());
+
+ runExpecting(() -> {
+ CommitLog.instance.recover(logFile.getPath()); // CASSANDRA-1119 / CASSANDRA-1179 throw on failure
+ return null;
+ }, CommitLogReplayException.class);
+ }
+ }
+
+ @Test
+ public void testRecoveryWithBadCompressor() throws Exception
+ {
+ CommitLogDescriptor desc = new CommitLogDescriptor(4, new ParameterizedClass("UnknownCompressor", null));
+ runExpecting(() -> {
+ testRecovery(desc, new byte[0]);
+ return null;
+ }, CommitLogReplayException.class);
+ }
+
+ protected void runExpecting(Callable<Void> r, Class<?> expected)
+ {
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+
+ Throwable caught = null;
+ try
+ {
+ r.call();
+ }
+ catch (Throwable t)
+ {
+ if (expected != t.getClass())
+ throw new AssertionError("Expected exception " + expected + ", got " + t, t);
+ caught = t;
+ }
+ if (expected != null && caught == null)
+ Assert.fail("Expected exception " + expected + " but call completed successfully.");
+
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ assertEquals("JVM killed", expected != null, killerForTests.wasKilled());
+ }
+
+ protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
+ {
+ runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected);
+ runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null), logData), expected);
+ }
+
+ @Test
+ public void testVersions()
+ {
+ Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+ Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+ Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+ Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+ Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+ assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+
+ assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
+ String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+ assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+ }
+
+ @Test
+ public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
+ {
+ boolean originalState = DatabaseDescriptor.isAutoSnapshot();
+ try
+ {
+ CommitLog.instance.resetUnsafe(true);
+ boolean prev = DatabaseDescriptor.isAutoSnapshot();
+ DatabaseDescriptor.setAutoSnapshot(false);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
+ new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe();
+ cfs1.truncateBlocking();
+ DatabaseDescriptor.setAutoSnapshot(prev);
+ Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
+ .build();
+
+ for (int i = 0 ; i < 5 ; i++)
+ CommitLog.instance.add(m2);
+
+ assertEquals(2, CommitLog.instance.activeSegments());
+ ReplayPosition position = CommitLog.instance.getContext();
+ for (Keyspace ks : Keyspace.system())
+ for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
+ CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
+ CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
+ assertEquals(1, CommitLog.instance.activeSegments());
+ }
+ finally
+ {
+ DatabaseDescriptor.setAutoSnapshot(originalState);
+ }
+ }
+
+ @Test
+ public void testTruncateWithoutSnapshotNonDurable() throws IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+ boolean originalState = DatabaseDescriptor.getAutoSnapshot();
+ try
+ {
+ DatabaseDescriptor.setAutoSnapshot(false);
+ Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
+ Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
+
+ ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
+ new RowUpdateBuilder(cfs.metadata, 0, "key1")
+ .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .applyUnsafe();
+
+ assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
+ .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
+
+ cfs.truncateBlocking();
+
+ Util.assertEmpty(Util.cmd(cfs).columns("val").build());
+ }
+ finally
+ {
+ DatabaseDescriptor.setAutoSnapshot(originalState);
+ }
+ }
+
+ private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+ {
+ ByteBuffer buf = ByteBuffer.allocate(1024);
+ CommitLogDescriptor.writeHeader(buf, desc);
+ long length = buf.position();
+ // Put some extra data in the stream.
+ buf.putDouble(0.1);
+ buf.flip();
+ FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
+ CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
+ Assert.assertEquals("Descriptor length", length, input.getFilePointer());
+ Assert.assertEquals("Descriptors", desc, read);
+ }
+
+ @Test
+ public void testDescriptorPersistence() throws IOException
+ {
+ testDescriptorPersistence(new CommitLogDescriptor(11, null));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
+ new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+ }
+
+ @Test
+ public void testDescriptorInvalidParametersSize() throws IOException
+ {
+ Map<String, String> params = new HashMap<>();
+ for (int i=0; i<65535; ++i)
+ params.put("key"+i, Integer.toString(i, 16));
+ try {
+ CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
+ 21,
+ new ParameterizedClass("LZ4Compressor", params));
+ ByteBuffer buf = ByteBuffer.allocate(1024000);
+ CommitLogDescriptor.writeHeader(buf, desc);
+ Assert.fail("Parameter object too long should fail on writing descriptor.");
+ } catch (ConfigurationException e)
+ {
+ // correct path
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
index 7b0ab06..00a143b 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
@@ -30,6 +30,8 @@ import junit.framework.Assert;
import com.google.common.base.Predicate;
+import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -43,6 +45,9 @@ import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
public class CommitLogUpgradeTest
{
@@ -56,6 +61,24 @@ public class CommitLogUpgradeTest
static final String KEYSPACE = "Keyspace1";
static final String CELLNAME = "name";
+ private JVMStabilityInspector.Killer originalKiller;
+ private KillerForTests killerForTests;
+ private boolean shouldBeKilled = false;
+
+ @Before
+ public void prepareToBeKilled()
+ {
+ killerForTests = new KillerForTests();
+ originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+ }
+
+ @After
+ public void cleanUp()
+ {
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ Assert.assertEquals("JVM killed", shouldBeKilled, killerForTests.wasKilled());
+ }
+
@Test
public void test20() throws Exception
{
@@ -69,6 +92,7 @@ public class CommitLogUpgradeTest
}
@Test
+
public void test22() throws Exception
{
testRestore(DATA_DIR + "2.2");
@@ -86,6 +110,47 @@ public class CommitLogUpgradeTest
testRestore(DATA_DIR + "2.2-snappy");
}
+ @Test
+ public void test22_truncated() throws Exception
+ {
+ testRestore(DATA_DIR + "2.2-lz4-truncated");
+ }
+
+ @Test(expected = CommitLogReplayException.class)
+ public void test22_bitrot() throws Exception
+ {
+ shouldBeKilled = true;
+ testRestore(DATA_DIR + "2.2-lz4-bitrot");
+ }
+
+ @Test
+ public void test22_bitrot_ignored() throws Exception
+ {
+ try {
+ System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
+ testRestore(DATA_DIR + "2.2-lz4-bitrot");
+ } finally {
+ System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
+ }
+ }
+
+ @Test(expected = CommitLogReplayException.class)
+ public void test22_bitrot2() throws Exception
+ {
+ shouldBeKilled = true;
+ testRestore(DATA_DIR + "2.2-lz4-bitrot2");
+ }
+
+ @Test
+ public void test22_bitrot2_ignored() throws Exception
+ {
+ try {
+ System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
+ testRestore(DATA_DIR + "2.2-lz4-bitrot2");
+ } finally {
+ System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
+ }
+ }
+
@BeforeClass
static public void initialize() throws FileNotFoundException, IOException, InterruptedException
{