You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/07/29 18:58:36 UTC
[1/3] git commit: Revert "Fail to start if commit log replay
encounters an exception"
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 db9623904 -> a5bc52eee
refs/heads/trunk b1c45b7f3 -> 4d282ca5d
Revert "Fail to start if commit log replay encounters an exception"
This reverts commit 581ce631026b98ee9438d54ef144df89bc91100b.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5bc52ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5bc52ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5bc52ee
Branch: refs/heads/cassandra-2.1
Commit: a5bc52eee90e342efcdc53282612008d3dbaeaeb
Parents: db96239
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jul 29 11:57:34 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jul 29 11:57:34 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../cassandra/db/commitlog/CommitLog.java | 17 +--
.../db/commitlog/CommitLogDescriptor.java | 8 +-
.../db/commitlog/CommitLogReplayer.java | 76 +++--------
.../commitlog/MalformedCommitLogException.java | 16 ---
.../cassandra/service/CassandraDaemon.java | 2 -
.../org/apache/cassandra/db/CommitLogTest.java | 133 ++++---------------
7 files changed, 48 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64f9793..1a2dc57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,6 @@
2.1.1
* Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
* Add listen_interface and rpc_interface options (CASSANDRA-7417)
- * Fail to start if commit log replay detects a problem (CASSANDRA-7125)
* Improve schema merge performance (CASSANDRA-7444)
* Adjust MT depth based on # of partition validating (CASSANDRA-5263)
* Optimise NativeCell comparisons (CASSANDRA-6755)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a1be25d..d2a5fa7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -131,20 +131,9 @@ public class CommitLog implements CommitLogMBean
*/
public int recover(File... clogs) throws IOException
{
- try
- {
- CommitLogReplayer recovery = new CommitLogReplayer();
- recovery.recover(clogs);
- return recovery.blockForWrites();
- }
- catch (IOException e)
- {
- if (e instanceof UnknownColumnFamilyException)
- logger.error("Commit log replay failed due to replaying a mutation for a missing table. This error can be ignored by providing -Dcassandra.commitlog.stop_on_missing_tables=false on the command line");
- if (e instanceof MalformedCommitLogException)
- logger.error("Commit log replay failed due to a non-fatal exception. This error can be ignored by providing -Dcassandra.commitlog.stop_on_errors=false on the command line");
- throw e;
- }
+ CommitLogReplayer recovery = new CommitLogReplayer();
+ recovery.recover(clogs);
+ return recovery.blockForWrites();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 77c25d3..91c81e1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -28,8 +28,6 @@ import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.annotations.VisibleForTesting;
-
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
@@ -50,11 +48,10 @@ public class CommitLogDescriptor
* Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
* Note: make sure to handle {@link #getMessagingVersion()}
*/
- @VisibleForTesting
public static final int current_version = VERSION_21;
// [version, id, checksum]
- public static final int HEADER_SIZE = 4 + 8 + 4;
+ static final int HEADER_SIZE = 4 + 8 + 4;
final int version;
public final long id;
@@ -70,8 +67,7 @@ public class CommitLogDescriptor
this(current_version, id);
}
- @VisibleForTesting
- public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+ static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
{
out.putInt(0, descriptor.version);
out.putLong(4, descriptor.id);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 10d13b2..1012829 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -23,7 +23,6 @@ import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
@@ -49,8 +48,6 @@ public class CommitLogReplayer
private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
- private static boolean IGNORE_ERRORS = System.getProperty("cassandra.commitlog.stop_on_errors", "true").equals("false");
- private static boolean IGNORE_MISSING_TABLES = IGNORE_ERRORS || System.getProperty("cassandra.commitlog.stop_on_missing_tables", "true").equals("false");
private final Set<Keyspace> keyspacesRecovered;
private final List<Future<?>> futures;
@@ -63,16 +60,16 @@ public class CommitLogReplayer
public CommitLogReplayer()
{
- this.keyspacesRecovered = new NonBlockingHashSet<>();
- this.futures = new ArrayList<>();
+ this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
+ this.futures = new ArrayList<Future<?>>();
this.buffer = new byte[4096];
- this.invalidMutations = new HashMap<>();
+ this.invalidMutations = new HashMap<UUID, AtomicInteger>();
// count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
this.checksum = new PureJavaCrc32();
// compute per-CF and global replay positions
- cfPositions = new HashMap<>();
+ cfPositions = new HashMap<UUID, ReplayPosition>();
Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
@@ -120,12 +117,7 @@ public class CommitLogReplayer
if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
{
if (offset != reader.length() && offset != Integer.MAX_VALUE)
- {
- String message = String.format("Encountered bad header at position %d of Commit log %s; not enough room for a header", offset, reader.getPath());
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException(message);
- logger.warn(message);
- }
+ logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
// cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
return -1;
}
@@ -144,19 +136,13 @@ public class CommitLogReplayer
{
if (end != 0 || filecrc != 0)
{
- String message = String.format("Encountered bad header at position %d of Commit log %s, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException(message);
- logger.warn(message);
+ logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
}
return -1;
}
else if (end < offset || end > reader.length())
{
- String message = String.format("Encountered bad header at position %d of Commit log %s, with bad position but valid CRC.", offset, reader.getPath());
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException(message);
- logger.warn(message);
+ logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
return -1;
}
return end;
@@ -285,9 +271,8 @@ public class CommitLogReplayer
/* read the logs populate Mutation and apply */
while (reader.getPosition() < end && !reader.isEOF())
{
- long mutationStart = reader.getFilePointer();
if (logger.isDebugEnabled())
- logger.debug("Reading mutation at {}", mutationStart);
+ logger.debug("Reading mutation at {}", reader.getFilePointer());
long claimedCRC32;
int serializedSize;
@@ -297,7 +282,7 @@ public class CommitLogReplayer
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
{
- logger.debug("Encountered end of segment marker at {}", mutationStart);
+ logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
break main;
}
@@ -306,11 +291,7 @@ public class CommitLogReplayer
// 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
// This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
if (serializedSize < 10)
- {
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException("Too small mutation encountered at position " + mutationStart);
break main;
- }
long claimedSizeChecksum;
if (desc.version < CommitLogDescriptor.VERSION_21)
@@ -324,11 +305,7 @@ public class CommitLogReplayer
checksum.updateInt(serializedSize);
if (checksum.getValue() != claimedSizeChecksum)
- {
- if (!IGNORE_ERRORS)
- throw new IOException("Invalid size checksum for mutation at position " + mutationStart + " of " + file);
break main; // entry wasn't synced correctly/fully. that's
- }
// ok.
if (serializedSize > buffer.length)
@@ -341,17 +318,12 @@ public class CommitLogReplayer
}
catch (EOFException eof)
{
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException("Encountered end-of-file unexpectedly", eof);
-
break main; // last CL entry didn't get completely written. that's ok.
}
checksum.update(buffer, 0, serializedSize);
if (claimedCRC32 != checksum.getValue())
{
- if (!IGNORE_ERRORS)
- throw new IOException("Invalid checksum for mutation at position " + mutationStart + " of " + file);
// this entry must not have been fsynced. probably the rest is bad too,
// but just in case there is no harm in trying them (since we still read on an entry boundary)
continue;
@@ -372,9 +344,6 @@ public class CommitLogReplayer
}
catch (UnknownColumnFamilyException ex)
{
- if (!IGNORE_MISSING_TABLES)
- throw ex;
-
if (ex.cfId == null)
continue;
AtomicInteger i = invalidMutations.get(ex.cfId);
@@ -389,14 +358,16 @@ public class CommitLogReplayer
}
catch (Throwable t)
{
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException("Encountered bad mutation", t);
-
File f = File.createTempFile("mutation", "dat");
- try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+ try
{
out.write(buffer, 0, serializedSize);
}
+ finally
+ {
+ out.close();
+ }
String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
f.getAbsolutePath());
logger.error(st, t);
@@ -412,11 +383,7 @@ public class CommitLogReplayer
public void runMayThrow() throws IOException
{
if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
- {
- if (!IGNORE_MISSING_TABLES)
- throw new UnknownColumnFamilyException("Keyspace for this table is missing", mutation.getColumnFamilyIds().iterator().next());
return;
- }
if (pointInTimeExceeded(mutation))
return;
@@ -431,12 +398,7 @@ public class CommitLogReplayer
for (ColumnFamily columnFamily : replayFilter.filter(mutation))
{
if (Schema.instance.getCF(columnFamily.id()) == null)
- {
- if (!IGNORE_MISSING_TABLES)
- throw new UnknownColumnFamilyException("Missing table with cfid=" + columnFamily.id(),
- mutation.getColumnFamilyIds().iterator().next());
continue; // dropped
- }
ReplayPosition rp = cfPositions.get(columnFamily.id());
@@ -453,7 +415,7 @@ public class CommitLogReplayer
if (newMutation != null)
{
assert !newMutation.isEmpty();
- keyspace.apply(newMutation, false);
+ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
keyspacesRecovered.add(keyspace);
}
}
@@ -491,10 +453,4 @@ public class CommitLogReplayer
}
return false;
}
-
- @VisibleForTesting
- public static void setIgnoreErrors(boolean ignore)
- {
- IGNORE_ERRORS = IGNORE_MISSING_TABLES = ignore;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
deleted file mode 100644
index 84a5cb0..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-
-// represents a non-fatal commit log replay exception (i.e. can be skipped with -Dcassandra.commitlog.ignoreerrors=true)
-public class MalformedCommitLogException extends IOException
-{
- public MalformedCommitLogException(String message)
- {
- super(message);
- }
- public MalformedCommitLogException(String message, Throwable cause)
- {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 07c6cc4..fbee7ce 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -47,8 +47,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
-import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.exceptions.ConfigurationException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index dd05272..7046536 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -36,53 +36,46 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLogReplayer;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.PureJavaCrc32;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
public class CommitLogTest extends SchemaLoader
{
-
- static
- {
- System.setProperty("cassandra.commitlog.stop_on_errors", "true");
- }
-
@Test
public void testRecoveryWithEmptyLog() throws Exception
{
- testMalformed(badLogFile(new byte[0]));
+ CommitLog.instance.recover(new File[]{ tmpFile() });
}
@Test
public void testRecoveryWithShortLog() throws Exception
{
// force EOF while reading log
- testMalformed(badLogFile(100, 10));
+ testRecoveryWithBadSizeArgument(100, 10);
}
@Test
public void testRecoveryWithShortSize() throws Exception
{
- testMalformed(new byte[2]);
+ testRecovery(new byte[2]);
}
@Test
public void testRecoveryWithShortCheckSum() throws Exception
{
- testMalformed(new byte[6]);
+ testRecovery(new byte[6]);
}
@Test
public void testRecoveryWithGarbageLog() throws Exception
{
- testMalformed(garbage(100));
+ byte[] garbage = new byte[100];
+ (new java.util.Random()).nextBytes(garbage);
+ testRecovery(garbage);
}
@Test
@@ -90,30 +83,21 @@ public class CommitLogTest extends SchemaLoader
{
Checksum checksum = new CRC32();
checksum.update(100);
- testMalformed(badLogFile(100, checksum.getValue(), new byte[100]));
- testMalformed(badLogFile(100, checksum.getValue(), garbage(100)));
- }
-
- @Test
- public void testRecoveryWithBadSize() throws Exception
- {
- Checksum checksum = new CRC32();
- checksum.update(100);
- testMalformed(badLogFile(120, checksum.getValue(), garbage(100)));
+ testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
}
@Test
public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
{
// many different combinations of 4 bytes (garbage) will be read as zero by readInt()
- testMalformed(badLogFile(0, -1L, 10)); // zero size, but no EOF
+ testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
}
@Test
public void testRecoveryWithNegativeSizeArgument() throws Exception
{
// garbage from a partial/bad flush could be read as a negative size even if there is no EOF
- testMalformed(badLogFile(-10, 10)); // zero size, but no EOF
+ testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
}
@Test
@@ -190,8 +174,8 @@ public class CommitLogTest extends SchemaLoader
private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column)
{
- Mutation rm = new Mutation(keyspace, key);
- rm.add(table, column, ByteBuffer.allocate(0), 0);
+ Mutation rm = new Mutation("Keyspace1", bytes("k"));
+ rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0);
int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
@@ -231,73 +215,22 @@ public class CommitLogTest extends SchemaLoader
}
}
- // construct log file with correct chunk checksum for the provided size/position
- protected File badLogFile(int markerSize, int realSize) throws Exception
- {
- return badLogFile(markerSize, garbage(realSize));
- }
-
- protected File badLogFile(int markerSize, byte[] data) throws Exception
- {
- File logFile = tmpFile();
- CommitLogDescriptor descriptor = CommitLogDescriptor.fromFileName(logFile.getName());
- PureJavaCrc32 crc = new PureJavaCrc32();
- crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
- crc.updateInt((int) (descriptor.id >>> 32));
- crc.updateInt(CommitLogDescriptor.HEADER_SIZE);
- return badLogFile(markerSize, crc.getCrc(), data, logFile);
- }
-
- protected byte[] garbage(int size)
- {
- byte[] garbage = new byte[size];
- (new java.util.Random()).nextBytes(garbage);
- return garbage;
- }
-
- protected File badLogFile(int markerSize, long checksum, int realSize) throws Exception
- {
- return badLogFile(markerSize, checksum, realSize, tmpFile());
- }
-
- protected File badLogFile(int markerSize, long checksum, int realSize, File logFile) throws Exception
+ protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
{
- return badLogFile(markerSize, checksum, new byte[realSize], logFile);
- }
-
- protected File badLogFile(int markerSize, long checksum, byte[] chunk) throws Exception
- {
- return badLogFile(markerSize, checksum, chunk, tmpFile());
+ Checksum checksum = new CRC32();
+ checksum.update(size);
+ testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
}
- protected File badLogFile(int markerSize, long checksum, byte[] chunk, File logFile) throws Exception
+ protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(out);
- ByteBuffer buffer = ByteBuffer.allocate(CommitLogDescriptor.HEADER_SIZE);
- CommitLogDescriptor.writeHeader(buffer, CommitLogDescriptor.fromFileName(logFile.getName()));
- out.write(buffer.array());
- dout.writeInt(markerSize);
+ dout.writeInt(size);
dout.writeLong(checksum);
- dout.write(chunk);
+ dout.write(new byte[dataSize]);
dout.close();
- try (OutputStream lout = new FileOutputStream(logFile))
- {
- lout.write(out.toByteArray());
- lout.close();
- }
- return logFile;
- }
-
- protected File badLogFile(byte[] contents) throws Exception
- {
- File logFile = tmpFile();
- try (OutputStream lout = new FileOutputStream(logFile))
- {
- lout.write(contents);
- lout.close();
- }
- return logFile;
+ testRecovery(out.toByteArray());
}
protected File tmpFile() throws IOException
@@ -308,29 +241,17 @@ public class CommitLogTest extends SchemaLoader
return logFile;
}
- private void testMalformed(byte[] contents) throws Exception
- {
- testMalformed(badLogFile(contents));
- testMalformed(badLogFile(contents.length, contents));
- }
-
- private void testMalformed(File logFile) throws Exception
+ protected void testRecovery(byte[] logData) throws Exception
{
- CommitLogReplayer.setIgnoreErrors(true);
- CommitLog.instance.recover(new File[]{ logFile });
- CommitLogReplayer.setIgnoreErrors(false);
- try
- {
- CommitLog.instance.recover(new File[]{ logFile });
- Assert.assertFalse(true);
- }
- catch (Throwable t)
+ File logFile = tmpFile();
+ try (OutputStream lout = new FileOutputStream(logFile))
{
- if (!(t instanceof MalformedCommitLogException))
- throw t;
+ lout.write(logData);
+ //statics make it annoying to test things correctly
+ CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
}
}
-
+
@Test
public void testVersions()
{
[2/3] git commit: Revert "Fail to start if commit log replay
encounters an exception"
Posted by jb...@apache.org.
Revert "Fail to start if commit log replay encounters an exception"
This reverts commit 581ce631026b98ee9438d54ef144df89bc91100b.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5bc52ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5bc52ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5bc52ee
Branch: refs/heads/trunk
Commit: a5bc52eee90e342efcdc53282612008d3dbaeaeb
Parents: db96239
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jul 29 11:57:34 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jul 29 11:57:34 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../cassandra/db/commitlog/CommitLog.java | 17 +--
.../db/commitlog/CommitLogDescriptor.java | 8 +-
.../db/commitlog/CommitLogReplayer.java | 76 +++--------
.../commitlog/MalformedCommitLogException.java | 16 ---
.../cassandra/service/CassandraDaemon.java | 2 -
.../org/apache/cassandra/db/CommitLogTest.java | 133 ++++---------------
7 files changed, 48 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64f9793..1a2dc57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,6 @@
2.1.1
* Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
* Add listen_interface and rpc_interface options (CASSANDRA-7417)
- * Fail to start if commit log replay detects a problem (CASSANDRA-7125)
* Improve schema merge performance (CASSANDRA-7444)
* Adjust MT depth based on # of partition validating (CASSANDRA-5263)
* Optimise NativeCell comparisons (CASSANDRA-6755)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a1be25d..d2a5fa7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -131,20 +131,9 @@ public class CommitLog implements CommitLogMBean
*/
public int recover(File... clogs) throws IOException
{
- try
- {
- CommitLogReplayer recovery = new CommitLogReplayer();
- recovery.recover(clogs);
- return recovery.blockForWrites();
- }
- catch (IOException e)
- {
- if (e instanceof UnknownColumnFamilyException)
- logger.error("Commit log replay failed due to replaying a mutation for a missing table. This error can be ignored by providing -Dcassandra.commitlog.stop_on_missing_tables=false on the command line");
- if (e instanceof MalformedCommitLogException)
- logger.error("Commit log replay failed due to a non-fatal exception. This error can be ignored by providing -Dcassandra.commitlog.stop_on_errors=false on the command line");
- throw e;
- }
+ CommitLogReplayer recovery = new CommitLogReplayer();
+ recovery.recover(clogs);
+ return recovery.blockForWrites();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 77c25d3..91c81e1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -28,8 +28,6 @@ import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.annotations.VisibleForTesting;
-
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
@@ -50,11 +48,10 @@ public class CommitLogDescriptor
* Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
* Note: make sure to handle {@link #getMessagingVersion()}
*/
- @VisibleForTesting
public static final int current_version = VERSION_21;
// [version, id, checksum]
- public static final int HEADER_SIZE = 4 + 8 + 4;
+ static final int HEADER_SIZE = 4 + 8 + 4;
final int version;
public final long id;
@@ -70,8 +67,7 @@ public class CommitLogDescriptor
this(current_version, id);
}
- @VisibleForTesting
- public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+ static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
{
out.putInt(0, descriptor.version);
out.putLong(4, descriptor.id);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 10d13b2..1012829 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -23,7 +23,6 @@ import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
@@ -49,8 +48,6 @@ public class CommitLogReplayer
private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
- private static boolean IGNORE_ERRORS = System.getProperty("cassandra.commitlog.stop_on_errors", "true").equals("false");
- private static boolean IGNORE_MISSING_TABLES = IGNORE_ERRORS || System.getProperty("cassandra.commitlog.stop_on_missing_tables", "true").equals("false");
private final Set<Keyspace> keyspacesRecovered;
private final List<Future<?>> futures;
@@ -63,16 +60,16 @@ public class CommitLogReplayer
public CommitLogReplayer()
{
- this.keyspacesRecovered = new NonBlockingHashSet<>();
- this.futures = new ArrayList<>();
+ this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
+ this.futures = new ArrayList<Future<?>>();
this.buffer = new byte[4096];
- this.invalidMutations = new HashMap<>();
+ this.invalidMutations = new HashMap<UUID, AtomicInteger>();
// count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
this.checksum = new PureJavaCrc32();
// compute per-CF and global replay positions
- cfPositions = new HashMap<>();
+ cfPositions = new HashMap<UUID, ReplayPosition>();
Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
@@ -120,12 +117,7 @@ public class CommitLogReplayer
if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
{
if (offset != reader.length() && offset != Integer.MAX_VALUE)
- {
- String message = String.format("Encountered bad header at position %d of Commit log %s; not enough room for a header", offset, reader.getPath());
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException(message);
- logger.warn(message);
- }
+ logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
// cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
return -1;
}
@@ -144,19 +136,13 @@ public class CommitLogReplayer
{
if (end != 0 || filecrc != 0)
{
- String message = String.format("Encountered bad header at position %d of Commit log %s, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException(message);
- logger.warn(message);
+ logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
}
return -1;
}
else if (end < offset || end > reader.length())
{
- String message = String.format("Encountered bad header at position %d of Commit log %s, with bad position but valid CRC.", offset, reader.getPath());
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException(message);
- logger.warn(message);
+ logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
return -1;
}
return end;
@@ -285,9 +271,8 @@ public class CommitLogReplayer
/* read the logs populate Mutation and apply */
while (reader.getPosition() < end && !reader.isEOF())
{
- long mutationStart = reader.getFilePointer();
if (logger.isDebugEnabled())
- logger.debug("Reading mutation at {}", mutationStart);
+ logger.debug("Reading mutation at {}", reader.getFilePointer());
long claimedCRC32;
int serializedSize;
@@ -297,7 +282,7 @@ public class CommitLogReplayer
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
{
- logger.debug("Encountered end of segment marker at {}", mutationStart);
+ logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
break main;
}
@@ -306,11 +291,7 @@ public class CommitLogReplayer
// 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
// This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
if (serializedSize < 10)
- {
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException("Too small mutation encountered at position " + mutationStart);
break main;
- }
long claimedSizeChecksum;
if (desc.version < CommitLogDescriptor.VERSION_21)
@@ -324,11 +305,7 @@ public class CommitLogReplayer
checksum.updateInt(serializedSize);
if (checksum.getValue() != claimedSizeChecksum)
- {
- if (!IGNORE_ERRORS)
- throw new IOException("Invalid size checksum for mutation at position " + mutationStart + " of " + file);
break main; // entry wasn't synced correctly/fully. that's
- }
// ok.
if (serializedSize > buffer.length)
@@ -341,17 +318,12 @@ public class CommitLogReplayer
}
catch (EOFException eof)
{
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException("Encountered end-of-file unexpectedly", eof);
-
break main; // last CL entry didn't get completely written. that's ok.
}
checksum.update(buffer, 0, serializedSize);
if (claimedCRC32 != checksum.getValue())
{
- if (!IGNORE_ERRORS)
- throw new IOException("Invalid checksum for mutation at position " + mutationStart + " of " + file);
// this entry must not have been fsynced. probably the rest is bad too,
// but just in case there is no harm in trying them (since we still read on an entry boundary)
continue;
@@ -372,9 +344,6 @@ public class CommitLogReplayer
}
catch (UnknownColumnFamilyException ex)
{
- if (!IGNORE_MISSING_TABLES)
- throw ex;
-
if (ex.cfId == null)
continue;
AtomicInteger i = invalidMutations.get(ex.cfId);
@@ -389,14 +358,16 @@ public class CommitLogReplayer
}
catch (Throwable t)
{
- if (!IGNORE_ERRORS)
- throw new MalformedCommitLogException("Encountered bad mutation", t);
-
File f = File.createTempFile("mutation", "dat");
- try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+ try
{
out.write(buffer, 0, serializedSize);
}
+ finally
+ {
+ out.close();
+ }
String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
f.getAbsolutePath());
logger.error(st, t);
@@ -412,11 +383,7 @@ public class CommitLogReplayer
public void runMayThrow() throws IOException
{
if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
- {
- if (!IGNORE_MISSING_TABLES)
- throw new UnknownColumnFamilyException("Keyspace for this table is missing", mutation.getColumnFamilyIds().iterator().next());
return;
- }
if (pointInTimeExceeded(mutation))
return;
@@ -431,12 +398,7 @@ public class CommitLogReplayer
for (ColumnFamily columnFamily : replayFilter.filter(mutation))
{
if (Schema.instance.getCF(columnFamily.id()) == null)
- {
- if (!IGNORE_MISSING_TABLES)
- throw new UnknownColumnFamilyException("Missing table with cfid=" + columnFamily.id(),
- mutation.getColumnFamilyIds().iterator().next());
continue; // dropped
- }
ReplayPosition rp = cfPositions.get(columnFamily.id());
@@ -453,7 +415,7 @@ public class CommitLogReplayer
if (newMutation != null)
{
assert !newMutation.isEmpty();
- keyspace.apply(newMutation, false);
+ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
keyspacesRecovered.add(keyspace);
}
}
@@ -491,10 +453,4 @@ public class CommitLogReplayer
}
return false;
}
-
- @VisibleForTesting
- public static void setIgnoreErrors(boolean ignore)
- {
- IGNORE_ERRORS = IGNORE_MISSING_TABLES = ignore;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
deleted file mode 100644
index 84a5cb0..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-
-// represents a non-fatal commit log replay exception (i.e. can be skipped with -Dcassandra.commitlog.ignoreerrors=true)
-public class MalformedCommitLogException extends IOException
-{
- public MalformedCommitLogException(String message)
- {
- super(message);
- }
- public MalformedCommitLogException(String message, Throwable cause)
- {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 07c6cc4..fbee7ce 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -47,8 +47,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
-import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.exceptions.ConfigurationException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index dd05272..7046536 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -36,53 +36,46 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLogReplayer;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.PureJavaCrc32;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
public class CommitLogTest extends SchemaLoader
{
-
- static
- {
- System.setProperty("cassandra.commitlog.stop_on_errors", "true");
- }
-
@Test
public void testRecoveryWithEmptyLog() throws Exception
{
- testMalformed(badLogFile(new byte[0]));
+ CommitLog.instance.recover(new File[]{ tmpFile() });
}
@Test
public void testRecoveryWithShortLog() throws Exception
{
// force EOF while reading log
- testMalformed(badLogFile(100, 10));
+ testRecoveryWithBadSizeArgument(100, 10);
}
@Test
public void testRecoveryWithShortSize() throws Exception
{
- testMalformed(new byte[2]);
+ testRecovery(new byte[2]);
}
@Test
public void testRecoveryWithShortCheckSum() throws Exception
{
- testMalformed(new byte[6]);
+ testRecovery(new byte[6]);
}
@Test
public void testRecoveryWithGarbageLog() throws Exception
{
- testMalformed(garbage(100));
+ byte[] garbage = new byte[100];
+ (new java.util.Random()).nextBytes(garbage);
+ testRecovery(garbage);
}
@Test
@@ -90,30 +83,21 @@ public class CommitLogTest extends SchemaLoader
{
Checksum checksum = new CRC32();
checksum.update(100);
- testMalformed(badLogFile(100, checksum.getValue(), new byte[100]));
- testMalformed(badLogFile(100, checksum.getValue(), garbage(100)));
- }
-
- @Test
- public void testRecoveryWithBadSize() throws Exception
- {
- Checksum checksum = new CRC32();
- checksum.update(100);
- testMalformed(badLogFile(120, checksum.getValue(), garbage(100)));
+ testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
}
@Test
public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
{
// many different combinations of 4 bytes (garbage) will be read as zero by readInt()
- testMalformed(badLogFile(0, -1L, 10)); // zero size, but no EOF
+ testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
}
@Test
public void testRecoveryWithNegativeSizeArgument() throws Exception
{
// garbage from a partial/bad flush could be read as a negative size even if there is no EOF
- testMalformed(badLogFile(-10, 10)); // zero size, but no EOF
+ testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
}
@Test
@@ -190,8 +174,8 @@ public class CommitLogTest extends SchemaLoader
private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column)
{
- Mutation rm = new Mutation(keyspace, key);
- rm.add(table, column, ByteBuffer.allocate(0), 0);
+ Mutation rm = new Mutation("Keyspace1", bytes("k"));
+ rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0);
int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
@@ -231,73 +215,22 @@ public class CommitLogTest extends SchemaLoader
}
}
- // construct log file with correct chunk checksum for the provided size/position
- protected File badLogFile(int markerSize, int realSize) throws Exception
- {
- return badLogFile(markerSize, garbage(realSize));
- }
-
- protected File badLogFile(int markerSize, byte[] data) throws Exception
- {
- File logFile = tmpFile();
- CommitLogDescriptor descriptor = CommitLogDescriptor.fromFileName(logFile.getName());
- PureJavaCrc32 crc = new PureJavaCrc32();
- crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
- crc.updateInt((int) (descriptor.id >>> 32));
- crc.updateInt(CommitLogDescriptor.HEADER_SIZE);
- return badLogFile(markerSize, crc.getCrc(), data, logFile);
- }
-
- protected byte[] garbage(int size)
- {
- byte[] garbage = new byte[size];
- (new java.util.Random()).nextBytes(garbage);
- return garbage;
- }
-
- protected File badLogFile(int markerSize, long checksum, int realSize) throws Exception
- {
- return badLogFile(markerSize, checksum, realSize, tmpFile());
- }
-
- protected File badLogFile(int markerSize, long checksum, int realSize, File logFile) throws Exception
+ protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
{
- return badLogFile(markerSize, checksum, new byte[realSize], logFile);
- }
-
- protected File badLogFile(int markerSize, long checksum, byte[] chunk) throws Exception
- {
- return badLogFile(markerSize, checksum, chunk, tmpFile());
+ Checksum checksum = new CRC32();
+ checksum.update(size);
+ testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
}
- protected File badLogFile(int markerSize, long checksum, byte[] chunk, File logFile) throws Exception
+ protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(out);
- ByteBuffer buffer = ByteBuffer.allocate(CommitLogDescriptor.HEADER_SIZE);
- CommitLogDescriptor.writeHeader(buffer, CommitLogDescriptor.fromFileName(logFile.getName()));
- out.write(buffer.array());
- dout.writeInt(markerSize);
+ dout.writeInt(size);
dout.writeLong(checksum);
- dout.write(chunk);
+ dout.write(new byte[dataSize]);
dout.close();
- try (OutputStream lout = new FileOutputStream(logFile))
- {
- lout.write(out.toByteArray());
- lout.close();
- }
- return logFile;
- }
-
- protected File badLogFile(byte[] contents) throws Exception
- {
- File logFile = tmpFile();
- try (OutputStream lout = new FileOutputStream(logFile))
- {
- lout.write(contents);
- lout.close();
- }
- return logFile;
+ testRecovery(out.toByteArray());
}
protected File tmpFile() throws IOException
@@ -308,29 +241,17 @@ public class CommitLogTest extends SchemaLoader
return logFile;
}
- private void testMalformed(byte[] contents) throws Exception
- {
- testMalformed(badLogFile(contents));
- testMalformed(badLogFile(contents.length, contents));
- }
-
- private void testMalformed(File logFile) throws Exception
+ protected void testRecovery(byte[] logData) throws Exception
{
- CommitLogReplayer.setIgnoreErrors(true);
- CommitLog.instance.recover(new File[]{ logFile });
- CommitLogReplayer.setIgnoreErrors(false);
- try
- {
- CommitLog.instance.recover(new File[]{ logFile });
- Assert.assertFalse(true);
- }
- catch (Throwable t)
+ File logFile = tmpFile();
+ try (OutputStream lout = new FileOutputStream(logFile))
{
- if (!(t instanceof MalformedCommitLogException))
- throw t;
+ lout.write(logData);
+ //statics make it annoying to test things correctly
+ CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
}
}
-
+
@Test
public void testVersions()
{
[3/3] git commit: merge from 2.1
Posted by jb...@apache.org.
merge from 2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4d282ca5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4d282ca5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4d282ca5
Branch: refs/heads/trunk
Commit: 4d282ca5d92ba9ae04e00491cd433bde1af67dbc
Parents: b1c45b7 a5bc52e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jul 29 11:58:29 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jul 29 11:58:29 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../cassandra/db/commitlog/CommitLog.java | 17 +--
.../db/commitlog/CommitLogDescriptor.java | 8 +-
.../db/commitlog/CommitLogReplayer.java | 76 +++--------
.../commitlog/MalformedCommitLogException.java | 16 ---
.../cassandra/service/CassandraDaemon.java | 2 -
.../org/apache/cassandra/db/CommitLogTest.java | 127 ++++---------------
7 files changed, 48 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d282ca5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 47c5ce1,1a2dc57..119e005
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,24 -1,6 +1,23 @@@
+3.0
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7208)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+
+
2.1.1
* Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
+ * Add duration mode to cassandra-stress (CASSANDRA-7468)
* Add listen_interface and rpc_interface options (CASSANDRA-7417)
- * Fail to start if commit log replay detects a problem (CASSANDRA-7125)
* Improve schema merge performance (CASSANDRA-7444)
* Adjust MT depth based on # of partition validating (CASSANDRA-5263)
* Optimise NativeCell comparisons (CASSANDRA-6755)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d282ca5/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d282ca5/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 3f1b7b5,91c81e1..e50a585
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@@ -28,10 -28,9 +28,8 @@@ import java.nio.ByteBuffer
import java.util.regex.Matcher;
import java.util.regex.Pattern;
- import com.google.common.annotations.VisibleForTesting;
-
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.PureJavaCrc32;
public class CommitLogDescriptor
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d282ca5/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CommitLogTest.java
index 762d2d0,7046536..f8cb8c8
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@@ -35,18 -34,12 +35,15 @@@ import org.apache.cassandra.SchemaLoade
import org.apache.cassandra.Util;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
- import org.apache.cassandra.db.commitlog.CommitLogReplayer;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
- import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.utils.PureJavaCrc32;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;