You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/06/28 23:45:37 UTC

[2/2] git commit: Add Commitlog Versioning patch by Vijay; reviewed by jbellis for CASSANDRA-4357

Add Commitlog Versioning
patch by Vijay; reviewed by jbellis for CASSANDRA-4357


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57998976
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57998976
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57998976

Branch: refs/heads/trunk
Commit: 57998976f0024776bab6b2301f2436ea60e38fe0
Parents: 7cb3b73
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Thu Jun 28 14:35:37 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Thu Jun 28 14:37:15 2012 -0700

----------------------------------------------------------------------
 src/java/org/apache/cassandra/config/Schema.java   |    6 +-
 .../apache/cassandra/db/commitlog/CommitLog.java   |    2 +-
 .../cassandra/db/commitlog/CommitLogAllocator.java |    4 +-
 .../cassandra/db/commitlog/CommitLogArchiver.java  |    7 +-
 .../db/commitlog/CommitLogDescriptor.java          |  102 +++++++++++++++
 .../cassandra/db/commitlog/CommitLogReplayer.java  |   11 +-
 .../cassandra/db/commitlog/CommitLogSegment.java   |   42 +------
 .../org/apache/cassandra/db/CommitLogTest.java     |   24 ++++-
 8 files changed, 144 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 7d52d2e..c36ee77 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.config;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -34,6 +33,7 @@ import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.Pair;
@@ -347,12 +347,12 @@ public class Schema
         oldCfIdMap.put(oldId, newId);
     }
 
-    public UUID convertOldCfId(Integer oldCfId)
+    public UUID convertOldCfId(Integer oldCfId) throws UnknownColumnFamilyException
     {
         UUID cfId = oldCfIdMap.get(oldCfId);
 
         if (cfId == null)
-            throw new IllegalArgumentException("ColumnFamily identified by old " + oldCfId + " was not found.");
+            throw new UnknownColumnFamilyException("ColumnFamily identified by old " + oldCfId + " was not found.", null);
 
         return cfId;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 6649ae4..e339cbc 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -112,7 +112,7 @@ public class CommitLog implements CommitLogMBean
                 // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
                 // until after recover was finished.  this turns out to be fragile; it is less error-prone to go
                 // ahead and allow writes before recover(), and just skip active segments when we do.
-                return CommitLogSegment.possibleCommitLogFile(name) && !instance.allocator.manages(name);
+                return CommitLogDescriptor.isValid(name) && !instance.allocator.manages(name);
             }
         });
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
index e402a4a..dff4093 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -178,7 +179,8 @@ public class CommitLogAllocator
     public void recycleSegment(final File file)
     {
         // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
-        if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize())
+        if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize()
+                || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
         {
             // (don't decrease managed size, since this was never a "live" segment)
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 556a37a..b8c8dfb 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.commitlog;
  * 
  */
 
-
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,7 +30,6 @@ import java.util.Properties;
 import java.util.concurrent.*;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
@@ -148,10 +146,7 @@ public class CommitLogArchiver
             File[] files = new File(dir).listFiles();
             for (File fromFile : files)
             {
-                File toFile = new File(DatabaseDescriptor.getCommitLogLocation(),
-                                       CommitLogSegment.FILENAME_PREFIX +
-                                       System.nanoTime() +
-                                       CommitLogSegment.FILENAME_EXTENSION);             
+                File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), new CommitLogDescriptor(System.nanoTime()).fileName());            
                 String command = restoreCommand.replace("%from", fromFile.getPath());
                 command = command.replace("%to", toFile.getPath());       
                 exec(command);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
new file mode 100644
index 0000000..1d67f5c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -0,0 +1,102 @@
+package org.apache.cassandra.db.commitlog;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.cassandra.net.MessagingService;
+
+public class CommitLogDescriptor
+{
+    private static final String SEPARATOR = "-";
+    private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR;
+    private static final String FILENAME_EXTENSION = ".log";
+    // matches both legacy and versioned commit log file names, e.g. CommitLog-12345.log and CommitLog-4-12345.log
+    private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
+    
+    public static final int LEGACY_VERSION = 1;
+    public static final int VERSION_12 = 2;
+    /** 
+     * Increment this number whenever the commit log on-disk layout or the MessagingVersion changes.
+     * Note: make sure to handle {@link #getMessagingVersion()}
+     */
+    public static final int current_version = VERSION_12;
+    
+    private final int version;
+    public final long id;
+    
+    public CommitLogDescriptor(int version, long id)
+    {
+        this.version = version;
+        this.id = id;
+    }
+
+    public CommitLogDescriptor(long id)
+    {
+        this(current_version, id);
+    }
+
+    public static CommitLogDescriptor fromFileName(String name)
+    {
+        Matcher matcher;
+        if (!(matcher = COMMIT_LOG_FILE_PATTERN.matcher(name)).matches())
+            throw new RuntimeException("Cannot parse the version of the file: " + name);
+
+        if (matcher.group(3) != null)
+        {
+            long id = Long.valueOf(matcher.group(3).split(SEPARATOR)[1]);
+            return new CommitLogDescriptor(Integer.valueOf(matcher.group(2)), id);
+        }
+        else
+        {
+            long id = Long.valueOf(matcher.group(1));
+            return new CommitLogDescriptor(LEGACY_VERSION, id);
+        }
+    }
+
+    public int getMessagingVersion()
+    {
+        switch (version)
+        {
+            case LEGACY_VERSION:
+                return MessagingService.VERSION_11;
+            case VERSION_12:
+                return MessagingService.VERSION_12;
+            default:
+                throw new IllegalStateException("Unknown commitlog version " + version);
+        }
+    }
+
+    public String fileName()
+    {   
+        return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION;
+    }
+
+    /**
+     * @param   filename  the filename to check
+     * @return true if filename could be a commit log, based on its name
+     */
+    public static boolean isValid(String filename)
+    {
+        return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 5dc29ac..6a20027 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.PureJavaCrc32;
@@ -59,7 +58,7 @@ public class CommitLogReplayer
     private final Set<Table> tablesRecovered;
     private final List<Future<?>> futures;
     private final Map<UUID, AtomicInteger> invalidMutations;
-private final AtomicInteger replayedCount;
+    private final AtomicInteger replayedCount;
     private final Map<UUID, ReplayPosition> cfPositions;
     private final ReplayPosition globalPosition;
     private final Checksum checksum;
@@ -113,7 +112,9 @@ private final AtomicInteger replayedCount;
     public void recover(File file) throws IOException
     {
         logger.info("Replaying " + file.getPath());
-        final long segment = CommitLogSegment.idFromFilename(file.getName());
+        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+        final long segment = desc.id;
+        int version = desc.getMessagingVersion();
         RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
         assert reader.length() <= Integer.MAX_VALUE;
         try
@@ -195,10 +196,12 @@ private final AtomicInteger replayedCount;
                 {
                     // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
                     // the current version. so do make sure the CL is drained prior to upgrading a node.
-                    rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), MessagingService.current_version, IColumnSerializer.Flag.LOCAL);
+                    rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, IColumnSerializer.Flag.LOCAL);
                 }
                 catch (UnknownColumnFamilyException ex)
                 {
+                    if (ex.cfId == null)
+                        continue;
                     AtomicInteger i = invalidMutations.get(ex.cfId);
                     if (i == null)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 0f3cb06..bf67095 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -25,8 +25,6 @@ import java.nio.channels.FileChannel;
 import java.nio.MappedByteBuffer;
 import java.util.Collection;
 import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.zip.Checksum;
 import java.util.HashMap;
 
@@ -51,10 +49,6 @@ public class CommitLogSegment
 {
     private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
 
-    static final String FILENAME_PREFIX = "CommitLog-";
-    static final String FILENAME_EXTENSION = ".log";
-    private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION);
-
     // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
     static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
 
@@ -71,6 +65,8 @@ public class CommitLogSegment
     private final MappedByteBuffer buffer;
     private boolean closed;
 
+    public final CommitLogDescriptor descriptor;
+
     /**
      * @return a newly minted segment file
      */
@@ -87,7 +83,8 @@ public class CommitLogSegment
     CommitLogSegment(String filePath)
     {
         id = System.nanoTime();
-        logFile = new File(DatabaseDescriptor.getCommitLogLocation(), FILENAME_PREFIX + id + FILENAME_EXTENSION);
+        descriptor = new CommitLogDescriptor(id);
+        logFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
         boolean isCreating = true;
 
         try
@@ -127,37 +124,6 @@ public class CommitLogSegment
     }
 
     /**
-     * Extracts the commit log ID from filename
-     *
-     * @param   filename  the filename of the commit log file
-     * @return the extracted commit log ID
-     */
-    public static long idFromFilename(String filename)
-    {
-        Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(filename);
-        try
-        {
-            if (matcher.matches())
-                return Long.valueOf(matcher.group(1));
-            else
-                return -1L;
-        }
-        catch (NumberFormatException e)
-        {
-            return -1L;
-        }
-    }
-
-    /**
-     * @param   filename  the filename to check
-     * @return true if filename could be a commit log based on it's filename
-     */
-    public static boolean possibleCommitLogFile(String filename)
-    {
-        return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
-    }
-
-    /**
      * Completely discards a segment file by deleting it. (Potentially blocking operation)
      */
     public void discard(boolean deleteFile)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 5c52d07..983ba10 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -25,11 +25,15 @@ import java.util.UUID;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import junit.framework.Assert;
+
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.net.MessagingService;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -190,7 +194,7 @@ public class CommitLogTest extends SchemaLoader
 
     protected File tmpFile() throws IOException
     {
-        File logFile = File.createTempFile("testRecoveryWithPartiallyWrittenHeaderTestFile", null);
+        File logFile = File.createTempFile("CommitLog-" + CommitLogDescriptor.current_version + "-", ".log");
         logFile.deleteOnExit();
         assert logFile.length() == 0;
         return logFile;
@@ -204,4 +208,22 @@ public class CommitLogTest extends SchemaLoader
         //statics make it annoying to test things correctly
         CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
     }
+
+    @Test
+    public void testVersions()
+    {
+        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+        Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+        Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-1340512736956320000.log").id);
+
+        Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L).getMessagingVersion());
+        String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+        Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+        Assert.assertEquals(MessagingService.VERSION_11, CommitLogDescriptor.fromFileName("CommitLog-1340512736956320000.log").getMessagingVersion());
+    }
 }