Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/04/09 18:30:20 UTC

[2/3] git commit: add commitlog archiving and pitr patch by Vijay and jbellis for CASSANDRA-3690

add commitlog archiving and pitr
patch by Vijay and jbellis for CASSANDRA-3690


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5923d329
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5923d329
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5923d329

Branch: refs/heads/trunk
Commit: 5923d32959ff419821dbb7fb36114a0604324498
Parents: 044e17a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Apr 9 10:01:50 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Apr 9 11:23:02 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 conf/cassandra.yaml                                |    8 +
 conf/commitlog_archiving.properties                |   37 ++
 src/java/org/apache/cassandra/config/Config.java   |    1 +
 .../cassandra/config/DatabaseDescriptor.java       |    8 +
 .../apache/cassandra/db/commitlog/CommitLog.java   |  253 ++------------
 .../cassandra/db/commitlog/CommitLogAllocator.java |   20 +-
 .../cassandra/db/commitlog/CommitLogArchiver.java  |  147 ++++++++
 .../cassandra/db/commitlog/CommitLogMBean.java     |   17 +
 .../cassandra/db/commitlog/CommitLogReplayer.java  |  269 +++++++++++++++
 .../cassandra/db/commitlog/CommitLogSegment.java   |   16 +-
 src/java/org/apache/cassandra/utils/CLibrary.java  |   33 +--
 .../org/apache/cassandra/utils/FBUtilities.java    |   34 ++-
 13 files changed, 581 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e47bb95..b80368a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.1.1-dev
+ * add support for commitlog archiving and point-in-time recovery
+   (CASSANDRA-3690)
  * update caches to use byte[] keys to reduce memory overhead (CASSANDRA-3966)
  * add column limit to cli (CASSANDRA-3012, 4098)
  * clean up and optimize DataOutputBuffer, used by CQL compression and

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index af6072a..611c7a4 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -150,6 +150,14 @@ saved_caches_directory: /var/lib/cassandra/saved_caches
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
 
+# Configure the size of individual commitlog segments. The
+# default is 128 MB, which is almost always fine, but if you are
+# archiving commitlog segments (see commitlog_archiving.properties),
+# then you probably want a finer granularity of archiving; 16 MB
+# is reasonable.
+#
+# commitlog_segment_size_in_mb: 128
+
 # any class that implements the SeedProvider interface and has a
 # constructor that takes a Map<String, String> of parameters will do.
 seed_provider:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/conf/commitlog_archiving.properties
----------------------------------------------------------------------
diff --git a/conf/commitlog_archiving.properties b/conf/commitlog_archiving.properties
new file mode 100644
index 0000000..b4dfd79
--- /dev/null
+++ b/conf/commitlog_archiving.properties
@@ -0,0 +1,37 @@
+# commitlog archiving configuration.  Leave blank to disable.
+
+# Command to execute to archive a commitlog segment
+# Parameters: %path => Fully qualified path of the segment to archive
+#             %name => Name of the commit log.
+# Example: archive_command=/bin/ln %path /backup/%name
+#
+# Limitation: *_command= expects a single command with arguments; the
+# command is executed directly (not through a shell), so STDOUT/STDIN
+# redirection and multiple commands will not work.  If you need them,
+# wrap them in a script and point to that script here.
+archive_command=
+
+# Command to execute to make an archived commitlog live again.
+# Parameters: %from is the full path to an archived commitlog segment (from restore_directories)
+#             %to is the live commitlog directory
+# Example: restore_command=cp -f %from %to
+restore_command=
+
+# Comma-separated list of directories to scan for commitlog segments to restore.
+restore_directories=
+
+# Restore mutations created up to and including this timestamp.
+# Format: yyyy:MM:dd HH:mm:ss (e.g. 2012:04:30 20:43:12)
+#
+# Note! Recovery will stop when the first client-supplied timestamp
+# greater than this time is encountered.  Since the order Cassandra
+# receives mutations does not always strictly follow timestamp order,
+# this may leave some mutations with timestamps earlier than the
+# point-in-time unrecovered.
+restore_point_in_time=

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 91b96f1..fe8eb84 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -100,6 +100,7 @@ public class Config
     public CommitLogSync commitlog_sync;
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
+    public int commitlog_segment_size_in_mb = 128;
 
     public String endpoint_snitch;
     public Boolean dynamic_snitch = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2eb89ba..259d6e8 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -782,6 +782,14 @@ public class DatabaseDescriptor
         return conf.commitlog_directory;
     }
 
+    /**
+     * size of commitlog segments to allocate 
+     */
+    public static int getCommitLogSegmentSize()
+    {
+        return conf.commitlog_segment_size_in_mb * 1024 * 1024;
+    }
+
     public static String getSavedCachesLocation()
     {
         return conf.saved_caches_directory;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 69dd0b1..3c34772 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -23,31 +23,17 @@ import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.Checksum;
 
-import com.google.common.collect.Ordering;
-
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.PureJavaCrc32;
-import org.apache.cassandra.utils.WrappedRunnable;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -58,9 +44,7 @@ import javax.management.ObjectName;
  */
 public class CommitLog implements CommitLogMBean
 {
-    private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
-
-    static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
+    private static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
 
     public static final CommitLog instance = new CommitLog();
 
@@ -68,11 +52,11 @@ public class CommitLog implements CommitLogMBean
 
     public final CommitLogAllocator allocator;
 
+    public final CommitLogArchiver archiver = new CommitLogArchiver();
+
     public static final int END_OF_SEGMENT_MARKER = 0;          // this is written out at the end of a segment
     public static final int END_OF_SEGMENT_MARKER_SIZE = 4;     // number of bytes of ^^^
 
-    /** size of commitlog segments to allocate */
-    public static final int SEGMENT_SIZE = 128 * 1024 * 1024;
     public CommitLogSegment activeSegment;
 
     private CommitLog()
@@ -120,6 +104,8 @@ public class CommitLog implements CommitLogMBean
      */
     public int recover() throws IOException
     {
+        archiver.maybeRestoreArchive();
+
         File[] files = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(new FilenameFilter()
         {
             public boolean accept(File dir, String name)
@@ -157,203 +143,19 @@ public class CommitLog implements CommitLogMBean
      * @param clogs   the list of commit log files to replay
      * @return the number of mutations replayed
      */
-    public int recover(File[] clogs) throws IOException
+    public int recover(File... clogs) throws IOException
     {
-        final Set<Table> tablesRecovered = new NonBlockingHashSet<Table>();
-        List<Future<?>> futures = new ArrayList<Future<?>>();
-        byte[] bytes = new byte[4096];
-        Map<Integer, AtomicInteger> invalidMutations = new HashMap<Integer, AtomicInteger>();
-
-        // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
-        final AtomicInteger replayedCount = new AtomicInteger();
-
-        // compute per-CF and global replay positions
-        final Map<Integer, ReplayPosition> cfPositions = new HashMap<Integer, ReplayPosition>();
-        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-        {
-            // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
-            // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
-            // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
-            ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
-            cfPositions.put(cfs.metadata.cfId, rp);
-        }
-        final ReplayPosition globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
-
-        Checksum checksum = new PureJavaCrc32();
-        for (final File file : clogs)
-        {
-            logger.info("Replaying " + file.getPath());
-
-            final long segment = CommitLogSegment.idFromFilename(file.getName());
-
-            RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
-            assert reader.length() <= Integer.MAX_VALUE;
-
-            try
-            {
-                int replayPosition;
-                if (globalPosition.segment < segment)
-                    replayPosition = 0;
-                else if (globalPosition.segment == segment)
-                    replayPosition = globalPosition.position;
-                else
-                    replayPosition = (int) reader.length();
-
-                if (replayPosition < 0 || replayPosition >= reader.length())
-                {
-                    // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog
-                    // (see https://issues.apache.org/jira/browse/CASSANDRA-2285)
-                    logger.debug("skipping replay of fully-flushed {}", file);
-                    continue;
-                }
-
-                reader.seek(replayPosition);
-
-                if (logger.isDebugEnabled())
-                    logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
-
-                /* read the logs populate RowMutation and apply */
-                while (!reader.isEOF())
-                {
-                    if (logger.isDebugEnabled())
-                        logger.debug("Reading mutation at " + reader.getFilePointer());
-
-                    long claimedCRC32;
-                    int serializedSize;
-                    try
-                    {
-                        // any of the reads may hit EOF
-                        serializedSize = reader.readInt();
-                        if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
-                        {
-                            logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
-                            break;
-                        }
-
-                        // RowMutation must be at LEAST 10 bytes:
-                        // 3 each for a non-empty Table and Key (including the 2-byte length from
-                        // writeUTF/writeWithShortLength) and 4 bytes for column count.
-                        // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
-                        if (serializedSize < 10)
-                            break;
-                        long claimedSizeChecksum = reader.readLong();
-                        checksum.reset();
-                        checksum.update(serializedSize);
-                        if (checksum.getValue() != claimedSizeChecksum)
-                            break; // entry wasn't synced correctly/fully.  that's ok.
-
-                        if (serializedSize > bytes.length)
-                            bytes = new byte[(int) (1.2 * serializedSize)];
-                        reader.readFully(bytes, 0, serializedSize);
-                        claimedCRC32 = reader.readLong();
-                    }
-                    catch(EOFException eof)
-                    {
-                        break; // last CL entry didn't get completely written.  that's ok.
-                    }
-
-                    checksum.update(bytes, 0, serializedSize);
-                    if (claimedCRC32 != checksum.getValue())
-                    {
-                        // this entry must not have been fsynced.  probably the rest is bad too,
-                        // but just in case there is no harm in trying them (since we still read on an entry boundary)
-                        continue;
-                    }
-
-                    /* deserialize the commit log entry */
-                    FastByteArrayInputStream bufIn = new FastByteArrayInputStream(bytes, 0, serializedSize);
-                    RowMutation rm = null;
-                    try
-                    {
-                        // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
-                        // the current version.  so do make sure the CL is drained prior to upgrading a node.
-                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
-                    }
-                    catch (UnknownColumnFamilyException ex)
-                    {
-                        AtomicInteger i = invalidMutations.get(ex.cfId);
-                        if (i == null)
-                        {
-                            i = new AtomicInteger(1);
-                            invalidMutations.put(ex.cfId, i);
-                        }
-                        else
-                            i.incrementAndGet();
-                        continue;
-                    }
-
-                    if (logger.isDebugEnabled())
-                        logger.debug(String.format("replaying mutation for %s.%s: %s",
-                                                    rm.getTable(),
-                                                    ByteBufferUtil.bytesToHex(rm.key()),
-                                                    "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}"));
-
-                    final long entryLocation = reader.getFilePointer();
-                    final RowMutation frm = rm;
-                    Runnable runnable = new WrappedRunnable()
-                    {
-                        public void runMayThrow() throws IOException
-                        {
-                            if (Schema.instance.getKSMetaData(frm.getTable()) == null)
-                                return;
-                            final Table table = Table.open(frm.getTable());
-                            RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
-
-                            // Rebuild the row mutation, omitting column families that a) have already been flushed,
-                            // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every
-                            // thing based on the cfid instead.
-                            for (ColumnFamily columnFamily : frm.getColumnFamilies())
-                            {
-                                if (Schema.instance.getCF(columnFamily.id()) == null)
-                                    // null means the cf has been dropped
-                                    continue;
-
-                                ReplayPosition rp = cfPositions.get(columnFamily.id());
-
-                                // replay if current segment is newer than last flushed one or, if it is the last known
-                                // segment, if we are after the replay position
-                                if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
-                                {
-                                    newRm.add(columnFamily);
-                                    replayedCount.incrementAndGet();
-                                }
-                            }
-                            if (!newRm.isEmpty())
-                            {
-                                Table.open(newRm.getTable()).apply(newRm, false);
-                                tablesRecovered.add(table);
-                            }
-                        }
-                    };
-                    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
-                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
-                    {
-                        FBUtilities.waitOnFutures(futures);
-                        futures.clear();
-                    }
-                }
-            }
-            finally
-            {
-                FileUtils.closeQuietly(reader);
-                logger.info("Finished reading " + file);
-            }
-        }
-
-        for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet())
-            logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
-
-        // wait for all the writes to finish on the mutation stage
-        FBUtilities.waitOnFutures(futures);
-        logger.debug("Finished waiting on mutations from recovery");
-
-        // flush replayed tables
-        futures.clear();
-        for (Table table : tablesRecovered)
-            futures.addAll(table.flush());
-        FBUtilities.waitOnFutures(futures);
+        CommitLogReplayer recovery = new CommitLogReplayer();
+        recovery.recover(clogs);
+        return recovery.blockForWrites();
+    }
 
-        return replayedCount.get();
+    /**
+     * Perform recovery on a single commit log.
+     */
+    public void recover(String path) throws IOException
+    {
+        recover(new File(path));
     }
 
     /**
@@ -400,7 +202,7 @@ public class CommitLog implements CommitLogMBean
     public void add(RowMutation rm) throws IOException
     {
         long totalSize = RowMutation.serializer().serializedSize(rm, MessagingService.version_) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
-        if (totalSize > CommitLog.SEGMENT_SIZE)
+        if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize())
         {
             logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
             return;
@@ -538,6 +340,19 @@ public class CommitLog implements CommitLogMBean
         activeSegment = allocator.fetchSegment();
     }
 
+    public List<String> getActiveSegmentNames()
+    {
+        List<String> segmentNames = new ArrayList<String>();
+        for (CommitLogSegment segment : allocator.getActiveSegments())
+            segmentNames.add(segment.getName());
+        return segmentNames;
+    }
+    
+    public List<String> getArchivingSegmentNames()
+    {
+        return new ArrayList<String>(archiver.archivePending.keySet());
+    }
+
     /**
      * Shuts down the threads used by the commit log, blocking until completion.
      */
@@ -565,7 +380,13 @@ public class CommitLog implements CommitLogMBean
             try
             {
                 if (!activeSegment.hasCapacityFor(rowMutation))
+                {
+                    CommitLogSegment oldSegment = activeSegment;
                     activateNextSegment();
+                    // Now we can run the user defined command just before switching to the new commit log.
+                    // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+                    archiver.maybeArchive(oldSegment.getPath(), oldSegment.getName());
+                }
                 activeSegment.write(rowMutation);
             }
             catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
index da30fe1..963e41e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
@@ -145,13 +145,17 @@ public class CommitLogAllocator
     public void recycleSegment(final CommitLogSegment segment)
     {
         activeSegments.remove(segment);
-
+        if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
+        {
+            // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+            discardSegment(segment, false);
+            return;
+        }
         if (isCapExceeded())
         {
-            discardSegment(segment);
+            discardSegment(segment, true);
             return;
         }
-
         queue.add(new Runnable()
         {
             public void run()
@@ -171,7 +175,7 @@ public class CommitLogAllocator
     public void recycleSegment(final File file)
     {
         // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
-        if (isCapExceeded() || file.length() != CommitLog.SEGMENT_SIZE)
+        if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize())
         {
             try
             {
@@ -199,15 +203,15 @@ public class CommitLogAllocator
      *
      * @param segment segment to be discarded
      */
-    private void discardSegment(final CommitLogSegment segment)
+    private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
     {
-        size.addAndGet(-CommitLog.SEGMENT_SIZE);
+        size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
 
         queue.add(new Runnable()
         {
             public void run()
             {
-                segment.discard();
+                segment.discard(deleteFile);
             }
         });
     }
@@ -240,7 +244,7 @@ public class CommitLogAllocator
      */
     private CommitLogSegment createFreshSegment()
     {
-        size.addAndGet(CommitLog.SEGMENT_SIZE);
+        size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
         return internalAddReadySegment(CommitLogSegment.freshSegment());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
new file mode 100644
index 0000000..a2204c6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -0,0 +1,147 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class CommitLogArchiver
+{
+    private static final Logger logger = LoggerFactory.getLogger(CommitLogArchiver.class);
+    public final Map<String, Future<?>> archivePending = new ConcurrentHashMap<String, Future<?>>();
+    public final ExecutorService executor = new JMXEnabledThreadPoolExecutor("commitlog_archiver");
+    private final String archiveCommand;
+    private final String restoreCommand;
+    private final String restoreDirectories;
+    public final long restorePointInTime;
+
+    public CommitLogArchiver()
+    {
+        Properties commitlog_commands = new Properties();
+        InputStream stream = null;
+        try
+        {
+            stream = getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties");
+
+            if (stream == null)
+            {
+                logger.debug("No commitlog_archiving properties found; archive + pitr will be disabled");
+                archiveCommand = null;
+                restoreCommand = null;
+                restoreDirectories = null;
+                restorePointInTime = Long.MAX_VALUE;
+            }
+            else
+            {
+                commitlog_commands.load(stream);
+                archiveCommand = commitlog_commands.getProperty("archive_command");
+                restoreCommand = commitlog_commands.getProperty("restore_command");
+                restoreDirectories = commitlog_commands.getProperty("restore_directories");
+                String targetTime = commitlog_commands.getProperty("restore_point_in_time");
+                try
+                {
+                    restorePointInTime = Strings.isNullOrEmpty(targetTime) ? Long.MAX_VALUE : new SimpleDateFormat("yyyy:MM:dd HH:mm:ss").parse(targetTime).getTime();
+                }
+                catch (ParseException e)
+                {
+                    throw new RuntimeException("Unable to parse restore target time", e);
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Unable to load commitlog_archiving.properties", e);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(stream);
+        }
+    }
+
+    public void maybeArchive(final String path, final String name)
+    {
+        if (Strings.isNullOrEmpty(archiveCommand))
+            return;
+
+        archivePending.put(name, executor.submit(new WrappedRunnable()
+        {
+            protected void runMayThrow() throws IOException
+            {
+                String command = archiveCommand.replace("%name", name);
+                command = command.replace("%path", path);
+                exec(command);
+            }
+        }));
+    }
+
+    public boolean maybeWaitForArchiving(String name)
+    {
+        Future<?> f = archivePending.remove(name);
+        if (f == null)
+            return true; // archiving disabled
+
+        try
+        {
+            f.get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        catch (ExecutionException e)
+        {
+            if (e.getCause() instanceof IOException)
+            {
+                logger.info("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
+                return false;
+            }
+            throw new RuntimeException(e);
+        }
+
+        return true;
+    }
+
+    public void maybeRestoreArchive() throws IOException
+    {
+        if (Strings.isNullOrEmpty(restoreDirectories))
+            return;
+
+        for (String dir : restoreDirectories.split(","))
+        {
+            File[] files = new File(dir).listFiles();
+            for (File fromFile : files)
+            {
+                File toFile = new File(DatabaseDescriptor.getCommitLogLocation(),
+                                       CommitLogSegment.FILENAME_PREFIX +
+                                       System.nanoTime() +
+                                       CommitLogSegment.FILENAME_EXTENSION);             
+                String command = restoreCommand.replace("%from", fromFile.getPath());
+                command = command.replace("%to", toFile.getPath());       
+                exec(command);
+            }
+        }
+    }
+
+    private void exec(String command) throws IOException
+    {
+        ProcessBuilder pb = new ProcessBuilder(command.split(" "));
+        pb.redirectErrorStream(true);
+        FBUtilities.exec(pb);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
index 8f8fa19..29f95a7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
@@ -19,6 +19,8 @@
 package org.apache.cassandra.db.commitlog;
 
 
+import java.io.IOException;
+import java.util.List;
 
 public interface CommitLogMBean
 {
@@ -36,4 +38,19 @@ public interface CommitLogMBean
      * Get the current size used by all the commitlog segments.
      */
     public long getTotalCommitlogSize();
+
+    /**
+     * Recover a single file.
+     */
+    public void recover(String path) throws IOException;
+
+    /**
+     * @return file names (not full paths) of active commit log segments (segments containing unflushed data)
+     */
+    public List<String> getActiveSegmentNames();
+    
+    /**
+     * @return file names of segments with a pending archive attempt.  Does NOT include failed archive attempts.
+     */
+    public List<String> getArchivingSegmentNames();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
new file mode 100644
index 0000000..eb997fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -0,0 +1,269 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Checksum;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.io.IColumnSerializer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.PureJavaCrc32;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.commons.lang.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Ordering;
+
+public class CommitLogReplayer
+{
+    private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
+    private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
+
+    private final Set<Table> tablesRecovered;
+    private final List<Future<?>> futures;
+    private final Map<Integer, AtomicInteger> invalidMutations;
+    private final AtomicInteger replayedCount;
+    private final Map<Integer, ReplayPosition> cfPositions;
+    private final ReplayPosition globalPosition;
+    private final Checksum checksum;
+    private byte[] buffer;
+
+    public CommitLogReplayer()
+    {
+        this.tablesRecovered = new NonBlockingHashSet<Table>();
+        this.futures = new ArrayList<Future<?>>();
+        this.buffer = new byte[4096];
+        this.invalidMutations = new HashMap<Integer, AtomicInteger>();
+        // count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference.
+        this.replayedCount = new AtomicInteger();
+        // compute per-CF and global replay positions
+        this.cfPositions = new HashMap<Integer, ReplayPosition>();
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+        {
+            // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
+            // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
+            // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
+            ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
+            cfPositions.put(cfs.metadata.cfId, rp);
+        }
+        this.globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
+        this.checksum = new PureJavaCrc32();
+    }
+
+    public void recover(File[] clogs) throws IOException
+    {
+        for (final File file : clogs)
+            recover(file);
+    }
+
+    public int blockForWrites() throws IOException
+    {
+        for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet())
+            logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
+
+        // wait for all the writes to finish on the mutation stage
+        FBUtilities.waitOnFutures(futures);
+        logger.debug("Finished waiting on mutations from recovery");
+
+        // flush replayed tables
+        futures.clear();
+        for (Table table : tablesRecovered)
+            futures.addAll(table.flush());
+        FBUtilities.waitOnFutures(futures);
+        return replayedCount.get();
+    }
+
+    public void recover(File file) throws IOException
+    {
+        logger.info("Replaying " + file.getPath());
+        final long segment = CommitLogSegment.idFromFilename(file.getName());
+        RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
+        assert reader.length() <= Integer.MAX_VALUE;
+        try
+        {
+            int replayPosition;
+            if (globalPosition.segment < segment)
+                replayPosition = 0;
+            else if (globalPosition.segment == segment)
+                replayPosition = globalPosition.position;
+            else
+                replayPosition = (int) reader.length();
+
+            if (replayPosition < 0 || replayPosition >= reader.length())
+            {
+                // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog
+                // (see https://issues.apache.org/jira/browse/CASSANDRA-2285)
+                logger.debug("skipping replay of fully-flushed {}", file);
+                return;
+            }
+
+            reader.seek(replayPosition);
+
+            if (logger.isDebugEnabled())
+                logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
+
+            /* read the logs populate RowMutation and apply */
+            while (!reader.isEOF())
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Reading mutation at " + reader.getFilePointer());
+
+                long claimedCRC32;
+                int serializedSize;
+                try
+                {
+                    // any of the reads may hit EOF
+                    serializedSize = reader.readInt();
+                    if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
+                    {
+                        logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
+                        break;
+                    }
+
+                    // RowMutation must be at LEAST 10 bytes:
+                    // 3 each for a non-empty Table and Key (including the
+                    // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+                    // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+                    if (serializedSize < 10)
+                        break;
+                    long claimedSizeChecksum = reader.readLong();
+                    checksum.reset();
+                    checksum.update(serializedSize);
+                    if (checksum.getValue() != claimedSizeChecksum)
+                        break; // entry wasn't synced correctly/fully.  that's ok.
+
+                    if (serializedSize > buffer.length)
+                        buffer = new byte[(int) (1.2 * serializedSize)];
+                    reader.readFully(buffer, 0, serializedSize);
+                    claimedCRC32 = reader.readLong();
+                }
+                catch (EOFException eof)
+                {
+                    break; // last CL entry didn't get completely written. that's ok.
+                }
+
+                checksum.update(buffer, 0, serializedSize);
+                if (claimedCRC32 != checksum.getValue())
+                {
+                    // this entry must not have been fsynced. probably the rest is bad too,
+                    // but just in case there is no harm in trying them (since we still read on an entry boundary)
+                    continue;
+                }
+
+                /* deserialize the commit log entry */
+                FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
+                RowMutation rm;
+                try
+                {
+                    // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+                    // the current version. so do make sure the CL is drained prior to upgrading a node.
+                    rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
+                }
+                catch (UnknownColumnFamilyException ex)
+                {
+                    AtomicInteger i = invalidMutations.get(ex.cfId);
+                    if (i == null)
+                    {
+                        i = new AtomicInteger(1);
+                        invalidMutations.put(ex.cfId, i);
+                    }
+                    else
+                        i.incrementAndGet();
+                    continue;
+                }
+
+                if (logger.isDebugEnabled())
+                    logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ")
+                            + "}"));
+
+                final long entryLocation = reader.getFilePointer();
+                final RowMutation frm = rm;
+                Runnable runnable = new WrappedRunnable()
+                {
+                    public void runMayThrow() throws IOException
+                    {
+                        if (Schema.instance.getKSMetaData(frm.getTable()) == null)
+                            return;
+                        if (pointInTimeExceeded(frm))
+                            return;
+
+                        final Table table = Table.open(frm.getTable());
+                        RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
+
+                        // Rebuild the row mutation, omitting column families that 
+                        // a) have already been flushed,
+                            // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect; do everything based on the cfid instead.
+                        for (ColumnFamily columnFamily : frm.getColumnFamilies())
+                        {
+                            if (Schema.instance.getCF(columnFamily.id()) == null)
+                                // null means the cf has been dropped
+                                continue;
+
+                            ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+                            // replay if current segment is newer than last flushed one or, 
+                            // if it is the last known segment, if we are after the replay position
+                            if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
+                            {
+                                newRm.add(columnFamily);
+                                replayedCount.incrementAndGet();
+                            }
+                        }
+                        if (!newRm.isEmpty())
+                        {
+                            Table.open(newRm.getTable()).apply(newRm, false);
+                            tablesRecovered.add(table);
+                        }
+                    }
+                };
+                futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+                if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                {
+                    FBUtilities.waitOnFutures(futures);
+                    futures.clear();
+                }
+            }
+        }
+        finally
+        {
+            FileUtils.closeQuietly(reader);
+            logger.info("Finished reading " + file);
+        }
+    }
+
+    protected boolean pointInTimeExceeded(RowMutation frm)
+    {
+        long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
+
+        for (ColumnFamily families : frm.getColumnFamilies())
+        {
+            if (families.maxTimestamp() > restoreTarget)
+                return true;
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 45408ec..2ee90ad 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -53,9 +53,9 @@ public class CommitLogSegment
 {
     private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
 
-    private static final String FILENAME_PREFIX = "CommitLog-";
-    private static final String FILENAME_EXTENSION = ".log";
-    private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION);
+    static final String FILENAME_PREFIX = "CommitLog-";
+    static final String FILENAME_EXTENSION = ".log";
+    private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION);
 
     // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
     static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
@@ -115,9 +115,9 @@ public class CommitLogSegment
             }
 
             // Map the segment, extending or truncating it to the standard segment size
-            logFileAccessor.setLength(CommitLog.SEGMENT_SIZE);
+            logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize());
 
-            buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, CommitLog.SEGMENT_SIZE);
+            buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
             buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
             buffer.position(0);
 
@@ -163,12 +163,14 @@ public class CommitLogSegment
     /**
      * Completely discards a segment file by deleting it. (Potentially blocking operation)
      */
-    public void discard()
+    public void discard(boolean deleteFile)
     {
+        // TODO shouldn't we close the file when we're done writing to it, which comes (potentially) much earlier than it's eligible for recycling?
         close();
         try
         {
-            FileUtils.deleteWithConfirm(logFile);
+            if (deleteFile)
+                FileUtils.deleteWithConfirm(logFile);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java
index 18140a2..d970d61 100644
--- a/src/java/org/apache/cassandra/utils/CLibrary.java
+++ b/src/java/org/apache/cassandra/utils/CLibrary.java
@@ -18,18 +18,14 @@
  */
 package org.apache.cassandra.utils;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileDescriptor;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang.StringUtils;
-
 import com.sun.jna.LastErrorException;
 import com.sun.jna.Native;
 
@@ -188,7 +184,7 @@ public final class CLibrary
         }
         try
         {
-            exec(pb);
+            FBUtilities.exec(pb);
         }
         catch (IOException ex)
         {
@@ -197,33 +193,6 @@ public final class CLibrary
         }
     }
 
-    private static void exec(ProcessBuilder pb) throws IOException
-    {
-        Process p = pb.start();
-        try
-        {
-            int errCode = p.waitFor();
-            if (errCode != 0)
-            {
-                BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
-                BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()));
-                StringBuffer buff = new StringBuffer();
-                String str;
-                while ((str = in.readLine()) != null)
-                    buff.append(str).append(System.getProperty("line.separator"));
-                while ((str = err.readLine()) != null)
-                    buff.append(str).append(System.getProperty("line.separator"));
-                throw new IOException("Exception while executing the command: "+ StringUtils.join(pb.command(), " ") +
-                                      ", command error Code: " + errCode +
-                                      ", command output: "+ buff.toString());
-            }
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     public static void trySkipCache(int fd, long offset, int len)
     {
         if (fd < 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 38df00e..c0fe9bd 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import com.google.common.base.Joiner;
 import com.google.common.collect.AbstractIterator;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -320,7 +321,7 @@ public class FBUtilities
 
     public static String resourceToFile(String filename) throws ConfigurationException
     {
-        ClassLoader loader = PropertyFileSnitch.class.getClassLoader();
+        ClassLoader loader = FBUtilities.class.getClassLoader();
         URL scpurl = loader.getResource(filename);
         if (scpurl == null)
             throw new ConfigurationException("unable to locate " + filename);
@@ -549,6 +550,37 @@ public class FBUtilities
         }
     }
 
+    /**
+     * Starts the given process and waits for it to finish.
+     * @param pb the process to start
+     * @throws java.io.IOException if the process exits with a non-zero code
+     */
+    public static void exec(ProcessBuilder pb) throws IOException
+    {
+        Process p = pb.start();
+        try
+        {
+            int errCode = p.waitFor();
+            if (errCode != 0)
+            {
+                BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
+                BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()));
+                StringBuilder sb = new StringBuilder();
+                String str;
+                while ((str = in.readLine()) != null)
+                    sb.append(str).append(System.getProperty("line.separator"));
+                while ((str = err.readLine()) != null)
+                    sb.append(str).append(System.getProperty("line.separator"));
+                throw new IOException("Exception while executing the command: "+ StringUtils.join(pb.command(), " ") +
+                                      ", command error Code: " + errCode +
+                                      ", command output: "+ sb.toString());
+            }
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
     private static final class WrappedCloseableIterator<T>
         extends AbstractIterator<T> implements CloseableIterator<T>
     {