You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/06/28 23:45:37 UTC
[2/2] git commit: Add Commitlog Versioning patch by Vijay;
reviewed by jbellis for CASSANDRA-4357
Add Commitlog Versioning
patch by Vijay; reviewed by jbellis for CASSANDRA-4357
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57998976
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57998976
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57998976
Branch: refs/heads/trunk
Commit: 57998976f0024776bab6b2301f2436ea60e38fe0
Parents: 7cb3b73
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Thu Jun 28 14:35:37 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Thu Jun 28 14:37:15 2012 -0700
----------------------------------------------------------------------
src/java/org/apache/cassandra/config/Schema.java | 6 +-
.../apache/cassandra/db/commitlog/CommitLog.java | 2 +-
.../cassandra/db/commitlog/CommitLogAllocator.java | 4 +-
.../cassandra/db/commitlog/CommitLogArchiver.java | 7 +-
.../db/commitlog/CommitLogDescriptor.java | 102 +++++++++++++++
.../cassandra/db/commitlog/CommitLogReplayer.java | 11 +-
.../cassandra/db/commitlog/CommitLogSegment.java | 42 +------
.../org/apache/cassandra/db/CommitLogTest.java | 24 ++++-
8 files changed, 144 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 7d52d2e..c36ee77 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.config;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -34,6 +33,7 @@ import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.utils.Pair;
@@ -347,12 +347,12 @@ public class Schema
oldCfIdMap.put(oldId, newId);
}
- public UUID convertOldCfId(Integer oldCfId)
+ public UUID convertOldCfId(Integer oldCfId) throws UnknownColumnFamilyException
{
UUID cfId = oldCfIdMap.get(oldCfId);
if (cfId == null)
- throw new IllegalArgumentException("ColumnFamily identified by old " + oldCfId + " was not found.");
+ throw new UnknownColumnFamilyException("ColumnFamily identified by old " + oldCfId + " was not found.", null);
return cfId;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 6649ae4..e339cbc 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -112,7 +112,7 @@ public class CommitLog implements CommitLogMBean
// we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
// until after recover was finished. this turns out to be fragile; it is less error-prone to go
// ahead and allow writes before recover(), and just skip active segments when we do.
- return CommitLogSegment.possibleCommitLogFile(name) && !instance.allocator.manages(name);
+ return CommitLogDescriptor.isValid(name) && !instance.allocator.manages(name);
}
});
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
index e402a4a..dff4093 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -178,7 +179,8 @@ public class CommitLogAllocator
public void recycleSegment(final File file)
{
// check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
- if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize())
+ if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize()
+ || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
{
// (don't decrease managed size, since this was never a "live" segment)
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 556a37a..b8c8dfb 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.commitlog;
*
*/
-
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -31,7 +30,6 @@ import java.util.Properties;
import java.util.concurrent.*;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.FBUtilities;
@@ -148,10 +146,7 @@ public class CommitLogArchiver
File[] files = new File(dir).listFiles();
for (File fromFile : files)
{
- File toFile = new File(DatabaseDescriptor.getCommitLogLocation(),
- CommitLogSegment.FILENAME_PREFIX +
- System.nanoTime() +
- CommitLogSegment.FILENAME_EXTENSION);
+ File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), new CommitLogDescriptor(System.nanoTime()).fileName());
String command = restoreCommand.replace("%from", fromFile.getPath());
command = command.replace("%to", toFile.getPath());
exec(command);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
new file mode 100644
index 0000000..1d67f5c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -0,0 +1,102 @@
+package org.apache.cassandra.db.commitlog;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.cassandra.net.MessagingService;
+
+public class CommitLogDescriptor
+{
+ private static final String SEPARATOR = "-";
+ private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR;
+ private static final String FILENAME_EXTENSION = ".log";
+ // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log.
+ private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
+
+ public static final int LEGACY_VERSION = 1;
+ public static final int VERSION_12 = 2;
+ /**
+ * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
+ * Note: make sure to handle {@link #getMessagingVersion()}
+ */
+ public static final int current_version = VERSION_12;
+
+ private final int version;
+ public final long id;
+
+ public CommitLogDescriptor(int version, long id)
+ {
+ this.version = version;
+ this.id = id;
+ }
+
+ public CommitLogDescriptor(long id)
+ {
+ this(current_version, id);
+ }
+
+ public static CommitLogDescriptor fromFileName(String name)
+ {
+ Matcher matcher;
+ if (!(matcher = COMMIT_LOG_FILE_PATTERN.matcher(name)).matches())
+ throw new RuntimeException("Cannot parse the version of the file: " + name);
+
+ if (matcher.group(3) != null)
+ {
+ long id = Long.valueOf(matcher.group(3).split(SEPARATOR)[1]);
+ return new CommitLogDescriptor(Integer.valueOf(matcher.group(2)), id);
+ }
+ else
+ {
+ long id = Long.valueOf(matcher.group(1));
+ return new CommitLogDescriptor(LEGACY_VERSION, id);
+ }
+ }
+
+ public int getMessagingVersion()
+ {
+ switch (version)
+ {
+ case LEGACY_VERSION:
+ return MessagingService.VERSION_11;
+ case VERSION_12:
+ return MessagingService.VERSION_12;
+ default:
+ throw new IllegalStateException("Unknown commitlog version " + version);
+ }
+ }
+
+ public String fileName()
+ {
+ return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION;
+ }
+
+ /**
+ * @param filename the filename to check
+ * @return true if filename could be a commit log based on it's filename
+ */
+ public static boolean isValid(String filename)
+ {
+ return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 5dc29ac..6a20027 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.PureJavaCrc32;
@@ -59,7 +58,7 @@ public class CommitLogReplayer
private final Set<Table> tablesRecovered;
private final List<Future<?>> futures;
private final Map<UUID, AtomicInteger> invalidMutations;
-private final AtomicInteger replayedCount;
+ private final AtomicInteger replayedCount;
private final Map<UUID, ReplayPosition> cfPositions;
private final ReplayPosition globalPosition;
private final Checksum checksum;
@@ -113,7 +112,9 @@ private final AtomicInteger replayedCount;
public void recover(File file) throws IOException
{
logger.info("Replaying " + file.getPath());
- final long segment = CommitLogSegment.idFromFilename(file.getName());
+ CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+ final long segment = desc.id;
+ int version = desc.getMessagingVersion();
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
assert reader.length() <= Integer.MAX_VALUE;
try
@@ -195,10 +196,12 @@ private final AtomicInteger replayedCount;
{
// assuming version here. We've gone to lengths to make sure what gets written to the CL is in
// the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), MessagingService.current_version, IColumnSerializer.Flag.LOCAL);
+ rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, IColumnSerializer.Flag.LOCAL);
}
catch (UnknownColumnFamilyException ex)
{
+ if (ex.cfId == null)
+ continue;
AtomicInteger i = invalidMutations.get(ex.cfId);
if (i == null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 0f3cb06..bf67095 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -25,8 +25,6 @@ import java.nio.channels.FileChannel;
import java.nio.MappedByteBuffer;
import java.util.Collection;
import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.zip.Checksum;
import java.util.HashMap;
@@ -51,10 +49,6 @@ public class CommitLogSegment
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
- static final String FILENAME_PREFIX = "CommitLog-";
- static final String FILENAME_EXTENSION = ".log";
- private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION);
-
// The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
@@ -71,6 +65,8 @@ public class CommitLogSegment
private final MappedByteBuffer buffer;
private boolean closed;
+ public final CommitLogDescriptor descriptor;
+
/**
* @return a newly minted segment file
*/
@@ -87,7 +83,8 @@ public class CommitLogSegment
CommitLogSegment(String filePath)
{
id = System.nanoTime();
- logFile = new File(DatabaseDescriptor.getCommitLogLocation(), FILENAME_PREFIX + id + FILENAME_EXTENSION);
+ descriptor = new CommitLogDescriptor(id);
+ logFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
boolean isCreating = true;
try
@@ -127,37 +124,6 @@ public class CommitLogSegment
}
/**
- * Extracts the commit log ID from filename
- *
- * @param filename the filename of the commit log file
- * @return the extracted commit log ID
- */
- public static long idFromFilename(String filename)
- {
- Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(filename);
- try
- {
- if (matcher.matches())
- return Long.valueOf(matcher.group(1));
- else
- return -1L;
- }
- catch (NumberFormatException e)
- {
- return -1L;
- }
- }
-
- /**
- * @param filename the filename to check
- * @return true if filename could be a commit log based on it's filename
- */
- public static boolean possibleCommitLogFile(String filename)
- {
- return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
- }
-
- /**
* Completely discards a segment file by deleting it. (Potentially blocking operation)
*/
public void discard(boolean deleteFile)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 5c52d07..983ba10 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -25,11 +25,15 @@ import java.util.UUID;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import junit.framework.Assert;
+
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.net.MessagingService;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -190,7 +194,7 @@ public class CommitLogTest extends SchemaLoader
protected File tmpFile() throws IOException
{
- File logFile = File.createTempFile("testRecoveryWithPartiallyWrittenHeaderTestFile", null);
+ File logFile = File.createTempFile("CommitLog-" + CommitLogDescriptor.current_version + "-", ".log");
logFile.deleteOnExit();
assert logFile.length() == 0;
return logFile;
@@ -204,4 +208,22 @@ public class CommitLogTest extends SchemaLoader
//statics make it annoying to test things correctly
CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
}
+
+ @Test
+ public void testVersions()
+ {
+ Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+ Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+ Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+ Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+ Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+ Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+ Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-1340512736956320000.log").id);
+
+ Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L).getMessagingVersion());
+ String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+ Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+ Assert.assertEquals(MessagingService.VERSION_11, CommitLogDescriptor.fromFileName("CommitLog-1340512736956320000.log").getMessagingVersion());
+ }
}