You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/06/16 13:55:40 UTC

[4/5] cassandra git commit: Add Change Data Capture

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 4a660ca..b1f48b2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -22,34 +22,35 @@ import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.zip.CRC32;
-
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang3.StringUtils;
-
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator;
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 
@@ -65,19 +66,19 @@ public class CommitLog implements CommitLogMBean
 
     // we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly
     // empty segments when writing large records
-    private final long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
+    final long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
+
+    final public AbstractCommitLogSegmentManager segmentManager;
 
-    public final CommitLogSegmentManager allocator;
     public final CommitLogArchiver archiver;
     final CommitLogMetrics metrics;
     final AbstractCommitLogService executor;
 
     volatile Configuration configuration;
-    final public String location;
 
     private static CommitLog construct()
     {
-        CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), CommitLogArchiver.construct());
+        CommitLog log = new CommitLog(CommitLogArchiver.construct());
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -92,9 +93,8 @@ public class CommitLog implements CommitLogMBean
     }
 
     @VisibleForTesting
-    CommitLog(String location, CommitLogArchiver archiver)
+    CommitLog(CommitLogArchiver archiver)
     {
-        this.location = location;
         this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
                                                DatabaseDescriptor.getEncryptionContext());
         DatabaseDescriptor.createAllDirectories();
@@ -106,16 +106,17 @@ public class CommitLog implements CommitLogMBean
                 ? new BatchCommitLogService(this)
                 : new PeriodicCommitLogService(this);
 
-        allocator = new CommitLogSegmentManager(this);
-
+        segmentManager = DatabaseDescriptor.isCDCEnabled()
+                         ? new CommitLogSegmentManagerCDC(this, DatabaseDescriptor.getCommitLogLocation())
+                         : new CommitLogSegmentManagerStandard(this, DatabaseDescriptor.getCommitLogLocation());
         // register metrics
-        metrics.attach(executor, allocator);
+        metrics.attach(executor, segmentManager);
     }
 
     CommitLog start()
     {
         executor.start();
-        allocator.start();
+        segmentManager.start();
         return this;
     }
 
@@ -123,11 +124,12 @@ public class CommitLog implements CommitLogMBean
      * Perform recovery on commit logs located in the directory specified by the config file.
      *
      * @return the number of mutations replayed
+     * @throws IOException if replaying the commit log segment files fails
      */
-    public int recover() throws IOException
+    public int recoverSegmentsOnDisk() throws IOException
     {
         // If createReserveSegments is already flipped, the CLSM is running and recovery has already taken place.
-        if (allocator.createReserveSegments)
+        if (segmentManager.createReserveSegments)
             return 0;
 
         FilenameFilter unmanagedFilesFilter = new FilenameFilter()
@@ -136,14 +138,13 @@ public class CommitLog implements CommitLogMBean
             {
                 // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
                 // until after recover was finished.  this turns out to be fragile; it is less error-prone to go
-                // ahead and allow writes before recover(), and just skip active segments when we do.
+                // ahead and allow writes before recover, and just skip active segments when we do.
                 return CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);
             }
         };
 
-        // submit all existing files in the commit log dir for archiving prior to recovery - CASSANDRA-6904
-        File[] listFiles = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter);
-        for (File file : listFiles)
+        // submit all files for this segment manager for archiving prior to recovery - CASSANDRA-6904
+        for (File file : new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter))
         {
             archiver.maybeArchive(file.getPath(), file.getName());
             archiver.maybeWaitForArchiving(file.getName());
@@ -152,7 +153,7 @@ public class CommitLog implements CommitLogMBean
         assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore";
         archiver.maybeRestoreArchive();
 
-        File[] files = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter);
+        File[] files = new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter);
         int replayed = 0;
         if (files.length == 0)
         {
@@ -162,14 +163,14 @@ public class CommitLog implements CommitLogMBean
         {
             Arrays.sort(files, new CommitLogSegmentFileComparator());
             logger.info("Replaying {}", StringUtils.join(files, ", "));
-            replayed = recover(files);
+            replayed = recoverFiles(files);
             logger.info("Log replay complete, {} replayed mutations", replayed);
 
             for (File f : files)
-                allocator.recycleSegment(f);
+                segmentManager.handleReplayedSegment(f);
         }
 
-        allocator.enableReserveSegmentCreation();
+        segmentManager.enableReserveSegmentCreation();
         return replayed;
     }
 
@@ -179,30 +180,35 @@ public class CommitLog implements CommitLogMBean
      * @param clogs   the list of commit log files to replay
      * @return the number of mutations replayed
      */
-    public int recover(File... clogs) throws IOException
+    public int recoverFiles(File... clogs) throws IOException
+    {
+        CommitLogReplayer replayer = CommitLogReplayer.construct(this);
+        replayer.replayFiles(clogs);
+        return replayer.blockForWrites();
+    }
+
+    public void recoverPath(String path) throws IOException
     {
-        CommitLogReplayer recovery = CommitLogReplayer.construct(this);
-        recovery.recover(clogs);
-        return recovery.blockForWrites();
+        CommitLogReplayer replayer = CommitLogReplayer.construct(this);
+        replayer.replayPath(new File(path), false);
+        replayer.blockForWrites();
     }
 
     /**
-     * Perform recovery on a single commit log.
+     * Perform recovery on a single commit log. Kept with a sub-optimal name due to coupling with the MBean / JMX interface
      */
     public void recover(String path) throws IOException
     {
-        CommitLogReplayer recovery = CommitLogReplayer.construct(this);
-        recovery.recover(new File(path), false);
-        recovery.blockForWrites();
+        recoverPath(path);
     }
 
     /**
-     * @return a ReplayPosition which, if {@code >= one} returned from add(), implies add() was started
+     * @return a CommitLogPosition which, if {@code >= one} returned from add(), implies add() was started
      * (but not necessarily finished) prior to this call
      */
-    public ReplayPosition getContext()
+    public CommitLogPosition getCurrentPosition()
     {
-        return allocator.allocatingFrom().getContext();
+        return segmentManager.getCurrentPosition();
     }
 
     /**
@@ -210,7 +216,7 @@ public class CommitLog implements CommitLogMBean
      */
     public void forceRecycleAllSegments(Iterable<UUID> droppedCfs)
     {
-        allocator.forceRecycleAll(droppedCfs);
+        segmentManager.forceRecycleAll(droppedCfs);
     }
 
     /**
@@ -218,21 +224,15 @@ public class CommitLog implements CommitLogMBean
      */
     public void forceRecycleAllSegments()
     {
-        allocator.forceRecycleAll(Collections.<UUID>emptyList());
+        segmentManager.forceRecycleAll(Collections.<UUID>emptyList());
     }
 
     /**
      * Forces a disk flush on the commit log files that need it.  Blocking.
      */
-    public void sync(boolean syncAllSegments)
+    public void sync(boolean syncAllSegments) throws IOException
     {
-        CommitLogSegment current = allocator.allocatingFrom();
-        for (CommitLogSegment segment : allocator.getActiveSegments())
-        {
-            if (!syncAllSegments && segment.id > current.id)
-                return;
-            segment.sync();
-        }
+        segmentManager.sync(syncAllSegments);
     }
 
     /**
@@ -244,11 +244,12 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
-     * Add a Mutation to the commit log.
+     * Add a Mutation to the commit log. If CDC is enabled, this can fail.
      *
      * @param mutation the Mutation to add to the log
+     * @throws WriteTimeoutException if the mutation cannot be added to the log, which can happen when CDC is enabled
      */
-    public ReplayPosition add(Mutation mutation)
+    public CommitLogPosition add(Mutation mutation) throws WriteTimeoutException
     {
         assert mutation != null;
 
@@ -262,7 +263,8 @@ public class CommitLog implements CommitLogMBean
                                                              FBUtilities.prettyPrintMemory(MAX_MUTATION_SIZE)));
         }
 
-        Allocation alloc = allocator.allocate(mutation, (int) totalSize);
+        Allocation alloc = segmentManager.allocate(mutation, totalSize);
+
         CRC32 checksum = new CRC32();
         final ByteBuffer buffer = alloc.getBuffer();
         try (BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer))
@@ -287,7 +289,7 @@ public class CommitLog implements CommitLogMBean
         }
 
         executor.finishWriteFor(alloc);
-        return alloc.getReplayPosition();
+        return alloc.getCommitLogPosition();
     }
 
     /**
@@ -295,17 +297,17 @@ public class CommitLog implements CommitLogMBean
      * given. Discards any commit log segments that are no longer used.
      *
      * @param cfId    the column family ID that was flushed
-     * @param context the replay position of the flush
+     * @param context the commit log segment position of the flush
      */
-    public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
+    public void discardCompletedSegments(final UUID cfId, final CommitLogPosition context)
     {
         logger.trace("discard completed log segments for {}, table {}", context, cfId);
 
         // Go thru the active segment files, which are ordered oldest to newest, marking the
-        // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
+        // flushed CF as clean, until we reach the segment file containing the CommitLogPosition passed
         // in the arguments. Any segments that become unused after they are marked clean will be
         // recycled or discarded.
-        for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
+        for (Iterator<CommitLogSegment> iter = segmentManager.getActiveSegments().iterator(); iter.hasNext();)
         {
             CommitLogSegment segment = iter.next();
             segment.markClean(cfId, context);
@@ -313,7 +315,7 @@ public class CommitLog implements CommitLogMBean
             if (segment.isUnused())
             {
                 logger.trace("Commit log segment {} is unused", segment);
-                allocator.recycleSegment(segment);
+                segmentManager.recycleSegment(segment);
             }
             else
             {
@@ -361,8 +363,8 @@ public class CommitLog implements CommitLogMBean
     public List<String> getActiveSegmentNames()
     {
         List<String> segmentNames = new ArrayList<>();
-        for (CommitLogSegment segment : allocator.getActiveSegments())
-            segmentNames.add(segment.getName());
+        for (CommitLogSegment seg : segmentManager.getActiveSegments())
+            segmentNames.add(seg.getName());
         return segmentNames;
     }
 
@@ -375,23 +377,23 @@ public class CommitLog implements CommitLogMBean
     public long getActiveContentSize()
     {
         long size = 0;
-        for (CommitLogSegment segment : allocator.getActiveSegments())
-            size += segment.contentSize();
+        for (CommitLogSegment seg : segmentManager.getActiveSegments())
+            size += seg.contentSize();
         return size;
     }
 
     @Override
     public long getActiveOnDiskSize()
     {
-        return allocator.onDiskSize();
+        return segmentManager.onDiskSize();
     }
 
     @Override
     public Map<String, Double> getActiveSegmentCompressionRatios()
     {
         Map<String, Double> segmentRatios = new TreeMap<>();
-        for (CommitLogSegment segment : allocator.getActiveSegments())
-            segmentRatios.put(segment.getName(), 1.0 * segment.onDiskSize() / segment.contentSize());
+        for (CommitLogSegment seg : segmentManager.getActiveSegments())
+            segmentRatios.put(seg.getName(), 1.0 * seg.onDiskSize() / seg.contentSize());
         return segmentRatios;
     }
 
@@ -402,12 +404,12 @@ public class CommitLog implements CommitLogMBean
     {
         executor.shutdown();
         executor.awaitTermination();
-        allocator.shutdown();
-        allocator.awaitTermination();
+        segmentManager.shutdown();
+        segmentManager.awaitTermination();
     }
 
     /**
-     * FOR TESTING PURPOSES. See CommitLogAllocator.
+     * FOR TESTING PURPOSES
      * @return the number of files recovered
      */
     public int resetUnsafe(boolean deleteSegments) throws IOException
@@ -427,7 +429,7 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
-     * FOR TESTING PURPOSES. See CommitLogAllocator.
+     * FOR TESTING PURPOSES
      */
     public void stopUnsafe(boolean deleteSegments)
     {
@@ -440,20 +441,24 @@ public class CommitLog implements CommitLogMBean
         {
             throw new RuntimeException(e);
         }
-        allocator.stopUnsafe(deleteSegments);
+        segmentManager.stopUnsafe(deleteSegments);
         CommitLogSegment.resetReplayLimit();
+        if (DatabaseDescriptor.isCDCEnabled() && deleteSegments)
+            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+                FileUtils.deleteWithConfirm(f);
+
     }
 
     /**
-     * FOR TESTING PURPOSES.  See CommitLogAllocator
+     * FOR TESTING PURPOSES
      */
     public int restartUnsafe() throws IOException
     {
-        allocator.start();
+        segmentManager.start();
         executor.restartUnsafe();
         try
         {
-            return recover();
+            return recoverSegmentsOnDisk();
         }
         catch (FSWriteError e)
         {
@@ -468,16 +473,6 @@ public class CommitLog implements CommitLogMBean
         }
     }
 
-    /**
-     * Used by tests.
-     *
-     * @return the number of active segments (segments with unflushed data in them)
-     */
-    public int activeSegments()
-    {
-        return allocator.getActiveSegments().size();
-    }
-
     @VisibleForTesting
     public static boolean handleCommitError(String message, Throwable t)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
new file mode 100644
index 0000000..84054a4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Contains a segment id and a position for CommitLogSegment identification.
+ * Used for both replay and general CommitLog file reading.
+ */
+public class CommitLogPosition implements Comparable<CommitLogPosition>
+{
+    public static final CommitLogPositionSerializer serializer = new CommitLogPositionSerializer();
+
+    // NONE is used for SSTables that are streamed from other nodes and thus have no relationship
+    // with our local commitlog. The values satisfy the criteria that
+    //  - no real commitlog segment will have the given id
+    //  - it will sort before any real CommitLogPosition, so it will be effectively ignored by getCommitLogPosition
+    public static final CommitLogPosition NONE = new CommitLogPosition(-1, 0);
+
+    public final long segmentId; // id of the commit log segment this position points into
+    public final int position;   // position within that segment; asserted non-negative in the constructor
+
+    // Orders by segment id first, then by position within the segment.
+    public static final Comparator<CommitLogPosition> comparator = new Comparator<CommitLogPosition>()
+    {
+        @Override
+        public int compare(CommitLogPosition o1, CommitLogPosition o2)
+        {
+            if (o1.segmentId != o2.segmentId)
+                return Long.compare(o1.segmentId, o2.segmentId);
+            return Integer.compare(o1.position, o2.position);
+        }
+    };
+
+    public CommitLogPosition(long segmentId, int position)
+    {
+        this.segmentId = segmentId;
+        assert position >= 0;
+        this.position = position;
+    }
+
+    @Override
+    public int compareTo(CommitLogPosition other)
+    {
+        return comparator.compare(this, other);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        CommitLogPosition that = (CommitLogPosition) o;
+        return position == that.position && segmentId == that.segmentId;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = (int) (segmentId ^ (segmentId >>> 32));
+        result = 31 * result + position;
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CommitLogPosition(" +
+               "segmentId=" + segmentId +
+               ", position=" + position +
+               ')';
+    }
+
+    @Override
+    public CommitLogPosition clone()
+    {
+        return new CommitLogPosition(segmentId, position);
+    }
+
+    public static class CommitLogPositionSerializer implements ISerializer<CommitLogPosition>
+    {
+        public void serialize(CommitLogPosition clsp, DataOutputPlus out) throws IOException
+        {
+            out.writeLong(clsp.segmentId);
+            out.writeInt(clsp.position);
+        }
+
+        public CommitLogPosition deserialize(DataInputPlus in) throws IOException
+        {
+            return new CommitLogPosition(in.readLong(), in.readInt());
+        }
+
+        public long serializedSize(CommitLogPosition clsp)
+        {
+            return TypeSizes.sizeof(clsp.segmentId) + TypeSizes.sizeof(clsp.position);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
new file mode 100644
index 0000000..0602147
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.Mutation;
+
+public interface CommitLogReadHandler
+{
+    enum CommitLogReadErrorReason
+    {
+        RECOVERABLE_DESCRIPTOR_ERROR,
+        UNRECOVERABLE_DESCRIPTOR_ERROR,
+        MUTATION_ERROR,
+        UNRECOVERABLE_UNKNOWN_ERROR,
+        EOF
+    }
+
+    class CommitLogReadException extends IOException
+    {
+        public final CommitLogReadErrorReason reason;
+        public final boolean permissible;
+
+        CommitLogReadException(String message, CommitLogReadErrorReason reason, boolean permissible)
+        {
+            super(message);
+            this.reason = reason;
+            this.permissible = permissible;
+        }
+    }
+
+    /**
+     * Handle an error during segment read, signaling whether or not you want the reader to skip the remainder of the
+     * current segment on error.
+     *
+     * @param exception CommitLogReadException with details on the exception state
+     * @return true to skip the remainder of the current segment, false to continue reading it
+     * @throws IOException In the event the handler wants forceful termination of all processing, throw IOException.
+     */
+    boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException;
+
+    /**
+     * Handle an error the reader cannot recover from; unlike shouldSkipSegmentOnError, no decision is returned.
+     *
+     * @param exception CommitLogReadException with details on the exception state
+     * @throws IOException
+     */
+    void handleUnrecoverableError(CommitLogReadException exception) throws IOException;
+
+    /**
+     * Process a deserialized mutation
+     *
+     * @param m deserialized mutation
+     * @param size serialized size of the mutation
+     * @param entryLocation filePointer offset inside the CommitLogSegment for the record
+     * @param desc CommitLogDescriptor for mutation being processed
+     */
+    void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
new file mode 100644
index 0000000..73d70f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CRC32;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Reads commit log segment files and hands each valid, deserialized {@link Mutation} to a
+ * {@link CommitLogReadHandler}. Every entry is checked against its stored size checksum and data CRC before it is
+ * deserialized; problems are reported to the handler, which decides whether reading of the segment should stop.
+ * <p>
+ * An instance reuses a checksum, a read buffer and a map of invalid-mutation counts across calls without any
+ * synchronization, so a single instance should not be shared between concurrent readers.
+ */
+public class CommitLogReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
+
+    // An entry whose serialized size is 0 marks the end of the written data in a segment.
+    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
+
+    /** Value for {@code mutationLimit} meaning that every mutation should be read. */
+    @VisibleForTesting
+    public static final int ALL_MUTATIONS = -1;
+
+    // Scratch state reused across reads.
+    private final CRC32 checksum;
+    private final Map<UUID, AtomicInteger> invalidMutations;
+
+    private byte[] buffer;
+
+    public CommitLogReader()
+    {
+        checksum = new CRC32();
+        invalidMutations = new HashMap<>();
+        buffer = new byte[4096];
+    }
+
+    /**
+     * @return for each table id that raised {@link UnknownColumnFamilyException} during deserialization, the number
+     *         of mutations that were skipped because of it
+     */
+    public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
+    {
+        return invalidMutations.entrySet();
+    }
+
+    /**
+     * Reads all passed in files with no minimum, no start, and no mutation limit.
+     */
+    public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
+    {
+        readAllFiles(handler, files, CommitLogPosition.NONE);
+    }
+
+    /**
+     * Reads all passed in files, in array order, with minPosition and no mutation limit. Truncation is only
+     * tolerated in the last file.
+     */
+    public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException
+    {
+        for (int i = 0; i < files.length; i++)
+            readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length);
+    }
+
+    /**
+     * Reads passed in file fully
+     */
+    public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException
+    {
+        readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
+    }
+
+    /**
+     * Reads passed in file fully, up to mutationLimit count
+     */
+    @VisibleForTesting
+    public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException
+    {
+        readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
+    }
+
+    /**
+     * Reads mutations from file, handing them off to handler.
+     *
+     * @param handler Handler that will take action based on deserialized Mutations
+     * @param file CommitLogSegment file to read
+     * @param minPosition Optional minimum CommitLogPosition: segments with an id below minPosition.segmentId are
+     *                    skipped, entries of segment minPosition.segmentId are only handed to the handler if they
+     *                    end after minPosition.position, and every entry of a newer segment is handed to the handler
+     * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
+     * @param tolerateTruncation Whether this file may be truncated; errors are reported to the handler as permissible
+     *                           only when this is set and the sync section being read also tolerates errors
+     *
+     * @throws IOException if reading fails, or if the handler requests termination by throwing one
+     */
+    public void readCommitLogSegment(CommitLogReadHandler handler,
+                                     File file,
+                                     CommitLogPosition minPosition,
+                                     int mutationLimit,
+                                     boolean tolerateTruncation) throws IOException
+    {
+        // just transform from the file name (no reading of headers) to determine version
+        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+
+        try(ChannelProxy channel = new ChannelProxy(file);
+            RandomAccessReader reader = RandomAccessReader.open(channel))
+        {
+            // Segments older than VERSION_21 are read as one section straight from the file, without parsing a
+            // descriptor header or sync sections.
+            if (desc.version < CommitLogDescriptor.VERSION_21)
+            {
+                if (!shouldSkipSegmentId(file, desc, minPosition))
+                {
+                    if (minPosition.segmentId == desc.id)
+                        reader.seek(minPosition.position);
+                    ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
+                    statusTracker.errorContext = desc.fileName();
+                    readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
+                }
+                return;
+            }
+
+            final long segmentIdFromFilename = desc.id;
+            try
+            {
+                // The following call can either throw or legitimately return null. For either case, we need to check
+                // desc outside this block and set it to null in the exception case.
+                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
+            }
+            catch (Exception e)
+            {
+                // Reported to the handler below as an unrecoverable descriptor error; keep the cause for diagnosis.
+                logger.debug("Failed to read commit log descriptor from {}", file, e);
+                desc = null;
+            }
+            if (desc == null)
+            {
+                // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor.
+                handler.handleUnrecoverableError(new CommitLogReadException(
+                    String.format("Could not read commit log descriptor in file %s", file),
+                    CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
+                    false));
+                return;
+            }
+
+            if (segmentIdFromFilename != desc.id)
+            {
+                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
+                    String.format("Segment id mismatch (filename %d, descriptor %d) in file %s",
+                                  segmentIdFromFilename, desc.id, file),
+                    CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
+                    false)))
+                {
+                    return;
+                }
+            }
+
+            if (shouldSkipSegmentId(file, desc, minPosition))
+                return;
+
+            CommitLogSegmentReader segmentReader;
+            try
+            {
+                segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
+            }
+            catch(Exception e)
+            {
+                handler.handleUnrecoverableError(new CommitLogReadException(
+                    String.format("Unable to create segment reader for commit log file: %s", e),
+                    CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
+                    tolerateTruncation));
+                return;
+            }
+
+            try
+            {
+                ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
+                for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
+                {
+                    // Errors are tolerated only if the caller allows truncation AND this particular section allows
+                    // it. Recompute per section: accumulating with &= would let a single intolerant section disable
+                    // tolerance for every later section of the segment.
+                    statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection;
+
+                    // Skip sync sections that lie completely before the desired minPosition
+                    if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position)
+                        continue;
+
+                    statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
+
+                    // TODO: Since EncryptedFileSegmentInputStream doesn't implement seek(), we cannot pre-emptively seek
+                    // to the desired offset in the syncSegment before reading the section and deserializing the mutations.
+                    readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
+                    if (!statusTracker.shouldContinue())
+                        break;
+                }
+            }
+            // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException
+            // is wrapping an IOException.
+            catch (RuntimeException re)
+            {
+                if (re.getCause() instanceof IOException)
+                    throw (IOException) re.getCause();
+                throw re;
+            }
+            logger.debug("Finished reading {}", file);
+        }
+    }
+
+    /**
+     * Any segment with id >= minPosition.segmentId is a candidate for read. Also logs (at debug) that the segment
+     * is about to be read.
+     */
+    private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition)
+    {
+        logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
+            file.getPath(),
+            desc.version,
+            desc.getMessagingVersion(),
+            desc.compression);
+
+        if (minPosition.segmentId > desc.id)
+        {
+            logger.trace("Skipping read of fully-flushed {}", file);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Whether an entry ending at {@code entryLocation} in the segment described by {@code desc} lies after
+     * {@code minPosition}. The in-segment offset of minPosition only applies to the segment it points into; every
+     * entry of a newer segment lies after it.
+     */
+    private static boolean isAfterMinPosition(CommitLogDescriptor desc, CommitLogPosition minPosition, long entryLocation)
+    {
+        return desc.id > minPosition.segmentId || entryLocation > minPosition.position;
+    }
+
+    /**
+     * Reports a read error to the handler (flagged permissible if the current section tolerates errors) and stops
+     * all further reading if the handler asks to skip the remainder of the segment.
+     */
+    private static void reportReadError(CommitLogReadHandler handler,
+                                        ReadStatusTracker statusTracker,
+                                        String message,
+                                        CommitLogReadErrorReason reason) throws IOException
+    {
+        if (handler.shouldSkipSegmentOnError(new CommitLogReadException(message, reason, statusTracker.tolerateErrorsInSection)))
+            statusTracker.requestTermination();
+    }
+
+    /**
+     * Reads a section of a file containing mutations.
+     * <p>
+     * Reading stops at {@code end}, at EOF, once the mutation limit is reached, at an end-of-segment marker, or when
+     * a corrupt size / unexpected EOF makes the rest of the section unreadable. In the latter cases the handler
+     * decides whether reading of the whole segment stops as well.
+     *
+     * @param handler Handler that will take action based on deserialized Mutations
+     * @param reader FileDataInput / logical buffer containing commitlog mutations
+     * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations
+     * @param end logical numeric end of the segment being read
+     * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc
+     * @param desc Descriptor for CommitLog serialization
+     */
+    private void readSection(CommitLogReadHandler handler,
+                             FileDataInput reader,
+                             CommitLogPosition minPosition,
+                             int end,
+                             ReadStatusTracker statusTracker,
+                             CommitLogDescriptor desc) throws IOException
+    {
+        while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
+        {
+            long mutationStart = reader.getFilePointer();
+            if (logger.isTraceEnabled())
+                logger.trace("Reading mutation at {}", mutationStart);
+
+            long claimedCRC32;
+            int serializedSize;
+            try
+            {
+                // any of the reads may hit EOF
+                serializedSize = reader.readInt();
+                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+                {
+                    logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
+                    statusTracker.requestTermination();
+                    return;
+                }
+
+                // Mutation must be at LEAST 10 bytes:
+                //    3 for a non-empty Keyspace
+                //    3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
+                //    4 bytes for column count.
+                // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
+                if (serializedSize < 10)
+                {
+                    reportReadError(handler, statusTracker,
+                                    String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
+                                    CommitLogReadErrorReason.MUTATION_ERROR);
+                    return;
+                }
+
+                long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version);
+                checksum.reset();
+                CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
+
+                // Without a trustworthy size we cannot find the next entry, so give up on this section.
+                if (checksum.getValue() != claimedSizeChecksum)
+                {
+                    reportReadError(handler, statusTracker,
+                                    String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
+                                    CommitLogReadErrorReason.MUTATION_ERROR);
+                    return;
+                }
+
+                if (serializedSize > buffer.length)
+                    buffer = new byte[(int) (1.2 * serializedSize)];
+                reader.readFully(buffer, 0, serializedSize);
+
+                claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
+            }
+            catch (EOFException eof)
+            {
+                reportReadError(handler, statusTracker,
+                                String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext),
+                                CommitLogReadErrorReason.EOF);
+                return;
+            }
+
+            // The checksum is not reset after the size check, so the data CRC covers the size and the payload.
+            checksum.update(buffer, 0, serializedSize);
+            if (claimedCRC32 != checksum.getValue())
+            {
+                // The size was verified, so the reader is already positioned at the next entry: report the bad
+                // payload and move on unless the handler asked to stop.
+                reportReadError(handler, statusTracker,
+                                String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
+                                CommitLogReadErrorReason.MUTATION_ERROR);
+                continue;
+            }
+
+            long mutationPosition = reader.getFilePointer();
+            readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc);
+
+            // Only count this as a processed mutation if readMutation would hand it to the handler: mutations at or
+            // before minPosition are suppressed and must not consume the mutation limit.
+            if (isAfterMinPosition(desc, minPosition, mutationPosition))
+                statusTracker.addProcessedMutation();
+        }
+    }
+
+    /**
+     * Deserializes and passes a Mutation to the ICommitLogReadHandler requested
+     *
+     * @param handler Handler that will take action based on deserialized Mutations
+     * @param inputBuffer raw byte array w/Mutation data
+     * @param size deserialized size of mutation
+     * @param minPosition We need to suppress replay of mutations that are before the required minPosition
+     * @param entryLocation filePointer offset of mutation within CommitLogSegment
+     * @param desc CommitLogDescriptor being worked on
+     */
+    @VisibleForTesting
+    protected void readMutation(CommitLogReadHandler handler,
+                                byte[] inputBuffer,
+                                int size,
+                                CommitLogPosition minPosition,
+                                final int entryLocation,
+                                final CommitLogDescriptor desc) throws IOException
+    {
+        // Entries at or before minPosition are still deserialized (so unknown-table and corrupt mutations are still
+        // accounted for), but they are not handed to the handler.
+        boolean shouldReplay = isAfterMinPosition(desc, minPosition, entryLocation);
+
+        final Mutation mutation;
+        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
+        {
+            mutation = Mutation.serializer.deserialize(bufIn,
+                                                       desc.getMessagingVersion(),
+                                                       SerializationHelper.Flag.LOCAL);
+            // double-check that what we read is still valid for the current schema
+            for (PartitionUpdate upd : mutation.getPartitionUpdates())
+                upd.validate();
+        }
+        catch (UnknownColumnFamilyException ex)
+        {
+            if (ex.cfId == null)
+                return;
+            AtomicInteger i = invalidMutations.get(ex.cfId);
+            if (i == null)
+            {
+                i = new AtomicInteger(1);
+                invalidMutations.put(ex.cfId, i);
+            }
+            else
+                i.incrementAndGet();
+            return;
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            File f = File.createTempFile("mutation", "dat");
+
+            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
+            {
+                out.write(inputBuffer, 0, size);
+            }
+
+            // Checksum passed so this error can't be permissible.
+            handler.handleUnrecoverableError(new CommitLogReadException(
+                String.format(
+                    "Unexpected error deserializing mutation; saved to %s.  " +
+                    "This may be caused by replaying a mutation against a table with the same name but incompatible schema.  " +
+                    "Exception follows: %s", f.getAbsolutePath(), t),
+                CommitLogReadErrorReason.MUTATION_ERROR,
+                false));
+            return;
+        }
+
+        if (logger.isTraceEnabled())
+            logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(),
+                         "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
+
+        if (shouldReplay)
+            handler.handleMutation(mutation, size, entryLocation, desc);
+    }
+
+    /**
+     * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
+     * VERSION_12 and VERSION_20 store 8-byte checksums; later versions store 4-byte (unsigned int) checksums.
+     */
+    private static class CommitLogFormat
+    {
+        public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
+        {
+            switch (commitLogVersion)
+            {
+                case CommitLogDescriptor.VERSION_12:
+                case CommitLogDescriptor.VERSION_20:
+                    return input.readLong();
+                // Changed format in 2.1
+                default:
+                    return input.readInt() & 0xffffffffL;
+            }
+        }
+
+        public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
+        {
+            switch (commitLogVersion)
+            {
+                case CommitLogDescriptor.VERSION_12:
+                    // CRC32.update(int) only consumes the low byte; kept as-is to match the VERSION_12 format.
+                    checksum.update(serializedSize);
+                    break;
+                // Changed format in 2.0
+                default:
+                    updateChecksumInt(checksum, serializedSize);
+                    break;
+            }
+        }
+
+        public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
+        {
+            switch (commitLogVersion)
+            {
+                case CommitLogDescriptor.VERSION_12:
+                case CommitLogDescriptor.VERSION_20:
+                    return input.readLong();
+                // Changed format in 2.1
+                default:
+                    return input.readInt() & 0xffffffffL;
+            }
+        }
+    }
+
+    /**
+     * Progress state for reading one segment: the remaining mutation budget, whether errors in the current section
+     * are permissible, a context string for error messages, and whether reading has been asked to stop.
+     */
+    private static class ReadStatusTracker
+    {
+        private int mutationsLeft;
+        public String errorContext = "";
+        public boolean tolerateErrorsInSection;
+        private boolean error;
+
+        public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
+        {
+            this.mutationsLeft = mutationLimit;
+            this.tolerateErrorsInSection = tolerateErrorsInSection;
+        }
+
+        public void addProcessedMutation()
+        {
+            if (mutationsLeft == ALL_MUTATIONS)
+                return;
+            --mutationsLeft;
+        }
+
+        public boolean shouldContinue()
+        {
+            // ALL_MUTATIONS is never decremented, so it never reaches 0: reading then only stops on termination.
+            return !error && mutationsLeft != 0;
+        }
+
+        public void requestTermination()
+        {
+            error = true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 93c6c50..c8e597f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -18,32 +18,17 @@
  */
 package org.apache.cassandra.db.commitlog;
 
-import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.CRC32;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.*;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -53,28 +38,14 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
-import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
-
-public class CommitLogReplayer
+public class CommitLogReplayer implements CommitLogReadHandler
 {
     @VisibleForTesting
     public static long MAX_OUTSTANDING_REPLAY_BYTES = Long.getLong("cassandra.commitlog_max_outstanding_replay_bytes", 1024 * 1024 * 64);
@@ -83,117 +54,54 @@ public class CommitLogReplayer
     static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
-    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
 
-    private final Set<Keyspace> keyspacesRecovered;
+    private final Set<Keyspace> keyspacesReplayed;
     private final Queue<Future<Integer>> futures;
-    private final Map<UUID, AtomicInteger> invalidMutations;
+
     private final AtomicInteger replayedCount;
-    private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
-    private final ReplayPosition globalPosition;
-    private final CRC32 checksum;
-    private byte[] buffer;
+    private final Map<UUID, ReplayPositionFilter> cfPersisted;
+    private final CommitLogPosition globalPosition;
+
+    // Used to throttle speed of replay of mutations if we pass the max outstanding count
     private long pendingMutationBytes = 0;
 
     private final ReplayFilter replayFilter;
     private final CommitLogArchiver archiver;
 
-    /*
-     * Wrapper around initiating mutations read from the log to make it possible
-     * to spy on initiated mutations for test
-     */
     @VisibleForTesting
-    public static class MutationInitiator
-    {
-        protected Future<Integer> initiateMutation(final Mutation mutation,
-                                                   final long segmentId,
-                                                   final int serializedSize,
-                                                   final int entryLocation,
-                                                   final CommitLogReplayer clr)
-        {
-            Runnable runnable = new WrappedRunnable()
-            {
-                public void runMayThrow()
-                {
-                    if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
-                        return;
-                    if (clr.pointInTimeExceeded(mutation))
-                        return;
-
-                    final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+    protected CommitLogReader commitLogReader;
 
-                    // Rebuild the mutation, omitting column families that
-                    //    a) the user has requested that we ignore,
-                    //    b) have already been flushed,
-                    // or c) are part of a cf that was dropped.
-                    // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
-                    Mutation newMutation = null;
-                    for (PartitionUpdate update : clr.replayFilter.filter(mutation))
-                    {
-                        if (Schema.instance.getCF(update.metadata().cfId) == null)
-                            continue; // dropped
-
-                        // replay if current segment is newer than last flushed one or,
-                        // if it is the last known segment, if we are after the replay position
-                        if (clr.shouldReplay(update.metadata().cfId, new ReplayPosition(segmentId, entryLocation)))
-                        {
-                            if (newMutation == null)
-                                newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
-                            newMutation.add(update);
-                            clr.replayedCount.incrementAndGet();
-                        }
-                    }
-                    if (newMutation != null)
-                    {
-                        assert !newMutation.isEmpty();
-
-                        try
-                        {
-                            Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
-                        }
-                        catch (ExecutionException e)
-                        {
-                            throw Throwables.propagate(e.getCause());
-                        }
-
-                        clr.keyspacesRecovered.add(keyspace);
-                    }
-                }
-            };
-            return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
-        }
-    }
-
-    CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition.ReplayFilter> cfPersisted, ReplayFilter replayFilter)
+    /**
+     * Creates a replayer that reads segments through its own {@link CommitLogReader}.
+     *
+     * @param commitLog commit log whose archiver this replayer uses
+     * @param globalPosition position passed to the reader as the minimum position when replaying segments
+     * @param cfPersisted per-table ReplayPositionFilters keyed by table id (built from each table's sstables and
+     *                    truncation point)
+     * @param replayFilter filter applied to the partition updates of replayed mutations
+     */
+    CommitLogReplayer(CommitLog commitLog,
+                      CommitLogPosition globalPosition,
+                      Map<UUID, ReplayPositionFilter> cfPersisted,
+                      ReplayFilter replayFilter)
     {
-        this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
+        this.keyspacesReplayed = new NonBlockingHashSet<Keyspace>();
         this.futures = new ArrayDeque<Future<Integer>>();
-        this.buffer = new byte[4096];
-        this.invalidMutations = new HashMap<UUID, AtomicInteger>();
         // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
-        this.checksum = new CRC32();
         this.cfPersisted = cfPersisted;
         this.globalPosition = globalPosition;
         this.replayFilter = replayFilter;
         this.archiver = commitLog.archiver;
+        this.commitLogReader = new CommitLogReader();
     }
 
     public static CommitLogReplayer construct(CommitLog commitLog)
     {
-        // compute per-CF and global replay positions
-        Map<UUID, ReplayPosition.ReplayFilter> cfPersisted = new HashMap<>();
+        // compute per-CF persisted-position filters and the global commit log position that replay starts from
+        Map<UUID, ReplayPositionFilter> cfPersisted = new HashMap<>();
         ReplayFilter replayFilter = ReplayFilter.create();
-        ReplayPosition globalPosition = null;
+        CommitLogPosition globalPosition = null;
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
             // but, if we've truncated the cf in question, then we need to need to start replay after the truncation
-            ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
+            CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
             if (truncatedAt != null)
             {
-                // Point in time restore is taken to mean that the tables need to be recovered even if they were
+                // Point in time restore is taken to mean that the tables need to be replayed even if they were
                 // deleted at a later point in time. Any truncation record after that point must thus be cleared prior
-                // to recovery (CASSANDRA-9195).
+                // to replay (CASSANDRA-9195).
                 long restoreTime = commitLog.archiver.restorePointInTime;
                 long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId);
                 if (truncatedTime > restoreTime)
@@ -209,28 +117,35 @@ public class CommitLogReplayer
                 }
             }
 
-            ReplayPosition.ReplayFilter filter = new ReplayPosition.ReplayFilter(cfs.getSSTables(), truncatedAt);
+            ReplayPositionFilter filter = new ReplayPositionFilter(cfs.getSSTables(), truncatedAt);
             if (!filter.isEmpty())
                 cfPersisted.put(cfs.metadata.cfId, filter);
             else
-                globalPosition = ReplayPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
+                globalPosition = CommitLogPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
         }
         if (globalPosition == null)
-            globalPosition = ReplayPosition.firstNotCovered(cfPersisted.values());
-        logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
+            globalPosition = firstNotCovered(cfPersisted.values());
+        logger.trace("Global commit log segment position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
         return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
     }
 
-    public void recover(File[] clogs) throws IOException
+    public void replayPath(File file, boolean tolerateTruncation) throws IOException
+    {
+        commitLogReader.readCommitLogSegment(this, file, globalPosition, CommitLogReader.ALL_MUTATIONS, tolerateTruncation);
+    }
+
+    public void replayFiles(File[] clogs) throws IOException
     {
-        int i;
-        for (i = 0; i < clogs.length; ++i)
-            recover(clogs[i], i + 1 == clogs.length);
+        commitLogReader.readAllFiles(this, clogs, globalPosition);
     }
 
+    /**
+     * Waits for replayed writes, then flushes every replayed keyspace and blocks until those flushes are complete.
+     * @return the replayed count, which is incremented once per replayed partition update rather than per mutation
+     */
     public int blockForWrites()
     {
-        for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet())
+        for (Map.Entry<UUID, AtomicInteger> entry : commitLogReader.getInvalidMutations())
             logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
 
         // wait for all the writes to finish on the mutation stage
@@ -242,7 +157,7 @@ public class CommitLogReplayer
         boolean flushingSystem = false;
 
         List<Future<?>> futures = new ArrayList<Future<?>>();
-        for (Keyspace keyspace : keyspacesRecovered)
+        for (Keyspace keyspace : keyspacesReplayed)
         {
             if (keyspace.getName().equals(SystemKeyspace.NAME))
                 flushingSystem = true;
@@ -259,6 +174,137 @@ public class CommitLogReplayer
         return replayedCount.get();
     }
 
+    /**
+     * Submits a mutation read from the log to the MUTATION stage; the returned future yields serializedSize when done.
+     * Kept as a separate, overridable class so tests can spy on the mutations that replay initiates.
+     */
+    @VisibleForTesting
+    public static class MutationInitiator
+    {
+        protected Future<Integer> initiateMutation(final Mutation mutation,
+                                                   final long segmentId,
+                                                   final int serializedSize,
+                                                   final int entryLocation,
+                                                   final CommitLogReplayer commitLogReplayer)
+        {
+            Runnable runnable = new WrappedRunnable()
+            {
+                public void runMayThrow()
+                {
+                    if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+                        return;
+                    if (commitLogReplayer.pointInTimeExceeded(mutation))
+                        return;
+
+                    final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+                    // Rebuild the mutation, omitting column families that
+                    //    a) the user has requested that we ignore,
+                    //    b) have already been flushed,
+                    // or c) are part of a cf that was dropped.
+                    // Keep in mind that the cf.name() is suspect; do everything based on the cfId instead.
+                    Mutation newMutation = null;
+                    for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
+                    {
+                        if (Schema.instance.getCF(update.metadata().cfId) == null)
+                            continue; // dropped
+
+                        // replay if current segment is newer than last flushed one or,
+                        // if it is the last known segment, if we are after the commit log segment position
+                        if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation)))
+                        {
+                            if (newMutation == null)
+                                newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+                            newMutation.add(update);
+                            commitLogReplayer.replayedCount.incrementAndGet();
+                        }
+                    }
+                    if (newMutation != null)
+                    {
+                        assert !newMutation.isEmpty();
+                        // newMutation targets mutation's keyspace, so apply through the instance opened above
+                        try
+                        {
+                            Uninterruptibles.getUninterruptibly(keyspace.applyFromCommitLog(newMutation));
+                        }
+                        catch (ExecutionException e)
+                        {
+                            throw Throwables.propagate(e.getCause());
+                        }
+
+                        commitLogReplayer.keyspacesReplayed.add(keyspace);
+                    }
+                }
+            };
+            return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
+        }
+    }
+
+    /**
+     * Commit log positions known to be safe to skip on replay: those inside the (lower, upper] commit log bounds
+     * of any on-disk sstable, plus everything up to and including the most recent truncation record, if any.
+     */
+    public static class ReplayPositionFilter
+    {
+        final NavigableMap<CommitLogPosition, CommitLogPosition> persisted = new TreeMap<>();
+        public ReplayPositionFilter(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt)
+        {
+            for (SSTableReader reader : onDisk)
+            {
+                CommitLogPosition start = reader.getSSTableMetadata().commitLogLowerBound;
+                CommitLogPosition end = reader.getSSTableMetadata().commitLogUpperBound;
+                add(persisted, start, end);
+            }
+            if (truncatedAt != null)
+                add(persisted, CommitLogPosition.NONE, truncatedAt);
+        }
+
+        private static void add(NavigableMap<CommitLogPosition, CommitLogPosition> ranges, CommitLogPosition start, CommitLogPosition end)
+        {
+            // Merge (start, end] into ranges, extending it to absorb the existing ranges it overlaps.
+            // The range starting at or before our end may extend past it, so keep the larger of the two ends.
+            Map.Entry<CommitLogPosition, CommitLogPosition> extend = ranges.floorEntry(end);
+            if (extend != null && extend.getValue().compareTo(end) > 0)
+                end = extend.getValue();
+
+            // The range starting strictly before our start may reach into us; if so, adopt its start as ours.
+            extend = ranges.lowerEntry(start);
+            if (extend != null && extend.getValue().compareTo(start) >= 0)
+                start = extend.getKey();
+
+            ranges.subMap(start, end).clear();
+            ranges.put(start, end);
+        }
+
+        public boolean shouldReplay(CommitLogPosition position)
+        {
+            // Ranges are start-exclusive, end-inclusive: check the nearest range starting strictly below position.
+            Map.Entry<CommitLogPosition, CommitLogPosition> range = persisted.lowerEntry(position);
+            return range == null || position.compareTo(range.getValue()) > 0;
+        }
+
+        public boolean isEmpty()
+        {
+            return persisted.isEmpty();
+        }
+    }
+
+    /**
+     * Returns the earliest of the filters' first persisted-range ends, or CommitLogPosition.NONE if there are none.
+     * Every filter must be non-empty, since firstEntry() is null for an empty map.
+     */
+    public static CommitLogPosition firstNotCovered(Iterable<ReplayPositionFilter> ranges)
+    {
+        CommitLogPosition earliest = null;
+        for (ReplayPositionFilter filter : ranges)
+        {
+            // end of this filter's lowest persisted range
+            CommitLogPosition candidate = filter.persisted.firstEntry().getValue();
+            earliest = (earliest == null) ? candidate : Ordering.natural().min(earliest, candidate);
+        }
+        return (earliest == null) ? CommitLogPosition.NONE : earliest;
+    }
+
     abstract static class ReplayFilter
     {
         public abstract Iterable<PartitionUpdate> filter(Mutation mutation);
@@ -340,291 +386,65 @@ public class CommitLogReplayer
      *
      * @return true iff replay is necessary
      */
-    private boolean shouldReplay(UUID cfId, ReplayPosition position)
+    private boolean shouldReplay(UUID cfId, CommitLogPosition position)
     {
-        ReplayPosition.ReplayFilter filter = cfPersisted.get(cfId);
+        ReplayPositionFilter filter = cfPersisted.get(cfId);
         return filter == null || filter.shouldReplay(position);
     }
 
-    @SuppressWarnings("resource")
-    public void recover(File file, boolean tolerateTruncation) throws IOException
-    {
-        // just transform from the file name (no reading of headers) to determine version
-        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
-
-        try(ChannelProxy channel = new ChannelProxy(file);
-            RandomAccessReader reader = RandomAccessReader.open(channel))
-        {
-            if (desc.version < CommitLogDescriptor.VERSION_21)
-            {
-                if (logAndCheckIfShouldSkip(file, desc))
-                    return;
-                if (globalPosition.segment == desc.id)
-                    reader.seek(globalPosition.position);
-                replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
-                return;
-            }
-            final long segmentId = desc.id;
-            try
-            {
-                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
-            }
-            catch (Exception e)
-            {
-                desc = null;
-            }
-
-            if (desc == null) {
-                handleReplayError(false, "Could not read commit log descriptor in file %s", file);
-                return;
-            }
-            if (segmentId != desc.id)
-            {
-                handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
-                // continue processing if ignored.
-            }
-
-            if (logAndCheckIfShouldSkip(file, desc))
-                return;
-
-            SegmentReader segmentReader;
-            try
-            {
-                segmentReader = new SegmentReader(desc, reader, tolerateTruncation);
-            }
-            catch(Exception e)
-            {
-                handleReplayError(false, "unable to create segment reader for commit log file: %s", e);
-                return;
-            }
-
-            try
-            {
-                boolean tolerateErrorsInSection = tolerateTruncation;
-                for (SyncSegment syncSegment : segmentReader)
-                {
-                    tolerateErrorsInSection &= syncSegment.toleratesErrorsInSection;
-
-                    // Skip over flushed section.
-                    if (desc.id == globalPosition.segment && syncSegment.endPosition < globalPosition.position)
-                        continue;
-                    String errorContext = String.format("next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
-                    if (!replaySyncSection(syncSegment.input, syncSegment.endPosition, desc, errorContext, tolerateErrorsInSection))
-                        break;
-                }
-            }
-            // unfortunately, AbstractIterator cannot throw a checked excpetion,
-            // so check to see if a RuntimeException is wrapping an IOException
-            catch (RuntimeException re)
-            {
-                if (re.getCause() instanceof IOException)
-                    throw (IOException) re.getCause();
-                throw re;
-            }
-            logger.debug("Finished reading {}", file);
-        }
-    }
-
-    public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
+    protected boolean pointInTimeExceeded(Mutation fm)
     {
-        logger.debug("Replaying {} (CL version {}, messaging version {}, compression {})",
-                    file.getPath(),
-                    desc.version,
-                    desc.getMessagingVersion(),
-                    desc.compression);
+        long restorePointMillis = archiver.restorePointInTime;
 
-        if (globalPosition.segment > desc.id)
+        for (PartitionUpdate partitionUpdate : fm.getPartitionUpdates())
         {
-            logger.trace("skipping replay of fully-flushed {}", file);
-            return true;
+            if (restorePointMillis < archiver.precision.toMillis(partitionUpdate.maxTimestamp()))
+                return true;
         }
         return false;
     }
 
-    /**
-     * Replays a sync section containing a list of mutations.
-     *
-     * @return Whether replay should continue with the next section.
-     */
-    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException
+    public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
     {
-         /* read the logs populate Mutation and apply */
-        while (reader.getFilePointer() < end && !reader.isEOF())
-        {
-            long mutationStart = reader.getFilePointer();
-            if (logger.isTraceEnabled())
-                logger.trace("Reading mutation at {}", mutationStart);
-
-            long claimedCRC32;
-            int serializedSize;
-            try
-            {
-                // any of the reads may hit EOF
-                serializedSize = reader.readInt();
-                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
-                {
-                    logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
-                    return false;
-                }
-
-                // Mutation must be at LEAST 10 bytes:
-                // 3 each for a non-empty Keyspace and Key (including the
-                // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
-                // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
-                if (serializedSize < 10)
-                {
-                    handleReplayError(tolerateErrors,
-                                      "Invalid mutation size %d at %d in %s",
-                                      serializedSize, mutationStart, errorContext);
-                    return false;
-                }
-
-                long claimedSizeChecksum;
-                if (desc.version < CommitLogDescriptor.VERSION_21)
-                    claimedSizeChecksum = reader.readLong();
-                else
-                    claimedSizeChecksum = reader.readInt() & 0xffffffffL;
-                checksum.reset();
-                if (desc.version < CommitLogDescriptor.VERSION_20)
-                    checksum.update(serializedSize);
-                else
-                    updateChecksumInt(checksum, serializedSize);
-
-                if (checksum.getValue() != claimedSizeChecksum)
-                {
-                    handleReplayError(tolerateErrors,
-                                      "Mutation size checksum failure at %d in %s",
-                                      mutationStart, errorContext);
-                    return false;
-                }
-                // ok.
-
-                if (serializedSize > buffer.length)
-                    buffer = new byte[(int) (1.2 * serializedSize)];
-                reader.readFully(buffer, 0, serializedSize);
-                if (desc.version < CommitLogDescriptor.VERSION_21)
-                    claimedCRC32 = reader.readLong();
-                else
-                    claimedCRC32 = reader.readInt() & 0xffffffffL;
-            }
-            catch (EOFException eof)
-            {
-                handleReplayError(tolerateErrors,
-                                  "Unexpected end of segment",
-                                  mutationStart, errorContext);
-                return false; // last CL entry didn't get completely written. that's ok.
-            }
-
-            checksum.update(buffer, 0, serializedSize);
-            if (claimedCRC32 != checksum.getValue())
-            {
-                handleReplayError(tolerateErrors,
-                                  "Mutation checksum failure at %d in %s",
-                                  mutationStart, errorContext);
-                continue;
-            }
-            replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc);
-        }
-        return true;
-    }
-
-    /**
-     * Deserializes and replays a commit log entry.
-     */
-    void replayMutation(byte[] inputBuffer, int size,
-            final int entryLocation, final CommitLogDescriptor desc) throws IOException
-    {
-
-        final Mutation mutation;
-        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
-        {
-            mutation = Mutation.serializer.deserialize(bufIn,
-                                                       desc.getMessagingVersion(),
-                                                       SerializationHelper.Flag.LOCAL);
-            // doublecheck that what we read is [still] valid for the current schema
-            for (PartitionUpdate upd : mutation.getPartitionUpdates())
-                upd.validate();
-        }
-        catch (UnknownColumnFamilyException ex)
-        {
-            if (ex.cfId == null)
-                return;
-            AtomicInteger i = invalidMutations.get(ex.cfId);
-            if (i == null)
-            {
-                i = new AtomicInteger(1);
-                invalidMutations.put(ex.cfId, i);
-            }
-            else
-                i.incrementAndGet();
-            return;
-        }
-        catch (Throwable t)
-        {
-            JVMStabilityInspector.inspectThrowable(t);
-            File f = File.createTempFile("mutation", "dat");
-
-            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
-            {
-                out.write(inputBuffer, 0, size);
-            }
-
-            // Checksum passed so this error can't be permissible.
-            handleReplayError(false,
-                              "Unexpected error deserializing mutation; saved to %s.  " +
-                              "This may be caused by replaying a mutation against a table with the same name but incompatible schema.  " +
-                              "Exception follows: %s",
-                              f.getAbsolutePath(),
-                              t);
-            return;
-        }
-
-        if (logger.isTraceEnabled())
-            logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
-
         pendingMutationBytes += size;
-        futures.offer(mutationInitiator.initiateMutation(mutation,
+        futures.offer(mutationInitiator.initiateMutation(m,
                                                          desc.id,
                                                          size,
                                                          entryLocation,
                                                          this));
-        //If there are finished mutations, or too many outstanding bytes/mutations
-        //drain the futures in the queue
+        // Drain completed futures from the head of the queue, and block on the oldest outstanding ones while more
+        // than MAX_OUTSTANDING_REPLAY_COUNT mutations or MAX_OUTSTANDING_REPLAY_BYTES bytes are still pending
         while (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT
-                || pendingMutationBytes > MAX_OUTSTANDING_REPLAY_BYTES
-                || (!futures.isEmpty() && futures.peek().isDone()))
+               || pendingMutationBytes > MAX_OUTSTANDING_REPLAY_BYTES
+               || (!futures.isEmpty() && futures.peek().isDone()))
         {
             pendingMutationBytes -= FBUtilities.waitOnFuture(futures.poll());
         }
     }
 
-    protected boolean pointInTimeExceeded(Mutation fm)
-    {
-        long restoreTarget = archiver.restorePointInTime;
-
-        for (PartitionUpdate upd : fm.getPartitionUpdates())
-        {
-            if (archiver.precision.toMillis(upd.maxTimestamp()) > restoreTarget)
-                return true;
-        }
-        return false;
-    }
-
-    static void handleReplayError(boolean permissible, String message, Object... messageArgs) throws IOException
+    public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException
     {
-        String msg = String.format(message, messageArgs);
-        IOException e = new CommitLogReplayException(msg);
-        if (permissible)
-            logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e);
+        if (exception.permissible)
+            logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", exception);
         else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY))
-            logger.error("Ignoring commit log replay error", e);
-        else if (!CommitLog.handleCommitError("Failed commit log replay", e))
+            logger.error("Ignoring commit log replay error", exception);
+        else if (!CommitLog.handleCommitError("Failed commit log replay", exception))
         {
             logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " +
                          "commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " +
                          "on the command line");
-            throw e;
+            throw new CommitLogReplayException(exception.getMessage(), exception);
         }
+        return false;
+    }
+
+    /**
+     * Gives unrecoverable read errors the same log-or-throw handling as recoverable ones (shouldSkipSegmentOnError).
+     */
+    public void handleUnrecoverableError(CommitLogReadException exception) throws IOException
+    {
+        // Called only for its side effects (logging, or throwing CommitLogReplayException); the result is ignored.
+        shouldSkipSegmentOnError(exception);
     }
 
     @SuppressWarnings("serial")