You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/05/08 23:00:34 UTC
[1/5] git commit: inline
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 a89fd4d1d -> 0b26c778b
refs/heads/trunk 94c029924 -> 10aef4d87
inline
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b26c778
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b26c778
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b26c778
Branch: refs/heads/cassandra-2.1
Commit: 0b26c778bcdbef83cb5e0480a8c7b38d58d2aec6
Parents: 134e022
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 8 16:00:01 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 8 16:00:09 2014 -0500
----------------------------------------------------------------------
.../cassandra/db/commitlog/CommitLogArchiver.java | 6 +++---
.../db/commitlog/CommitLogDescriptor.java | 7 +------
.../cassandra/db/commitlog/CommitLogReplayer.java | 17 ++++++++---------
3 files changed, 12 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26c778/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index d715fcc..2795cae 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -168,14 +168,14 @@ public class CommitLogArchiver
throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath());
else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName))
throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath()));
- else if (fromName != null && fromHeader == null && fromName.getVersion() >= CommitLogDescriptor.VERSION_21)
+ else if (fromName != null && fromHeader == null && fromName.version >= CommitLogDescriptor.VERSION_21)
throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath());
else if (fromHeader != null)
descriptor = fromHeader;
else descriptor = fromName;
- if (descriptor.getVersion() > CommitLogDescriptor.VERSION_21)
- throw new IllegalStateException("Unsupported commit log version: " + descriptor.getVersion());
+ if (descriptor.version > CommitLogDescriptor.VERSION_21)
+ throw new IllegalStateException("Unsupported commit log version: " + descriptor.version);
File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
if (toFile.exists())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26c778/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index b11da94..91c81e1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -53,7 +53,7 @@ public class CommitLogDescriptor
// [version, id, checksum]
static final int HEADER_SIZE = 4 + 8 + 4;
- private final int version;
+ final int version;
public final long id;
public CommitLogDescriptor(int version, long id)
@@ -132,11 +132,6 @@ public class CommitLogDescriptor
}
}
- public int getVersion()
- {
- return version;
- }
-
public String fileName()
{
return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26c778/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 59ae4e4..1012829 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -22,7 +22,6 @@ import java.io.*;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.Checksum;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
@@ -129,7 +128,7 @@ public class CommitLogReplayer
crc.updateInt((int) reader.getPosition());
int end = reader.readInt();
long filecrc;
- if (descriptor.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (descriptor.version < CommitLogDescriptor.VERSION_21)
filecrc = reader.readLong();
else
filecrc = reader.readInt() & 0xffffffffL;
@@ -234,14 +233,14 @@ public class CommitLogReplayer
final long segmentId = desc.id;
logger.info("Replaying {} (CL version {}, messaging version {})",
file.getPath(),
- desc.getVersion(),
+ desc.version,
desc.getMessagingVersion());
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
try
{
assert reader.length() <= Integer.MAX_VALUE;
- int offset = getStartOffset(segmentId, desc.getVersion());
+ int offset = getStartOffset(segmentId, desc.version);
if (offset < 0)
{
logger.debug("skipping replay of fully-flushed {}", file);
@@ -253,7 +252,7 @@ public class CommitLogReplayer
{
int end = prevEnd;
- if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (desc.version < CommitLogDescriptor.VERSION_21)
end = Integer.MAX_VALUE;
else
{
@@ -295,12 +294,12 @@ public class CommitLogReplayer
break main;
long claimedSizeChecksum;
- if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (desc.version < CommitLogDescriptor.VERSION_21)
claimedSizeChecksum = reader.readLong();
else
claimedSizeChecksum = reader.readInt() & 0xffffffffL;
checksum.reset();
- if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
+ if (desc.version < CommitLogDescriptor.VERSION_20)
checksum.update(serializedSize);
else
checksum.updateInt(serializedSize);
@@ -312,7 +311,7 @@ public class CommitLogReplayer
if (serializedSize > buffer.length)
buffer = new byte[(int) (1.2 * serializedSize)];
reader.readFully(buffer, 0, serializedSize);
- if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (desc.version < CommitLogDescriptor.VERSION_21)
claimedCRC32 = reader.readLong();
else
claimedCRC32 = reader.readInt() & 0xffffffffL;
@@ -429,7 +428,7 @@ public class CommitLogReplayer
}
}
- if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (desc.version < CommitLogDescriptor.VERSION_21)
break;
offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
[4/5] git commit: Make commitlog archive+restore more robust patch by
bes; reviewed by jbellis for CASSANDRA-6974
Posted by jb...@apache.org.
Make commitlog archive+restore more robust
patch by bes; reviewed by jbellis for CASSANDRA-6974
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/134e0226
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/134e0226
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/134e0226
Branch: refs/heads/trunk
Commit: 134e0226e42d977e8e73477b1ff24d51e64b4436
Parents: a89fd4d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 8 15:56:22 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 8 16:00:09 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogArchiver.java | 30 +++++++--
.../db/commitlog/CommitLogDescriptor.java | 64 ++++++++++++++++++++
.../db/commitlog/CommitLogReplayer.java | 37 +++++++----
.../db/commitlog/CommitLogSegment.java | 57 +++++++++++++----
.../db/commitlog/CommitLogSegmentManager.java | 7 +--
.../apache/cassandra/utils/PureJavaCrc32.java | 17 +++++-
8 files changed, 178 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6d80937..1ebc050 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-rc1
+ * Make commitlog archive+restore more robust (CASSANDRA-6974)
* Fix marking commitlogsegments clean (CASSANDRA-6959)
* Add snapshot "manifest" describing files included (CASSANDRA-6326)
* Parallel streaming for sstableloader (CASSANDRA-3668)
@@ -24,6 +25,7 @@ Merged from 1.2:
* remove duplicate query for local tokens (CASSANDRA-7182)
* exit CQLSH with error status code if script fails (CASSANDRA-6344)
+
2.1.0-beta2
* Increase default CL space to 8GB (CASSANDRA-7031)
* Add range tombstones to read repair digests (CASSANDRA-6863)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a230e35..eaa1b3c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -216,13 +216,13 @@ public class CommitLog implements CommitLogMBean
// checksummed length
dos.writeInt((int) size);
checksum.update(buffer, buffer.position() - 4, 4);
- buffer.putLong(checksum.getValue());
+ buffer.putInt(checksum.getCrc());
int start = buffer.position();
// checksummed mutation
Mutation.serializer.serialize(mutation, dos, MessagingService.current_version);
checksum.update(buffer, start, (int) size);
- buffer.putLong(checksum.getValue());
+ buffer.putInt(checksum.getCrc());
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 6161435..d715fcc 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -103,17 +103,18 @@ public class CommitLogArchiver
}
}
- public void maybeArchive(final String path, final String name)
+ public void maybeArchive(final CommitLogSegment segment)
{
if (Strings.isNullOrEmpty(archiveCommand))
return;
- archivePending.put(name, executor.submit(new WrappedRunnable()
+ archivePending.put(segment.getName(), executor.submit(new WrappedRunnable()
{
protected void runMayThrow() throws IOException
{
- String command = archiveCommand.replace("%name", name);
- command = command.replace("%path", path);
+ segment.waitForFinalSync();
+ String command = archiveCommand.replace("%name", segment.getName());
+ command = command.replace("%path", segment.getPath());
exec(command);
}
}));
@@ -160,7 +161,26 @@ public class CommitLogArchiver
}
for (File fromFile : files)
{
- File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), new CommitLogDescriptor(CommitLogSegment.getNextId()).fileName());
+ CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile);
+ CommitLogDescriptor fromName = CommitLogDescriptor.isValid(fromFile.getName()) ? CommitLogDescriptor.fromFileName(fromFile.getName()) : null;
+ CommitLogDescriptor descriptor;
+ if (fromHeader == null && fromName == null)
+ throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath());
+ else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName))
+ throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath()));
+ else if (fromName != null && fromHeader == null && fromName.getVersion() >= CommitLogDescriptor.VERSION_21)
+ throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath());
+ else if (fromHeader != null)
+ descriptor = fromHeader;
+ else descriptor = fromName;
+
+ if (descriptor.getVersion() > CommitLogDescriptor.VERSION_21)
+ throw new IllegalStateException("Unsupported commit log version: " + descriptor.getVersion());
+
+ File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
+ if (toFile.exists())
+ throw new IllegalStateException("Trying to restore archive " + fromFile.getPath() + ", but the same segment already exists in the restore location: " + toFile.getPath());
+
String command = restoreCommand.replace("%from", fromFile.getPath());
command = command.replace("%to", toFile.getPath());
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 0c8ed61..b11da94 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -20,10 +20,18 @@
*/
package org.apache.cassandra.db.commitlog;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.PureJavaCrc32;
public class CommitLogDescriptor
{
@@ -42,6 +50,9 @@ public class CommitLogDescriptor
*/
public static final int current_version = VERSION_21;
+ // [version, id, checksum]
+ static final int HEADER_SIZE = 4 + 8 + 4;
+
private final int version;
public final long id;
@@ -56,6 +67,43 @@ public class CommitLogDescriptor
this(current_version, id);
}
+ static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+ {
+ out.putInt(0, descriptor.version);
+ out.putLong(4, descriptor.id);
+ PureJavaCrc32 crc = new PureJavaCrc32();
+ crc.updateInt(descriptor.version);
+ crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
+ crc.updateInt((int) (descriptor.id >>> 32));
+ out.putInt(12, crc.getCrc());
+ }
+
+ public static CommitLogDescriptor fromHeader(File file)
+ {
+ try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
+ {
+ assert raf.getFilePointer() == 0;
+ int version = raf.readInt();
+ long id = raf.readLong();
+ int crc = raf.readInt();
+ PureJavaCrc32 checkcrc = new PureJavaCrc32();
+ checkcrc.updateInt(version);
+ checkcrc.updateInt((int) (id & 0xFFFFFFFFL));
+ checkcrc.updateInt((int) (id >>> 32));
+ if (crc == checkcrc.getCrc())
+ return new CommitLogDescriptor(version, id);
+ return null;
+ }
+ catch (EOFException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, file);
+ }
+ }
+
public static CommitLogDescriptor fromFileName(String name)
{
Matcher matcher;
@@ -102,4 +150,20 @@ public class CommitLogDescriptor
{
return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
}
+
+ public String toString()
+ {
+ return "(" + version + "," + id + ")";
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that);
+ }
+
+ public boolean equals(CommitLogDescriptor that)
+ {
+ return this.version == that.version && this.id == that.id;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index fb33187..59ae4e4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -56,7 +56,7 @@ public class CommitLogReplayer
private final AtomicInteger replayedCount;
private final Map<UUID, ReplayPosition> cfPositions;
private final ReplayPosition globalPosition;
- private final Checksum checksum;
+ private final PureJavaCrc32 checksum;
private byte[] buffer;
public CommitLogReplayer()
@@ -113,22 +113,26 @@ public class CommitLogReplayer
return replayedCount.get();
}
- private int readHeader(long segmentId, int offset, RandomAccessReader reader) throws IOException
+ private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
{
if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
{
if (offset != reader.length() && offset != Integer.MAX_VALUE)
- logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header");
+ logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
// cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
return -1;
}
reader.seek(offset);
PureJavaCrc32 crc = new PureJavaCrc32();
- crc.update((int) (segmentId & 0xFFFFFFFFL));
- crc.update((int) (segmentId >>> 32));
- crc.update((int) reader.getPosition());
+ crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
+ crc.updateInt((int) (descriptor.id >>> 32));
+ crc.updateInt((int) reader.getPosition());
int end = reader.readInt();
- long filecrc = reader.readLong();
+ long filecrc;
+ if (descriptor.getVersion() < CommitLogDescriptor.VERSION_21)
+ filecrc = reader.readLong();
+ else
+ filecrc = reader.readInt() & 0xffffffffL;
if (crc.getValue() != filecrc)
{
if (end != 0 || filecrc != 0)
@@ -150,7 +154,7 @@ public class CommitLogReplayer
if (globalPosition.segment < segmentId)
{
if (version >= CommitLogDescriptor.VERSION_21)
- return CommitLogSegment.SYNC_MARKER_SIZE;
+ return CommitLogDescriptor.HEADER_SIZE + CommitLogSegment.SYNC_MARKER_SIZE;
else
return 0;
}
@@ -244,7 +248,7 @@ public class CommitLogReplayer
return;
}
- int prevEnd = 0;
+ int prevEnd = CommitLogDescriptor.HEADER_SIZE;
main: while (true)
{
@@ -253,7 +257,7 @@ public class CommitLogReplayer
end = Integer.MAX_VALUE;
else
{
- do { end = readHeader(segmentId, end, reader); }
+ do { end = readSyncMarker(desc, end, reader); }
while (end < offset && end > prevEnd);
}
@@ -290,12 +294,16 @@ public class CommitLogReplayer
if (serializedSize < 10)
break main;
- long claimedSizeChecksum = reader.readLong();
+ long claimedSizeChecksum;
+ if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ claimedSizeChecksum = reader.readLong();
+ else
+ claimedSizeChecksum = reader.readInt() & 0xffffffffL;
checksum.reset();
if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
checksum.update(serializedSize);
else
- FBUtilities.updateChecksumInt(checksum, serializedSize);
+ checksum.updateInt(serializedSize);
if (checksum.getValue() != claimedSizeChecksum)
break main; // entry wasn't synced correctly/fully. that's
@@ -304,7 +312,10 @@ public class CommitLogReplayer
if (serializedSize > buffer.length)
buffer = new byte[(int) (1.2 * serializedSize)];
reader.readFully(buffer, 0, serializedSize);
- claimedCRC32 = reader.readLong();
+ if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ claimedCRC32 = reader.readLong();
+ else
+ claimedCRC32 = reader.readInt() & 0xffffffffL;
}
catch (EOFException eof)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 3830966..2120d3e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CLibrary;
@@ -59,14 +60,24 @@ public class CommitLogSegment
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
- private final static long idBase = System.currentTimeMillis();
+ private final static long idBase;
private final static AtomicInteger nextId = new AtomicInteger(1);
+ static
+ {
+ long maxId = Long.MIN_VALUE;
+ for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
+ {
+ if (CommitLogDescriptor.isValid(file.getName()))
+ maxId = Math.max(CommitLogDescriptor.fromFileName(file.getName()).id, maxId);
+ }
+ idBase = Math.max(System.currentTimeMillis(), maxId + 1);
+ }
- // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
- static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
+ // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum)
+ static final int ENTRY_OVERHEAD_SIZE = 4 + 4 + 4;
- // The commit log (chained) sync marker/header size in bytes (int: length + long: checksum [segmentId, position])
- static final int SYNC_MARKER_SIZE = 4 + 8;
+ // The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position])
+ static final int SYNC_MARKER_SIZE = 4 + 4;
// The OpOrder used to order appends wrt sync
private final OpOrder appendOrder = new OpOrder();
@@ -154,10 +165,13 @@ public class CommitLogSegment
fd = CLibrary.getfd(logFileAccessor.getFD());
buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
- // mark the initial header as uninitialised
- buffer.putInt(0, 0);
- buffer.putLong(4, 0);
- allocatePosition.set(SYNC_MARKER_SIZE);
+ // write the header
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ // mark the initial sync marker as uninitialised
+ buffer.putInt(CommitLogDescriptor.HEADER_SIZE, 0);
+ buffer.putLong(CommitLogDescriptor.HEADER_SIZE + 4, 0);
+ allocatePosition.set(CommitLogDescriptor.HEADER_SIZE + SYNC_MARKER_SIZE);
+ lastSyncedOffset = CommitLogDescriptor.HEADER_SIZE;
}
catch (IOException e)
{
@@ -292,11 +306,11 @@ public class CommitLogSegment
// we don't chain the crcs here to ensure this method is idempotent if it fails
int offset = lastSyncedOffset;
final PureJavaCrc32 crc = new PureJavaCrc32();
- crc.update((int) (id & 0xFFFFFFFFL));
- crc.update((int) (id >>> 32));
- crc.update(offset);
+ crc.updateInt((int) (id & 0xFFFFFFFFL));
+ crc.updateInt((int) (id >>> 32));
+ crc.updateInt(offset);
buffer.putInt(offset, nextMarker);
- buffer.putLong(offset + 4, crc.getValue());
+ buffer.putInt(offset + 4, crc.getCrc());
// zero out the next sync marker so replayer can cleanly exit
if (nextMarker < buffer.capacity())
@@ -383,6 +397,23 @@ public class CommitLogSegment
return logFile.getName();
}
+ void waitForFinalSync()
+ {
+ while (true)
+ {
+ WaitQueue.Signal signal = syncComplete.register();
+ if (lastSyncedOffset < buffer.capacity())
+ {
+ signal.awaitUninterruptibly();
+ }
+ else
+ {
+ signal.cancel();
+ break;
+ }
+ }
+ }
+
/**
* Close the segment file.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index b0be42c..5802e8a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -32,7 +31,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.*;
@@ -222,7 +220,7 @@ public class CommitLogSegmentManager
{
// Now we can run the user defined command just after switching to the new commit log.
// (Do this here instead of in the recycle call so we can get a head start on the archive.)
- CommitLog.instance.archiver.maybeArchive(old.getPath(), old.getName());
+ CommitLog.instance.archiver.maybeArchive(old);
// ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
old.discardUnusedTail();
@@ -314,8 +312,9 @@ public class CommitLogSegmentManager
*/
void recycleSegment(final CommitLogSegment segment)
{
+ boolean archiveSuccess = CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName());
activeSegments.remove(segment);
- if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
+ if (!archiveSuccess)
{
// if archiving (command) was not successful then leave the file alone. don't delete or recycle.
discardSegment(segment, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
index 041652f..9a1ac02 100644
--- a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
+++ b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
@@ -31,7 +31,7 @@ import java.util.zip.Checksum;
*
* @see java.util.zip.CRC32
*
- * This class is copied from hadoop-commons project.
+ * This class is copied from hadoop-commons project and retains that formatting.
* (The initial patch added PureJavaCrc32 was HADOOP-6148)
*/
public class PureJavaCrc32 implements Checksum {
@@ -49,7 +49,11 @@ public class PureJavaCrc32 implements Checksum {
return (~crc) & 0xffffffffL;
}
- @Override
+ public int getCrc() {
+ return ~crc;
+ }
+
+ @Override
public void reset() {
crc = 0xffffffff;
}
@@ -172,7 +176,14 @@ public class PureJavaCrc32 implements Checksum {
crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
}
- /*
+ final public void updateInt(int v) {
+ update((v >>> 24) & 0xFF);
+ update((v >>> 16) & 0xFF);
+ update((v >>> 8) & 0xFF);
+ update((v >>> 0) & 0xFF);
+ }
+
+ /*
* CRC-32 lookup tables generated by the polynomial 0xEDB88320.
* See also TestPureJavaCrc32.Table.
*/
[5/5] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/10aef4d8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/10aef4d8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/10aef4d8
Branch: refs/heads/trunk
Commit: 10aef4d87ad44cd7a124b941aa4997d3372be1f3
Parents: 94c0299 0b26c77
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 8 16:00:28 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 8 16:00:28 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogArchiver.java | 30 +++++++--
.../db/commitlog/CommitLogDescriptor.java | 71 ++++++++++++++++++--
.../db/commitlog/CommitLogReplayer.java | 48 +++++++------
.../db/commitlog/CommitLogSegment.java | 57 ++++++++++++----
.../db/commitlog/CommitLogSegmentManager.java | 7 +-
.../apache/cassandra/utils/PureJavaCrc32.java | 17 ++++-
8 files changed, 184 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/10aef4d8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7b387ac,1ebc050..7802d7b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,5 +1,13 @@@
+3.0
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7208)
+
2.1.0-rc1
+ * Make commitlog archive+restore more robust (CASSANDRA-6974)
* Fix marking commitlogsegments clean (CASSANDRA-6959)
* Add snapshot "manifest" describing files included (CASSANDRA-6326)
* Parallel streaming for sstableloader (CASSANDRA-3668)
[3/5] git commit: inline
Posted by jb...@apache.org.
inline
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b26c778
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b26c778
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b26c778
Branch: refs/heads/trunk
Commit: 0b26c778bcdbef83cb5e0480a8c7b38d58d2aec6
Parents: 134e022
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 8 16:00:01 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 8 16:00:09 2014 -0500
----------------------------------------------------------------------
.../cassandra/db/commitlog/CommitLogArchiver.java | 6 +++---
.../db/commitlog/CommitLogDescriptor.java | 7 +------
.../cassandra/db/commitlog/CommitLogReplayer.java | 17 ++++++++---------
3 files changed, 12 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26c778/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index d715fcc..2795cae 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -168,14 +168,14 @@ public class CommitLogArchiver
throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath());
else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName))
throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath()));
- else if (fromName != null && fromHeader == null && fromName.getVersion() >= CommitLogDescriptor.VERSION_21)
+ else if (fromName != null && fromHeader == null && fromName.version >= CommitLogDescriptor.VERSION_21)
throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath());
else if (fromHeader != null)
descriptor = fromHeader;
else descriptor = fromName;
- if (descriptor.getVersion() > CommitLogDescriptor.VERSION_21)
- throw new IllegalStateException("Unsupported commit log version: " + descriptor.getVersion());
+ if (descriptor.version > CommitLogDescriptor.VERSION_21)
+ throw new IllegalStateException("Unsupported commit log version: " + descriptor.version);
File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
if (toFile.exists())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26c778/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index b11da94..91c81e1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -53,7 +53,7 @@ public class CommitLogDescriptor
// [version, id, checksum]
static final int HEADER_SIZE = 4 + 8 + 4;
- private final int version;
+ final int version;
public final long id;
public CommitLogDescriptor(int version, long id)
@@ -132,11 +132,6 @@ public class CommitLogDescriptor
}
}
- public int getVersion()
- {
- return version;
- }
-
public String fileName()
{
return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b26c778/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 59ae4e4..1012829 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -22,7 +22,6 @@ import java.io.*;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.Checksum;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
@@ -129,7 +128,7 @@ public class CommitLogReplayer
crc.updateInt((int) reader.getPosition());
int end = reader.readInt();
long filecrc;
- if (descriptor.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (descriptor.version < CommitLogDescriptor.VERSION_21)
filecrc = reader.readLong();
else
filecrc = reader.readInt() & 0xffffffffL;
@@ -234,14 +233,14 @@ public class CommitLogReplayer
final long segmentId = desc.id;
logger.info("Replaying {} (CL version {}, messaging version {})",
file.getPath(),
- desc.getVersion(),
+ desc.version,
desc.getMessagingVersion());
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
try
{
assert reader.length() <= Integer.MAX_VALUE;
- int offset = getStartOffset(segmentId, desc.getVersion());
+ int offset = getStartOffset(segmentId, desc.version);
if (offset < 0)
{
logger.debug("skipping replay of fully-flushed {}", file);
@@ -253,7 +252,7 @@ public class CommitLogReplayer
{
int end = prevEnd;
- if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (desc.version < CommitLogDescriptor.VERSION_21)
end = Integer.MAX_VALUE;
else
{
@@ -295,12 +294,12 @@ public class CommitLogReplayer
break main;
long claimedSizeChecksum;
- if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (desc.version < CommitLogDescriptor.VERSION_21)
claimedSizeChecksum = reader.readLong();
else
claimedSizeChecksum = reader.readInt() & 0xffffffffL;
checksum.reset();
- if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
+ if (desc.version < CommitLogDescriptor.VERSION_20)
checksum.update(serializedSize);
else
checksum.updateInt(serializedSize);
@@ -312,7 +311,7 @@ public class CommitLogReplayer
if (serializedSize > buffer.length)
buffer = new byte[(int) (1.2 * serializedSize)];
reader.readFully(buffer, 0, serializedSize);
- if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (desc.version < CommitLogDescriptor.VERSION_21)
claimedCRC32 = reader.readLong();
else
claimedCRC32 = reader.readInt() & 0xffffffffL;
@@ -429,7 +428,7 @@ public class CommitLogReplayer
}
}
- if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ if (desc.version < CommitLogDescriptor.VERSION_21)
break;
offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
[2/5] git commit: Make commitlog archive+restore more robust patch by
bes; reviewed by jbellis for CASSANDRA-6974
Posted by jb...@apache.org.
Make commitlog archive+restore more robust
patch by bes; reviewed by jbellis for CASSANDRA-6974
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/134e0226
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/134e0226
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/134e0226
Branch: refs/heads/cassandra-2.1
Commit: 134e0226e42d977e8e73477b1ff24d51e64b4436
Parents: a89fd4d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 8 15:56:22 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 8 16:00:09 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogArchiver.java | 30 +++++++--
.../db/commitlog/CommitLogDescriptor.java | 64 ++++++++++++++++++++
.../db/commitlog/CommitLogReplayer.java | 37 +++++++----
.../db/commitlog/CommitLogSegment.java | 57 +++++++++++++----
.../db/commitlog/CommitLogSegmentManager.java | 7 +--
.../apache/cassandra/utils/PureJavaCrc32.java | 17 +++++-
8 files changed, 178 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6d80937..1ebc050 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-rc1
+ * Make commitlog archive+restore more robust (CASSANDRA-6974)
* Fix marking commitlogsegments clean (CASSANDRA-6959)
* Add snapshot "manifest" describing files included (CASSANDRA-6326)
* Parallel streaming for sstableloader (CASSANDRA-3668)
@@ -24,6 +25,7 @@ Merged from 1.2:
* remove duplicate query for local tokens (CASSANDRA-7182)
* exit CQLSH with error status code if script fails (CASSANDRA-6344)
+
2.1.0-beta2
* Increase default CL space to 8GB (CASSANDRA-7031)
* Add range tombstones to read repair digests (CASSANDRA-6863)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a230e35..eaa1b3c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -216,13 +216,13 @@ public class CommitLog implements CommitLogMBean
// checksummed length
dos.writeInt((int) size);
checksum.update(buffer, buffer.position() - 4, 4);
- buffer.putLong(checksum.getValue());
+ buffer.putInt(checksum.getCrc());
int start = buffer.position();
// checksummed mutation
Mutation.serializer.serialize(mutation, dos, MessagingService.current_version);
checksum.update(buffer, start, (int) size);
- buffer.putLong(checksum.getValue());
+ buffer.putInt(checksum.getCrc());
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 6161435..d715fcc 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -103,17 +103,18 @@ public class CommitLogArchiver
}
}
- public void maybeArchive(final String path, final String name)
+ public void maybeArchive(final CommitLogSegment segment)
{
if (Strings.isNullOrEmpty(archiveCommand))
return;
- archivePending.put(name, executor.submit(new WrappedRunnable()
+ archivePending.put(segment.getName(), executor.submit(new WrappedRunnable()
{
protected void runMayThrow() throws IOException
{
- String command = archiveCommand.replace("%name", name);
- command = command.replace("%path", path);
+ segment.waitForFinalSync();
+ String command = archiveCommand.replace("%name", segment.getName());
+ command = command.replace("%path", segment.getPath());
exec(command);
}
}));
@@ -160,7 +161,26 @@ public class CommitLogArchiver
}
for (File fromFile : files)
{
- File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), new CommitLogDescriptor(CommitLogSegment.getNextId()).fileName());
+ CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile);
+ CommitLogDescriptor fromName = CommitLogDescriptor.isValid(fromFile.getName()) ? CommitLogDescriptor.fromFileName(fromFile.getName()) : null;
+ CommitLogDescriptor descriptor;
+ if (fromHeader == null && fromName == null)
+ throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath());
+ else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName))
+ throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath()));
+ else if (fromName != null && fromHeader == null && fromName.getVersion() >= CommitLogDescriptor.VERSION_21)
+ throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath());
+ else if (fromHeader != null)
+ descriptor = fromHeader;
+ else descriptor = fromName;
+
+ if (descriptor.getVersion() > CommitLogDescriptor.VERSION_21)
+ throw new IllegalStateException("Unsupported commit log version: " + descriptor.getVersion());
+
+ File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
+ if (toFile.exists())
+ throw new IllegalStateException("Trying to restore archive " + fromFile.getPath() + ", but the same segment already exists in the restore location: " + toFile.getPath());
+
String command = restoreCommand.replace("%from", fromFile.getPath());
command = command.replace("%to", toFile.getPath());
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 0c8ed61..b11da94 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -20,10 +20,18 @@
*/
package org.apache.cassandra.db.commitlog;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.PureJavaCrc32;
public class CommitLogDescriptor
{
@@ -42,6 +50,9 @@ public class CommitLogDescriptor
*/
public static final int current_version = VERSION_21;
+ // [version, id, checksum]
+ static final int HEADER_SIZE = 4 + 8 + 4;
+
private final int version;
public final long id;
@@ -56,6 +67,43 @@ public class CommitLogDescriptor
this(current_version, id);
}
+ static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+ {
+ out.putInt(0, descriptor.version);
+ out.putLong(4, descriptor.id);
+ PureJavaCrc32 crc = new PureJavaCrc32();
+ crc.updateInt(descriptor.version);
+ crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
+ crc.updateInt((int) (descriptor.id >>> 32));
+ out.putInt(12, crc.getCrc());
+ }
+
+ public static CommitLogDescriptor fromHeader(File file)
+ {
+ try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
+ {
+ assert raf.getFilePointer() == 0;
+ int version = raf.readInt();
+ long id = raf.readLong();
+ int crc = raf.readInt();
+ PureJavaCrc32 checkcrc = new PureJavaCrc32();
+ checkcrc.updateInt(version);
+ checkcrc.updateInt((int) (id & 0xFFFFFFFFL));
+ checkcrc.updateInt((int) (id >>> 32));
+ if (crc == checkcrc.getCrc())
+ return new CommitLogDescriptor(version, id);
+ return null;
+ }
+ catch (EOFException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, file);
+ }
+ }
+
public static CommitLogDescriptor fromFileName(String name)
{
Matcher matcher;
@@ -102,4 +150,20 @@ public class CommitLogDescriptor
{
return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
}
+
+ public String toString()
+ {
+ return "(" + version + "," + id + ")";
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that);
+ }
+
+ public boolean equals(CommitLogDescriptor that)
+ {
+ return this.version == that.version && this.id == that.id;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index fb33187..59ae4e4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -56,7 +56,7 @@ public class CommitLogReplayer
private final AtomicInteger replayedCount;
private final Map<UUID, ReplayPosition> cfPositions;
private final ReplayPosition globalPosition;
- private final Checksum checksum;
+ private final PureJavaCrc32 checksum;
private byte[] buffer;
public CommitLogReplayer()
@@ -113,22 +113,26 @@ public class CommitLogReplayer
return replayedCount.get();
}
- private int readHeader(long segmentId, int offset, RandomAccessReader reader) throws IOException
+ private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
{
if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
{
if (offset != reader.length() && offset != Integer.MAX_VALUE)
- logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header");
+ logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
// cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
return -1;
}
reader.seek(offset);
PureJavaCrc32 crc = new PureJavaCrc32();
- crc.update((int) (segmentId & 0xFFFFFFFFL));
- crc.update((int) (segmentId >>> 32));
- crc.update((int) reader.getPosition());
+ crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
+ crc.updateInt((int) (descriptor.id >>> 32));
+ crc.updateInt((int) reader.getPosition());
int end = reader.readInt();
- long filecrc = reader.readLong();
+ long filecrc;
+ if (descriptor.getVersion() < CommitLogDescriptor.VERSION_21)
+ filecrc = reader.readLong();
+ else
+ filecrc = reader.readInt() & 0xffffffffL;
if (crc.getValue() != filecrc)
{
if (end != 0 || filecrc != 0)
@@ -150,7 +154,7 @@ public class CommitLogReplayer
if (globalPosition.segment < segmentId)
{
if (version >= CommitLogDescriptor.VERSION_21)
- return CommitLogSegment.SYNC_MARKER_SIZE;
+ return CommitLogDescriptor.HEADER_SIZE + CommitLogSegment.SYNC_MARKER_SIZE;
else
return 0;
}
@@ -244,7 +248,7 @@ public class CommitLogReplayer
return;
}
- int prevEnd = 0;
+ int prevEnd = CommitLogDescriptor.HEADER_SIZE;
main: while (true)
{
@@ -253,7 +257,7 @@ public class CommitLogReplayer
end = Integer.MAX_VALUE;
else
{
- do { end = readHeader(segmentId, end, reader); }
+ do { end = readSyncMarker(desc, end, reader); }
while (end < offset && end > prevEnd);
}
@@ -290,12 +294,16 @@ public class CommitLogReplayer
if (serializedSize < 10)
break main;
- long claimedSizeChecksum = reader.readLong();
+ long claimedSizeChecksum;
+ if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ claimedSizeChecksum = reader.readLong();
+ else
+ claimedSizeChecksum = reader.readInt() & 0xffffffffL;
checksum.reset();
if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
checksum.update(serializedSize);
else
- FBUtilities.updateChecksumInt(checksum, serializedSize);
+ checksum.updateInt(serializedSize);
if (checksum.getValue() != claimedSizeChecksum)
break main; // entry wasn't synced correctly/fully. that's
@@ -304,7 +312,10 @@ public class CommitLogReplayer
if (serializedSize > buffer.length)
buffer = new byte[(int) (1.2 * serializedSize)];
reader.readFully(buffer, 0, serializedSize);
- claimedCRC32 = reader.readLong();
+ if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ claimedCRC32 = reader.readLong();
+ else
+ claimedCRC32 = reader.readInt() & 0xffffffffL;
}
catch (EOFException eof)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 3830966..2120d3e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CLibrary;
@@ -59,14 +60,24 @@ public class CommitLogSegment
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
- private final static long idBase = System.currentTimeMillis();
+ private final static long idBase;
private final static AtomicInteger nextId = new AtomicInteger(1);
+ static
+ {
+ long maxId = Long.MIN_VALUE;
+ for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
+ {
+ if (CommitLogDescriptor.isValid(file.getName()))
+ maxId = Math.max(CommitLogDescriptor.fromFileName(file.getName()).id, maxId);
+ }
+ idBase = Math.max(System.currentTimeMillis(), maxId + 1);
+ }
- // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
- static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
+ // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum)
+ static final int ENTRY_OVERHEAD_SIZE = 4 + 4 + 4;
- // The commit log (chained) sync marker/header size in bytes (int: length + long: checksum [segmentId, position])
- static final int SYNC_MARKER_SIZE = 4 + 8;
+ // The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position])
+ static final int SYNC_MARKER_SIZE = 4 + 4;
// The OpOrder used to order appends wrt sync
private final OpOrder appendOrder = new OpOrder();
@@ -154,10 +165,13 @@ public class CommitLogSegment
fd = CLibrary.getfd(logFileAccessor.getFD());
buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
- // mark the initial header as uninitialised
- buffer.putInt(0, 0);
- buffer.putLong(4, 0);
- allocatePosition.set(SYNC_MARKER_SIZE);
+ // write the header
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ // mark the initial sync marker as uninitialised
+ buffer.putInt(CommitLogDescriptor.HEADER_SIZE, 0);
+ buffer.putLong(CommitLogDescriptor.HEADER_SIZE + 4, 0);
+ allocatePosition.set(CommitLogDescriptor.HEADER_SIZE + SYNC_MARKER_SIZE);
+ lastSyncedOffset = CommitLogDescriptor.HEADER_SIZE;
}
catch (IOException e)
{
@@ -292,11 +306,11 @@ public class CommitLogSegment
// we don't chain the crcs here to ensure this method is idempotent if it fails
int offset = lastSyncedOffset;
final PureJavaCrc32 crc = new PureJavaCrc32();
- crc.update((int) (id & 0xFFFFFFFFL));
- crc.update((int) (id >>> 32));
- crc.update(offset);
+ crc.updateInt((int) (id & 0xFFFFFFFFL));
+ crc.updateInt((int) (id >>> 32));
+ crc.updateInt(offset);
buffer.putInt(offset, nextMarker);
- buffer.putLong(offset + 4, crc.getValue());
+ buffer.putInt(offset + 4, crc.getCrc());
// zero out the next sync marker so replayer can cleanly exit
if (nextMarker < buffer.capacity())
@@ -383,6 +397,23 @@ public class CommitLogSegment
return logFile.getName();
}
+ void waitForFinalSync()
+ {
+ while (true)
+ {
+ WaitQueue.Signal signal = syncComplete.register();
+ if (lastSyncedOffset < buffer.capacity())
+ {
+ signal.awaitUninterruptibly();
+ }
+ else
+ {
+ signal.cancel();
+ break;
+ }
+ }
+ }
+
/**
* Close the segment file.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index b0be42c..5802e8a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -32,7 +31,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.*;
@@ -222,7 +220,7 @@ public class CommitLogSegmentManager
{
// Now we can run the user defined command just after switching to the new commit log.
// (Do this here instead of in the recycle call so we can get a head start on the archive.)
- CommitLog.instance.archiver.maybeArchive(old.getPath(), old.getName());
+ CommitLog.instance.archiver.maybeArchive(old);
// ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
old.discardUnusedTail();
@@ -314,8 +312,9 @@ public class CommitLogSegmentManager
*/
void recycleSegment(final CommitLogSegment segment)
{
+ boolean archiveSuccess = CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName());
activeSegments.remove(segment);
- if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
+ if (!archiveSuccess)
{
// if archiving (command) was not successful then leave the file alone. don't delete or recycle.
discardSegment(segment, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
index 041652f..9a1ac02 100644
--- a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
+++ b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
@@ -31,7 +31,7 @@ import java.util.zip.Checksum;
*
* @see java.util.zip.CRC32
*
- * This class is copied from hadoop-commons project.
+ * This class is copied from hadoop-commons project and retains that formatting.
* (The initial patch added PureJavaCrc32 was HADOOP-6148)
*/
public class PureJavaCrc32 implements Checksum {
@@ -49,7 +49,11 @@ public class PureJavaCrc32 implements Checksum {
return (~crc) & 0xffffffffL;
}
- @Override
+ public int getCrc() {
+ return ~crc;
+ }
+
+ @Override
public void reset() {
crc = 0xffffffff;
}
@@ -172,7 +176,14 @@ public class PureJavaCrc32 implements Checksum {
crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
}
- /*
+ final public void updateInt(int v) {
+ update((v >>> 24) & 0xFF);
+ update((v >>> 16) & 0xFF);
+ update((v >>> 8) & 0xFF);
+ update((v >>> 0) & 0xFF);
+ }
+
+ /*
* CRC-32 lookup tables generated by the polynomial 0xEDB88320.
* See also TestPureJavaCrc32.Table.
*/