You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/05/08 23:00:37 UTC

[4/5] git commit: Make commitlog archive+restore more robust patch by bes; reviewed by jbellis for CASSANDRA-6974

Make commitlog archive+restore more robust
patch by bes; reviewed by jbellis for CASSANDRA-6974


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/134e0226
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/134e0226
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/134e0226

Branch: refs/heads/trunk
Commit: 134e0226e42d977e8e73477b1ff24d51e64b4436
Parents: a89fd4d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 8 15:56:22 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 8 16:00:09 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../cassandra/db/commitlog/CommitLog.java       |  4 +-
 .../db/commitlog/CommitLogArchiver.java         | 30 +++++++--
 .../db/commitlog/CommitLogDescriptor.java       | 64 ++++++++++++++++++++
 .../db/commitlog/CommitLogReplayer.java         | 37 +++++++----
 .../db/commitlog/CommitLogSegment.java          | 57 +++++++++++++----
 .../db/commitlog/CommitLogSegmentManager.java   |  7 +--
 .../apache/cassandra/utils/PureJavaCrc32.java   | 17 +++++-
 8 files changed, 178 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6d80937..1ebc050 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-rc1
+ * Make commitlog archive+restore more robust (CASSANDRA-6974)
  * Fix marking commitlogsegments clean (CASSANDRA-6959)
  * Add snapshot "manifest" describing files included (CASSANDRA-6326)
  * Parallel streaming for sstableloader (CASSANDRA-3668)
@@ -24,6 +25,7 @@ Merged from 1.2:
  * remove duplicate query for local tokens (CASSANDRA-7182)
  * exit CQLSH with error status code if script fails (CASSANDRA-6344)
 
+
 2.1.0-beta2
  * Increase default CL space to 8GB (CASSANDRA-7031)
  * Add range tombstones to read repair digests (CASSANDRA-6863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a230e35..eaa1b3c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -216,13 +216,13 @@ public class CommitLog implements CommitLogMBean
             // checksummed length
             dos.writeInt((int) size);
             checksum.update(buffer, buffer.position() - 4, 4);
-            buffer.putLong(checksum.getValue());
+            buffer.putInt(checksum.getCrc());
 
             int start = buffer.position();
             // checksummed mutation
             Mutation.serializer.serialize(mutation, dos, MessagingService.current_version);
             checksum.update(buffer, start, (int) size);
-            buffer.putLong(checksum.getValue());
+            buffer.putInt(checksum.getCrc());
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 6161435..d715fcc 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -103,17 +103,18 @@ public class CommitLogArchiver
         }
     }
 
-    public void maybeArchive(final String path, final String name)
+    public void maybeArchive(final CommitLogSegment segment)
     {
         if (Strings.isNullOrEmpty(archiveCommand))
             return;
 
-        archivePending.put(name, executor.submit(new WrappedRunnable()
+        archivePending.put(segment.getName(), executor.submit(new WrappedRunnable()
         {
             protected void runMayThrow() throws IOException
             {
-                String command = archiveCommand.replace("%name", name);
-                command = command.replace("%path", path);
+                segment.waitForFinalSync();
+                String command = archiveCommand.replace("%name", segment.getName());
+                command = command.replace("%path", segment.getPath());
                 exec(command);
             }
         }));
@@ -160,7 +161,26 @@ public class CommitLogArchiver
             }
             for (File fromFile : files)
             {
-                File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), new CommitLogDescriptor(CommitLogSegment.getNextId()).fileName());
+                CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile);
+                CommitLogDescriptor fromName = CommitLogDescriptor.isValid(fromFile.getName()) ? CommitLogDescriptor.fromFileName(fromFile.getName()) : null;
+                CommitLogDescriptor descriptor;
+                if (fromHeader == null && fromName == null)
+                    throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath());
+                else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName))
+                    throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath()));
+                else if (fromName != null && fromHeader == null && fromName.getVersion() >= CommitLogDescriptor.VERSION_21)
+                    throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath());
+                else if (fromHeader != null)
+                    descriptor = fromHeader;
+                else descriptor = fromName;
+
+                if (descriptor.getVersion() > CommitLogDescriptor.VERSION_21)
+                    throw new IllegalStateException("Unsupported commit log version: " + descriptor.getVersion());
+
+                File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
+                if (toFile.exists())
+                    throw new IllegalStateException("Trying to restore archive " + fromFile.getPath() + ", but the same segment already exists in the restore location: " + toFile.getPath());
+
                 String command = restoreCommand.replace("%from", fromFile.getPath());
                 command = command.replace("%to", toFile.getPath());
                 try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 0c8ed61..b11da94 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -20,10 +20,18 @@
  */
 package org.apache.cassandra.db.commitlog;
 
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.PureJavaCrc32;
 
 public class CommitLogDescriptor
 {
@@ -42,6 +50,9 @@ public class CommitLogDescriptor
      */
     public static final int current_version = VERSION_21;
 
+    // [version, id, checksum]
+    static final int HEADER_SIZE = 4 + 8 + 4;
+
     private final int version;
     public final long id;
 
@@ -56,6 +67,43 @@ public class CommitLogDescriptor
         this(current_version, id);
     }
 
+    /**
+     * Stamps the segment header for {@code descriptor} at the start of {@code out}.
+     * Layout, written with absolute puts: version (int, offset 0), id (long, offset 4),
+     * checksum of both (int, offset 12).
+     */
+    static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+    {
+        final int idLowBits = (int) (descriptor.id & 0xFFFFFFFFL);
+        final int idHighBits = (int) (descriptor.id >>> 32);
+
+        // the checksum covers the version, then the id as low word followed by high word
+        PureJavaCrc32 headerCrc = new PureJavaCrc32();
+        headerCrc.updateInt(descriptor.version);
+        headerCrc.updateInt(idLowBits);
+        headerCrc.updateInt(idHighBits);
+
+        out.putInt(0, descriptor.version);
+        out.putLong(4, descriptor.id);
+        out.putInt(12, headerCrc.getCrc());
+    }
+
+    /**
+     * Reads the descriptor stored in the first bytes of a segment file.
+     *
+     * @param file the segment file to inspect
+     * @return the descriptor if the stored checksum matches the version and id that were read,
+     *         or null if it does not
+     * @throws RuntimeException wrapping the EOFException raised when the file ends before the header does
+     * @throws FSReadError on any other I/O failure
+     */
+    public static CommitLogDescriptor fromHeader(File file)
+    {
+        try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
+        {
+            assert raf.getFilePointer() == 0;
+            final int version = raf.readInt();
+            final long id = raf.readLong();
+            final int storedCrc = raf.readInt();
+
+            // recompute the checksum in the order writeHeader uses: version, id low word, id high word
+            PureJavaCrc32 expected = new PureJavaCrc32();
+            expected.updateInt(version);
+            expected.updateInt((int) (id & 0xFFFFFFFFL));
+            expected.updateInt((int) (id >>> 32));
+
+            return storedCrc == expected.getCrc() ? new CommitLogDescriptor(version, id) : null;
+        }
+        catch (EOFException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, file);
+        }
+    }
+
     public static CommitLogDescriptor fromFileName(String name)
     {
         Matcher matcher;
@@ -102,4 +150,20 @@ public class CommitLogDescriptor
     {
         return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
     }
+
+    /** Renders this descriptor as {@code (version,id)}. */
+    public String toString()
+    {
+        StringBuilder text = new StringBuilder();
+        text.append('(').append(version).append(',').append(id).append(')');
+        return text.toString();
+    }
+
+    /**
+     * Two descriptors are equal when both their version and their id match.
+     *
+     * @param that the object to compare with; any non-descriptor (including null) is unequal
+     */
+    @Override
+    public boolean equals(Object that)
+    {
+        return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that);
+    }
+
+    /**
+     * Typed overload of {@link #equals(Object)}. A null argument compares unequal
+     * instead of raising a NullPointerException.
+     */
+    public boolean equals(CommitLogDescriptor that)
+    {
+        return that != null && this.version == that.version && this.id == that.id;
+    }
+
+    /**
+     * Hashes the same two fields that equals compares, so equal descriptors
+     * always produce the same hash code.
+     */
+    @Override
+    public int hashCode()
+    {
+        return 31 * version + (int) (id ^ (id >>> 32));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index fb33187..59ae4e4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -56,7 +56,7 @@ public class CommitLogReplayer
     private final AtomicInteger replayedCount;
     private final Map<UUID, ReplayPosition> cfPositions;
     private final ReplayPosition globalPosition;
-    private final Checksum checksum;
+    private final PureJavaCrc32 checksum;
     private byte[] buffer;
 
     public CommitLogReplayer()
@@ -113,22 +113,26 @@ public class CommitLogReplayer
         return replayedCount.get();
     }
 
-    private int readHeader(long segmentId, int offset, RandomAccessReader reader) throws IOException
+    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
     {
         if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
         {
             if (offset != reader.length() && offset != Integer.MAX_VALUE)
-                logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header");
+                logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
             // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
             return -1;
         }
         reader.seek(offset);
         PureJavaCrc32 crc = new PureJavaCrc32();
-        crc.update((int) (segmentId & 0xFFFFFFFFL));
-        crc.update((int) (segmentId >>> 32));
-        crc.update((int) reader.getPosition());
+        crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
+        crc.updateInt((int) (descriptor.id >>> 32));
+        crc.updateInt((int) reader.getPosition());
         int end = reader.readInt();
-        long filecrc = reader.readLong();
+        long filecrc;
+        if (descriptor.getVersion() < CommitLogDescriptor.VERSION_21)
+            filecrc = reader.readLong();
+        else
+            filecrc = reader.readInt() & 0xffffffffL;
         if (crc.getValue() != filecrc)
         {
             if (end != 0 || filecrc != 0)
@@ -150,7 +154,7 @@ public class CommitLogReplayer
         if (globalPosition.segment < segmentId)
         {
             if (version >= CommitLogDescriptor.VERSION_21)
-                return CommitLogSegment.SYNC_MARKER_SIZE;
+                return CommitLogDescriptor.HEADER_SIZE + CommitLogSegment.SYNC_MARKER_SIZE;
             else
                 return 0;
         }
@@ -244,7 +248,7 @@ public class CommitLogReplayer
                 return;
             }
 
-            int prevEnd = 0;
+            int prevEnd = CommitLogDescriptor.HEADER_SIZE;
             main: while (true)
             {
 
@@ -253,7 +257,7 @@ public class CommitLogReplayer
                     end = Integer.MAX_VALUE;
                 else
                 {
-                    do { end = readHeader(segmentId, end, reader); }
+                    do { end = readSyncMarker(desc, end, reader); }
                     while (end < offset && end > prevEnd);
                 }
 
@@ -290,12 +294,16 @@ public class CommitLogReplayer
                         if (serializedSize < 10)
                             break main;
 
-                        long claimedSizeChecksum = reader.readLong();
+                        long claimedSizeChecksum;
+                        if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+                            claimedSizeChecksum = reader.readLong();
+                        else
+                            claimedSizeChecksum = reader.readInt() & 0xffffffffL;
                         checksum.reset();
                         if (desc.getVersion() < CommitLogDescriptor.VERSION_20)
                             checksum.update(serializedSize);
                         else
-                            FBUtilities.updateChecksumInt(checksum, serializedSize);
+                            checksum.updateInt(serializedSize);
 
                         if (checksum.getValue() != claimedSizeChecksum)
                             break main; // entry wasn't synced correctly/fully. that's
@@ -304,7 +312,10 @@ public class CommitLogReplayer
                         if (serializedSize > buffer.length)
                             buffer = new byte[(int) (1.2 * serializedSize)];
                         reader.readFully(buffer, 0, serializedSize);
-                        claimedCRC32 = reader.readLong();
+                        if (desc.getVersion() < CommitLogDescriptor.VERSION_21)
+                            claimedCRC32 = reader.readLong();
+                        else
+                            claimedCRC32 = reader.readInt() & 0xffffffffL;
                     }
                     catch (EOFException eof)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 3830966..2120d3e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.CLibrary;
@@ -59,14 +60,24 @@ public class CommitLogSegment
 {
     private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
 
-    private final static long idBase = System.currentTimeMillis();
+    private final static long idBase;
     private final static AtomicInteger nextId = new AtomicInteger(1);
+    static
+    {
+        // Seed idBase above the highest segment id already present in the commit log
+        // location, falling back to the current time when that is larger.
+        long maxId = Long.MIN_VALUE;
+        // File.listFiles() returns null when the path is not a readable directory; treat that as
+        // "no existing segments" rather than failing class initialisation with a NullPointerException
+        File[] existing = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles();
+        if (existing != null)
+        {
+            for (File file : existing)
+            {
+                if (CommitLogDescriptor.isValid(file.getName()))
+                    maxId = Math.max(CommitLogDescriptor.fromFileName(file.getName()).id, maxId);
+            }
+        }
+        idBase = Math.max(System.currentTimeMillis(), maxId + 1);
+    }
 
-    // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
-    static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
+    // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum)
+    static final int ENTRY_OVERHEAD_SIZE = 4 + 4 + 4;
 
-    // The commit log (chained) sync marker/header size in bytes (int: length + long: checksum [segmentId, position])
-    static final int SYNC_MARKER_SIZE = 4 + 8;
+    // The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position])
+    static final int SYNC_MARKER_SIZE = 4 + 4;
 
     // The OpOrder used to order appends wrt sync
     private final OpOrder appendOrder = new OpOrder();
@@ -154,10 +165,13 @@ public class CommitLogSegment
             fd = CLibrary.getfd(logFileAccessor.getFD());
 
             buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
-            // mark the initial header as uninitialised
-            buffer.putInt(0, 0);
-            buffer.putLong(4, 0);
-            allocatePosition.set(SYNC_MARKER_SIZE);
+            // write the header
+            CommitLogDescriptor.writeHeader(buffer, descriptor);
+            // mark the initial sync marker as uninitialised
+            buffer.putInt(CommitLogDescriptor.HEADER_SIZE, 0);
+            buffer.putLong(CommitLogDescriptor.HEADER_SIZE + 4, 0);
+            allocatePosition.set(CommitLogDescriptor.HEADER_SIZE + SYNC_MARKER_SIZE);
+            lastSyncedOffset = CommitLogDescriptor.HEADER_SIZE;
         }
         catch (IOException e)
         {
@@ -292,11 +306,11 @@ public class CommitLogSegment
             // we don't chain the crcs here to ensure this method is idempotent if it fails
             int offset = lastSyncedOffset;
             final PureJavaCrc32 crc = new PureJavaCrc32();
-            crc.update((int) (id & 0xFFFFFFFFL));
-            crc.update((int) (id >>> 32));
-            crc.update(offset);
+            crc.updateInt((int) (id & 0xFFFFFFFFL));
+            crc.updateInt((int) (id >>> 32));
+            crc.updateInt(offset);
             buffer.putInt(offset, nextMarker);
-            buffer.putLong(offset + 4, crc.getValue());
+            buffer.putInt(offset + 4, crc.getCrc());
 
             // zero out the next sync marker so replayer can cleanly exit
             if (nextMarker < buffer.capacity())
@@ -383,6 +397,23 @@ public class CommitLogSegment
         return logFile.getName();
     }
 
+    /**
+     * Blocks the calling thread until lastSyncedOffset is no longer below the buffer's capacity.
+     * The wait is uninterruptible.
+     */
+    void waitForFinalSync()
+    {
+        while (true)
+        {
+            // Register for the signal BEFORE testing the condition.
+            // NOTE(review): presumably this ordering ensures a sync that completes between the
+            // check and the await is not missed; confirm against WaitQueue semantics.
+            WaitQueue.Signal signal = syncComplete.register();
+            if (lastSyncedOffset < buffer.capacity())
+            {
+                // not fully synced yet: wait for the next signal, then re-check
+                signal.awaitUninterruptibly();
+            }
+            else
+            {
+                // condition already met: withdraw the registration and stop waiting
+                signal.cancel();
+                break;
+            }
+        }
+    }
+
     /**
      * Close the segment file.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index b0be42c..5802e8a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +31,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.*;
@@ -222,7 +220,7 @@ public class CommitLogSegmentManager
                 {
                     // Now we can run the user defined command just after switching to the new commit log.
                     // (Do this here instead of in the recycle call so we can get a head start on the archive.)
-                    CommitLog.instance.archiver.maybeArchive(old.getPath(), old.getName());
+                    CommitLog.instance.archiver.maybeArchive(old);
 
                     // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
                     old.discardUnusedTail();
@@ -314,8 +312,9 @@ public class CommitLogSegmentManager
      */
     void recycleSegment(final CommitLogSegment segment)
     {
+        boolean archiveSuccess = CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName());
         activeSegments.remove(segment);
-        if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
+        if (!archiveSuccess)
         {
             // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
             discardSegment(segment, false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
index 041652f..9a1ac02 100644
--- a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
+++ b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java
@@ -31,7 +31,7 @@ import java.util.zip.Checksum;
  *
  * @see java.util.zip.CRC32
  *
- * This class is copied from hadoop-commons project.
+ * This class is copied from hadoop-commons project and retains that formatting.
  * (The initial patch added PureJavaCrc32 was HADOOP-6148)
  */
 public class PureJavaCrc32 implements Checksum {
@@ -49,7 +49,11 @@ public class PureJavaCrc32 implements Checksum {
     return (~crc) & 0xffffffffL;
   }
 
-  @Override
+  /** @return the bitwise complement of the running crc register, as a raw 32-bit int */
+  public int getCrc() {
+    final int finalised = ~crc;
+    return finalised;
+  }
+
+    @Override
   public void reset() {
     crc = 0xffffffff;
   }
@@ -172,7 +176,14 @@ public class PureJavaCrc32 implements Checksum {
     crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
   }
 
-  /*
+  /** Feeds the four bytes of {@code v} to update(int) one at a time, most significant byte first. */
+  final public void updateInt(int v) {
+      for (int shift = 24; shift >= 0; shift -= 8) {
+          update((v >>> shift) & 0xFF);
+      }
+  }
+
+    /*
    * CRC-32 lookup tables generated by the polynomial 0xEDB88320.
    * See also TestPureJavaCrc32.Table.
    */