You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/02/18 23:38:27 UTC
[1/2] git commit: Fix replaying pre-2.0 commit logs
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 add73562c -> c2681e933
Fix replaying pre-2.0 commit logs
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6714
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ad3a4f82
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ad3a4f82
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ad3a4f82
Branch: refs/heads/cassandra-2.1
Commit: ad3a4f824e6d0f1484883f78498e3a07592259b3
Parents: 6a34b56
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Feb 19 01:21:13 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Feb 19 01:22:59 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLogDescriptor.java | 6 +++++-
.../cassandra/db/commitlog/CommitLogReplayer.java | 15 ++++++++-------
3 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad3a4f82/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5da346..95faf18 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
* Improve support for paginating over composites (CASSANDRA-4851)
* Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
* Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
+ * Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
Merged from 1.2:
* Fix broken streams when replacing with same IP (CASSANDRA-6622)
* Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad3a4f82/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 87de7d8..7488c20 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -70,7 +70,6 @@ public class CommitLogDescriptor
public int getMessagingVersion()
{
- assert MessagingService.current_version == MessagingService.VERSION_20;
switch (version)
{
case VERSION_12:
@@ -82,6 +81,11 @@ public class CommitLogDescriptor
}
}
+ public int getVersion()
+ {
+ return version;
+ }
+
public String fileName()
{
return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad3a4f82/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 44713ad..56cd4ff 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -179,11 +179,12 @@ public class CommitLogReplayer
public void recover(File file) throws IOException
{
final ReplayFilter replayFilter = ReplayFilter.create();
-
- logger.info("Replaying " + file.getPath());
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
final long segment = desc.id;
- int version = desc.getMessagingVersion();
+ logger.info("Replaying {} (CL version {}, messaging version {})",
+ file.getPath(),
+ desc.getVersion(),
+ desc.getMessagingVersion());
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
try
{
@@ -234,7 +235,7 @@ public class CommitLogReplayer
long claimedSizeChecksum = reader.readLong();
checksum.reset();
- if (version < CommitLogDescriptor.VERSION_20)
+ if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
checksum.update(serializedSize);
else
FBUtilities.updateChecksumInt(checksum, serializedSize);
@@ -266,9 +267,9 @@ public class CommitLogReplayer
final RowMutation rm;
try
{
- // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
- // the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
+ rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn),
+ desc.getMessagingVersion(),
+ ColumnSerializer.Flag.LOCAL);
// doublecheck that what we read is [still] valid for the current schema
for (ColumnFamily cf : rm.getColumnFamilies())
for (Column cell : cf)
[2/2] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c2681e93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c2681e93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c2681e93
Branch: refs/heads/cassandra-2.1
Commit: c2681e9336a92fc4e562f2b316b6a62cb5c62e98
Parents: add7356 ad3a4f8
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Feb 19 01:37:58 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Feb 19 01:37:58 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/CommitLogDescriptor.java | 6 +++++-
.../db/commitlog/CommitLogReplayer.java | 19 +++++++++++--------
3 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2681e93/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2681e93/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2681e93/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index df28b8e,56cd4ff..3bc0f87
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -227,201 -179,187 +227,204 @@@ public class CommitLogReplaye
public void recover(File file) throws IOException
{
final ReplayFilter replayFilter = ReplayFilter.create();
+ logger.info("Replaying {}", file.getPath());
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
- final long segment = desc.id;
+ final long segmentId = desc.id;
- int version = desc.getMessagingVersion();
+ logger.info("Replaying {} (CL version {}, messaging version {})",
+ file.getPath(),
+ desc.getVersion(),
+ desc.getMessagingVersion());
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
+
try
{
assert reader.length() <= Integer.MAX_VALUE;
- int offset = getStartOffset(segmentId, version);
- int replayPosition;
- if (globalPosition.segment < segment)
- {
- replayPosition = 0;
- }
- else if (globalPosition.segment == segment)
- {
- replayPosition = globalPosition.position;
- }
- else
++ int offset = getStartOffset(segmentId, desc.getMessagingVersion());
+ if (offset < 0)
{
logger.debug("skipping replay of fully-flushed {}", file);
return;
}
- if (logger.isDebugEnabled())
- logger.debug("Replaying " + file + " starting at " + replayPosition);
- reader.seek(replayPosition);
-
- /* read the logs populate RowMutation and apply */
- while (!reader.isEOF())
+ int prevEnd = 0;
+ main: while (true)
{
- if (logger.isDebugEnabled())
- logger.debug("Reading mutation at " + reader.getFilePointer());
- long claimedCRC32;
- int serializedSize;
- try
+ int end = prevEnd;
- if (version < CommitLogDescriptor.VERSION_21)
++ if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ end = Integer.MAX_VALUE;
+ else
{
- // any of the reads may hit EOF
- serializedSize = reader.readInt();
- if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
- {
- logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
- break;
- }
-
- // RowMutation must be at LEAST 10 bytes:
- // 3 each for a non-empty Keyspace and Key (including the
- // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
- // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
- if (serializedSize < 10)
- break;
-
- long claimedSizeChecksum = reader.readLong();
- checksum.reset();
- if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
- checksum.update(serializedSize);
- else
- FBUtilities.updateChecksumInt(checksum, serializedSize);
-
- if (checksum.getValue() != claimedSizeChecksum)
- break; // entry wasn't synced correctly/fully. that's
- // ok.
-
- if (serializedSize > buffer.length)
- buffer = new byte[(int) (1.2 * serializedSize)];
- reader.readFully(buffer, 0, serializedSize);
- claimedCRC32 = reader.readLong();
- }
- catch (EOFException eof)
- {
- break; // last CL entry didn't get completely written. that's ok.
+ do { end = readHeader(segmentId, end, reader); }
+ while (end < offset && end > prevEnd);
}
- checksum.update(buffer, 0, serializedSize);
- if (claimedCRC32 != checksum.getValue())
- {
- // this entry must not have been fsynced. probably the rest is bad too,
- // but just in case there is no harm in trying them (since we still read on an entry boundary)
- continue;
- }
+ if (end < prevEnd)
+ break;
- /* deserialize the commit log entry */
- FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
- final RowMutation rm;
- try
- {
- rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn),
- desc.getMessagingVersion(),
- ColumnSerializer.Flag.LOCAL);
- // doublecheck that what we read is [still] valid for the current schema
- for (ColumnFamily cf : rm.getColumnFamilies())
- for (Column cell : cf)
- cf.getComparator().validate(cell.name());
- }
- catch (UnknownColumnFamilyException ex)
- {
- if (ex.cfId == null)
- continue;
- AtomicInteger i = invalidMutations.get(ex.cfId);
- if (i == null)
- {
- i = new AtomicInteger(1);
- invalidMutations.put(ex.cfId, i);
- }
- else
- i.incrementAndGet();
- continue;
- }
- catch (Throwable t)
+ if (logger.isDebugEnabled())
+ logger.debug("Replaying {} between {} and {}", file, offset, end);
+
+ reader.seek(offset);
+
+ /* read the logs populate Mutation and apply */
+ while (reader.getPosition() < end && !reader.isEOF())
{
- File f = File.createTempFile("mutation", "dat");
- DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+ if (logger.isDebugEnabled())
+ logger.debug("Reading mutation at {}", reader.getFilePointer());
+
+ long claimedCRC32;
+ int serializedSize;
try
{
- out.write(buffer, 0, serializedSize);
+ // any of the reads may hit EOF
+ serializedSize = reader.readInt();
+ if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+ {
+ logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+ break main;
+ }
+
+ // Mutation must be at LEAST 10 bytes:
+ // 3 each for a non-empty Keyspace and Key (including the
+ // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+ // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
+ if (serializedSize < 10)
+ break main;
+
+ long claimedSizeChecksum = reader.readLong();
+ checksum.reset();
- if (version < CommitLogDescriptor.VERSION_20)
++ if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
+ checksum.update(serializedSize);
+ else
+ FBUtilities.updateChecksumInt(checksum, serializedSize);
+
+ if (checksum.getValue() != claimedSizeChecksum)
+ break main; // entry wasn't synced correctly/fully. that's
+ // ok.
+
+ if (serializedSize > buffer.length)
+ buffer = new byte[(int) (1.2 * serializedSize)];
+ reader.readFully(buffer, 0, serializedSize);
+ claimedCRC32 = reader.readLong();
}
- finally
+ catch (EOFException eof)
{
- out.close();
+ break main; // last CL entry didn't get completely written. that's ok.
}
- String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
- f.getAbsolutePath());
- logger.error(st, t);
- continue;
- }
- if (logger.isDebugEnabled())
- logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ")
- + "}"));
+ checksum.update(buffer, 0, serializedSize);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ // this entry must not have been fsynced. probably the rest is bad too,
+ // but just in case there is no harm in trying them (since we still read on an entry boundary)
+ continue;
+ }
- final long entryLocation = reader.getFilePointer();
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
+ /* deserialize the commit log entry */
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
+ final Mutation mutation;
+ try
{
- // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
- // the current version. so do make sure the CL is drained prior to upgrading a node.
- mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
- if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
- return;
- if (pointInTimeExceeded(rm))
- return;
-
- final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
-
- // Rebuild the row mutation, omitting column families that
- // a) the user has requested that we ignore,
- // b) have already been flushed,
- // or c) are part of a cf that was dropped.
- // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- RowMutation newRm = null;
- for (ColumnFamily columnFamily : replayFilter.filter(rm))
++ mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
++ desc.getMessagingVersion(),
++ ColumnSerializer.Flag.LOCAL);
+ // doublecheck that what we read is [still] valid for the current schema
+ for (ColumnFamily cf : mutation.getColumnFamilies())
+ for (Cell cell : cf)
+ cf.getComparator().validate(cell.name());
+ }
+ catch (UnknownColumnFamilyException ex)
+ {
+ if (ex.cfId == null)
+ continue;
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
{
- if (Schema.instance.getCF(columnFamily.id()) == null)
- continue; // dropped
+ i = new AtomicInteger(1);
+ invalidMutations.put(ex.cfId, i);
+ }
+ else
+ i.incrementAndGet();
+ continue;
+ }
+ catch (Throwable t)
+ {
+ File f = File.createTempFile("mutation", "dat");
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+ try
+ {
+ out.write(buffer, 0, serializedSize);
+ }
+ finally
+ {
+ out.close();
+ }
+ String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
+ f.getAbsolutePath());
+ logger.error(st, t);
+ continue;
+ }
- ReplayPosition rp = cfPositions.get(columnFamily.id());
+ if (logger.isDebugEnabled())
+ logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
- // replay if current segment is newer than last flushed one or,
- // if it is the last known segment, if we are after the replay position
- if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
+ final long entryLocation = reader.getFilePointer();
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+ return;
+ if (pointInTimeExceeded(mutation))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+ // Rebuild the mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ Mutation newMutation = null;
+ for (ColumnFamily columnFamily : replayFilter.filter(mutation))
{
- if (newRm == null)
- newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
- newRm.add(columnFamily);
- replayedCount.incrementAndGet();
+ if (Schema.instance.getCF(columnFamily.id()) == null)
+ continue; // dropped
+
+ ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the replay position
+ if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
+ {
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(columnFamily);
+ replayedCount.incrementAndGet();
+ }
+ }
+ if (newMutation != null)
+ {
+ assert !newMutation.isEmpty();
+ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
+ keyspacesRecovered.add(keyspace);
}
}
- if (newRm != null)
- {
- assert !newRm.isEmpty();
- Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
- keyspacesRecovered.add(keyspace);
- }
+ };
+ futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
}
- };
- futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
- {
- FBUtilities.waitOnFutures(futures);
- futures.clear();
}
+
- if (version < CommitLogDescriptor.VERSION_21)
++ if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+ break;
+
+ offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
+ prevEnd = end;
}
}
finally