Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/06/16 13:55:39 UTC

[3/5] cassandra git commit: Add Change Data Capture

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 2045c35..2e97fd5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -22,34 +22,22 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CRC32;
 
-import com.codahale.metrics.Timer;
-
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -66,6 +54,14 @@ public abstract class CommitLogSegment
     private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
 
     private final static long idBase;
+
+    private CDCState cdcState = CDCState.PERMITTED;
+    public enum CDCState {
+        PERMITTED,
+        FORBIDDEN,
+        CONTAINS
+    }
+
     private final static AtomicInteger nextId = new AtomicInteger(1);
     private static long replayLimitId;
     static
@@ -115,18 +111,20 @@ public abstract class CommitLogSegment
     final FileChannel channel;
     final int fd;
 
+    protected final AbstractCommitLogSegmentManager manager;
+
     ByteBuffer buffer;
     private volatile boolean headerWritten;
 
     final CommitLog commitLog;
     public final CommitLogDescriptor descriptor;
 
-    static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
+    static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
     {
         Configuration config = commitLog.configuration;
-        CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, onClose)
-                                                          : config.useCompression() ? new CompressedSegment(commitLog, onClose)
-                                                                                    : new MemoryMappedSegment(commitLog);
+        CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager, onClose)
+                                                          : config.useCompression() ? new CompressedSegment(commitLog, manager, onClose)
+                                                                                    : new MemoryMappedSegment(commitLog, manager);
         segment.writeLogHeader();
         return segment;
     }
@@ -151,14 +149,16 @@ public abstract class CommitLogSegment
     /**
      * Constructs a new segment file.
      */
-    CommitLogSegment(CommitLog commitLog)
+    CommitLogSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
     {
         this.commitLog = commitLog;
+        this.manager = manager;
+
         id = getNextId();
         descriptor = new CommitLogDescriptor(id,
                                              commitLog.configuration.getCompressorClass(),
                                              commitLog.configuration.getEncryptionContext());
-        logFile = new File(commitLog.location, descriptor.fileName());
+        logFile = new File(manager.storageDirectory, descriptor.fileName());
 
         try
         {
@@ -369,22 +369,11 @@ public abstract class CommitLogSegment
     }
 
     /**
-     * Completely discards a segment file by deleting it. (Potentially blocking operation)
-     */
-    void discard(boolean deleteFile)
-    {
-        close();
-        if (deleteFile)
-            FileUtils.deleteWithConfirm(logFile);
-        commitLog.allocator.addSize(-onDiskSize());
-    }
-
-    /**
-     * @return the current ReplayPosition for this log segment
+     * @return the current CommitLogPosition for this log segment
      */
-    public ReplayPosition getContext()
+    public CommitLogPosition getCurrentCommitLogPosition()
     {
-        return new ReplayPosition(id, allocatePosition.get());
+        return new CommitLogPosition(id, allocatePosition.get());
     }
 
     /**
@@ -474,13 +463,13 @@ public abstract class CommitLogSegment
      * @param cfId    the column family ID that is now clean
      * @param context the optional clean offset
      */
-    public synchronized void markClean(UUID cfId, ReplayPosition context)
+    public synchronized void markClean(UUID cfId, CommitLogPosition context)
     {
         if (!cfDirty.containsKey(cfId))
             return;
-        if (context.segment == id)
+        if (context.segmentId == id)
             markClean(cfId, context.position);
-        else if (context.segment > id)
+        else if (context.segmentId > id)
             markClean(cfId, Integer.MAX_VALUE);
     }
 
@@ -565,14 +554,14 @@ public abstract class CommitLogSegment
     }
 
     /**
-     * Check to see if a certain ReplayPosition is contained by this segment file.
+     * Check to see if a certain CommitLogPosition is contained by this segment file.
      *
-     * @param   context the replay position to be checked
-     * @return  true if the replay position is contained by this segment file.
+     * @param   context the commit log segment position to be checked
+     * @return  true if the commit log segment position is contained by this segment file.
      */
-    public boolean contains(ReplayPosition context)
+    public boolean contains(CommitLogPosition context)
     {
-        return context.segment == id;
+        return context.segmentId == id;
     }
 
     // For debugging, not fast
@@ -610,12 +599,37 @@ public abstract class CommitLogSegment
         }
     }
 
+    public CDCState getCDCState()
+    {
+        return cdcState;
+    }
+
+    /**
+     * Change the current cdcState on this CommitLogSegment. Transitions are restricted: CONTAINS is terminal, and
+     * FORBIDDEN may only move back to PERMITTED. Re-setting the current state is a no-op.
+     */
+    public void setCDCState(CDCState newState)
+    {
+        if (newState == cdcState)
+            return;
+
+        // Also synchronized in CDCSizeTracker.processNewSegment and .processDiscardedSegment
+        synchronized(this)
+        {
+            if (cdcState == CDCState.CONTAINS && newState != CDCState.CONTAINS)
+                throw new IllegalArgumentException("Cannot transition from CONTAINS to any other state.");
+
+            if (cdcState == CDCState.FORBIDDEN && newState != CDCState.PERMITTED)
+                throw new IllegalArgumentException("Only transition from FORBIDDEN to PERMITTED is allowed.");
+
+            cdcState = newState;
+        }
+    }
+
     /**
      * A simple class for tracking information about the portion of a segment that has been allocated to a log write.
-     * The constructor leaves the fields uninitialized for population by CommitlogManager, so that it can be
-     * stack-allocated by escape analysis in CommitLog.add.
      */
-    static class Allocation
+    protected static class Allocation
     {
         private final CommitLogSegment segment;
         private final OpOrder.Group appendOp;
@@ -652,9 +666,9 @@ public abstract class CommitLogSegment
             segment.waitForSync(position, waitingOnCommit);
         }
 
-        public ReplayPosition getReplayPosition()
+        public CommitLogPosition getCommitLogPosition()
         {
-            return new ReplayPosition(segment.id, buffer.limit());
+            return new CommitLogPosition(segment.id, buffer.limit());
         }
     }
 }
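
For readers skimming the patch: the setCDCState rules above are terse and easy to misread. A minimal standalone mirror of the same state machine (CdcStateSketch is a hypothetical illustration, not the committed code):

    /** Standalone mirror of the CDCState transition rules in CommitLogSegment. */
    class CdcStateSketch
    {
        enum CDCState { PERMITTED, FORBIDDEN, CONTAINS }

        private CDCState state = CDCState.PERMITTED;

        synchronized void set(CDCState next)
        {
            if (next == state)
                return; // re-setting the current state is a no-op
            if (state == CDCState.CONTAINS)
                throw new IllegalArgumentException("CONTAINS is terminal");
            if (state == CDCState.FORBIDDEN && next != CDCState.PERMITTED)
                throw new IllegalArgumentException("FORBIDDEN may only return to PERMITTED");
            state = next;
        }

        public static void main(String[] args)
        {
            CdcStateSketch s = new CdcStateSketch();
            s.set(CDCState.FORBIDDEN);  // PERMITTED -> FORBIDDEN: allowed (no cdc space left)
            s.set(CDCState.PERMITTED);  // FORBIDDEN -> PERMITTED: allowed (space freed)
            s.set(CDCState.CONTAINS);   // PERMITTED -> CONTAINS: allowed (CDC write landed)
            s.set(CDCState.FORBIDDEN);  // throws: CONTAINS is terminal
        }
    }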

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
deleted file mode 100644
index 4f1166b..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
-
-/**
- * Performs eager-creation of commit log segments in a background thread. All the
- * public methods are thread safe.
- */
-public class CommitLogSegmentManager
-{
-    static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManager.class);
-
-    /**
-     * Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation.
-     */
-    private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>();
-
-    /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
-    private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
-
-    /** Active segments, containing unflushed data */
-    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
-
-    /** The segment we are currently allocating commit log records to */
-    private volatile CommitLogSegment allocatingFrom = null;
-
-    private final WaitQueue hasAvailableSegments = new WaitQueue();
-
-    /**
-     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
-     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
-     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
-     * on the manager thread, which will take a ms or two).
-     */
-    private final AtomicLong size = new AtomicLong();
-
-    /**
-     * New segment creation is initially disabled because we'll typically get some "free" segments
-     * recycled after log replay.
-     */
-    volatile boolean createReserveSegments = false;
-
-    private Thread managerThread;
-    private volatile boolean run = true;
-    private final CommitLog commitLog;
-
-    CommitLogSegmentManager(final CommitLog commitLog)
-    {
-        this.commitLog = commitLog;
-    }
-
-    void start()
-    {
-        // The run loop for the manager thread
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                while (run)
-                {
-                    try
-                    {
-                        Runnable task = segmentManagementTasks.poll();
-                        if (task == null)
-                        {
-                            // if we have no more work to do, check if we should create a new segment
-                            if (!atSegmentLimit() && availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
-                            {
-                                logger.trace("No segments in reserve; creating a fresh one");
-                                // TODO : some error handling in case we fail to create a new segment
-                                availableSegments.add(CommitLogSegment.createSegment(commitLog, () -> wakeManager()));
-                                hasAvailableSegments.signalAll();
-                            }
-
-                            // flush old Cfs if we're full
-                            long unused = unusedCapacity();
-                            if (unused < 0)
-                            {
-                                List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
-                                long spaceToReclaim = 0;
-                                for (CommitLogSegment segment : activeSegments)
-                                {
-                                    if (segment == allocatingFrom)
-                                        break;
-                                    segmentsToRecycle.add(segment);
-                                    spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
-                                    if (spaceToReclaim + unused >= 0)
-                                        break;
-                                }
-                                flushDataFrom(segmentsToRecycle, false);
-                            }
-
-                            try
-                            {
-                                // wait for new work to be provided
-                                task = segmentManagementTasks.take();
-                            }
-                            catch (InterruptedException e)
-                            {
-                                throw new AssertionError();
-                            }
-                        }
-
-                        task.run();
-                    }
-                    catch (Throwable t)
-                    {
-                        JVMStabilityInspector.inspectThrowable(t);
-                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
-                            return;
-                        // sleep some arbitrary period to avoid spamming CL
-                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-                    }
-                }
-            }
-
-            private boolean atSegmentLimit()
-            {
-                return CommitLogSegment.usesBufferPool(commitLog) && CompressedSegment.hasReachedPoolLimit();
-            }
-
-        };
-
-        run = true;
-
-        managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
-        managerThread.start();
-    }
-
-    /**
-     * Reserve space in the current segment for the provided mutation or, if there isn't space available,
-     * create a new segment.
-     *
-     * @return the provided Allocation object
-     */
-    public Allocation allocate(Mutation mutation, int size)
-    {
-        CommitLogSegment segment = allocatingFrom();
-
-        Allocation alloc;
-        while ( null == (alloc = segment.allocate(mutation, size)) )
-        {
-            // failed to allocate, so move to a new segment with enough room
-            advanceAllocatingFrom(segment);
-            segment = allocatingFrom;
-        }
-
-        return alloc;
-    }
-
-    // simple wrapper to ensure non-null value for allocatingFrom; only necessary on first call
-    CommitLogSegment allocatingFrom()
-    {
-        CommitLogSegment r = allocatingFrom;
-        if (r == null)
-        {
-            advanceAllocatingFrom(null);
-            r = allocatingFrom;
-        }
-        return r;
-    }
-
-    /**
-     * Fetches a new segment from the queue, creating a new one if necessary, and activates it
-     */
-    private void advanceAllocatingFrom(CommitLogSegment old)
-    {
-        while (true)
-        {
-            CommitLogSegment next;
-            synchronized (this)
-            {
-                // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments
-                // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432
-                if (allocatingFrom != old)
-                    return;
-                next = availableSegments.poll();
-                if (next != null)
-                {
-                    allocatingFrom = next;
-                    activeSegments.add(next);
-                }
-            }
-
-            if (next != null)
-            {
-                if (old != null)
-                {
-                    // Now we can run the user defined command just after switching to the new commit log.
-                    // (Do this here instead of in the recycle call so we can get a head start on the archive.)
-                    commitLog.archiver.maybeArchive(old);
-
-                    // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
-                    old.discardUnusedTail();
-                }
-
-                // request that the CL be synced out-of-band, as we've finished a segment
-                commitLog.requestExtraSync();
-                return;
-            }
-
-            // no more segments, so register to receive a signal when not empty
-            WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time());
-
-            // trigger the management thread; this must occur after registering
-            // the signal to ensure we are woken by any new segment creation
-            wakeManager();
-
-            // check if the queue has already been added to before waiting on the signal, to catch modifications
-            // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change
-            if (!availableSegments.isEmpty() || allocatingFrom != old)
-            {
-                signal.cancel();
-                // if we've been beaten, just stop immediately
-                if (allocatingFrom != old)
-                    return;
-                // otherwise try again, as there should be an available segment
-                continue;
-            }
-
-            // can only reach here if the queue hasn't been inserted into
-            // before we registered the signal, as we only remove items from the queue
-            // after updating allocatingFrom. Can safely block until we are signalled
-            // by the allocator that new segments have been published
-            signal.awaitUninterruptibly();
-        }
-    }
-
-    private void wakeManager()
-    {
-        // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
-        segmentManagementTasks.add(Runnables.doNothing());
-    }
-
-    /**
-     * Switch to a new segment, regardless of how much is left in the current one.
-     *
-     * Flushes any dirty CFs for this segment and any older segments, and then recycles
-     * the segments
-     */
-    void forceRecycleAll(Iterable<UUID> droppedCfs)
-    {
-        List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
-        CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
-        advanceAllocatingFrom(last);
-
-        // wait for the commit log modifications
-        last.waitForModifications();
-
-        // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
-        // on the relevant keyspaces to complete
-        Keyspace.writeOrder.awaitNewBarrier();
-
-        // flush and wait for all CFs that are dirty in segments up-to and including 'last'
-        Future<?> future = flushDataFrom(segmentsToRecycle, true);
-        try
-        {
-            future.get();
-
-            for (CommitLogSegment segment : activeSegments)
-                for (UUID cfId : droppedCfs)
-                    segment.markClean(cfId, segment.getContext());
-
-            // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
-            // if the previous active segment was the only one to recycle (since an active segment isn't
-            // necessarily dirty, and we only call dCS after a flush).
-            for (CommitLogSegment segment : activeSegments)
-                if (segment.isUnused())
-                    recycleSegment(segment);
-
-            CommitLogSegment first;
-            if ((first = activeSegments.peek()) != null && first.id <= last.id)
-                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
-        }
-        catch (Throwable t)
-        {
-            // for now just log the error and return false, indicating that we failed
-            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
-        }
-    }
-
-    /**
-     * Indicates that a segment is no longer in use and that it should be recycled.
-     *
-     * @param segment segment that is no longer in use
-     */
-    void recycleSegment(final CommitLogSegment segment)
-    {
-        boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
-        if (activeSegments.remove(segment))
-        {
-            // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
-            discardSegment(segment, archiveSuccess);
-        }
-        else
-        {
-            logger.warn("segment {} not found in activeSegments queue", segment);
-        }
-    }
-
-    /**
-     * Differs from the above because it can work on any file instead of just existing
-     * commit log segments managed by this manager.
-     *
-     * @param file segment file that is no longer in use.
-     */
-    void recycleSegment(final File file)
-    {
-        // (don't decrease managed size, since this was never a "live" segment)
-        logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file);
-        FileUtils.deleteWithConfirm(file);
-    }
-
-    /**
-     * Indicates that a segment file should be deleted.
-     *
-     * @param segment segment to be discarded
-     */
-    private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
-    {
-        logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
-
-        segmentManagementTasks.add(new Runnable()
-        {
-            public void run()
-            {
-                segment.discard(deleteFile);
-            }
-        });
-    }
-
-    /**
-     * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
-     * @param addedSize
-     */
-    void addSize(long addedSize)
-    {
-        size.addAndGet(addedSize);
-    }
-
-    /**
-     * @return the space (in bytes) used by all segment files.
-     */
-    public long onDiskSize()
-    {
-        return size.get();
-    }
-
-    private long unusedCapacity()
-    {
-        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
-        long currentSize = size.get();
-        logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
-        return total - currentSize;
-    }
-
-    /**
-     * @param name the filename to check
-     * @return true if file is managed by this manager.
-     */
-    public boolean manages(String name)
-    {
-        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
-            if (segment.getName().equals(name))
-                return true;
-        return false;
-    }
-
-    /**
-     * Throws a flag that enables the behavior of keeping at least one spare segment
-     * available at all times.
-     */
-    void enableReserveSegmentCreation()
-    {
-        createReserveSegments = true;
-        wakeManager();
-    }
-
-    /**
-     * Force a flush on all CFs that are still dirty in @param segments.
-     *
-     * @return a Future that will finish when all the flushes are complete.
-     */
-    private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
-    {
-        if (segments.isEmpty())
-            return Futures.immediateFuture(null);
-        final ReplayPosition maxReplayPosition = segments.get(segments.size() - 1).getContext();
-
-        // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
-        final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
-
-        for (CommitLogSegment segment : segments)
-        {
-            for (UUID dirtyCFId : segment.getDirtyCFIDs())
-            {
-                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
-                if (pair == null)
-                {
-                    // even though we remove the schema entry before a final flush when dropping a CF,
-                    // it's still possible for a writer to race and finish his append after the flush.
-                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
-                    segment.markClean(dirtyCFId, segment.getContext());
-                }
-                else if (!flushes.containsKey(dirtyCFId))
-                {
-                    String keyspace = pair.left;
-                    final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
-                    // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
-                    // no deadlock possibility since switchLock removal
-                    flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxReplayPosition));
-                }
-            }
-        }
-
-        return Futures.allAsList(flushes.values());
-    }
-
-    /**
-     * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
-     * Only call this after the AbstractCommitLogService is shut down.
-     */
-    public void stopUnsafe(boolean deleteSegments)
-    {
-        logger.trace("CLSM closing and clearing existing commit log segments...");
-        createReserveSegments = false;
-
-        awaitManagementTasksCompletion();
-
-        shutdown();
-        try
-        {
-            awaitTermination();
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        synchronized (this)
-        {
-            for (CommitLogSegment segment : activeSegments)
-                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
-            activeSegments.clear();
-
-            for (CommitLogSegment segment : availableSegments)
-                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
-            availableSegments.clear();
-        }
-
-        allocatingFrom = null;
-
-        segmentManagementTasks.clear();
-
-        size.set(0L);
-
-        logger.trace("CLSM done with closing and clearing existing commit log segments.");
-    }
-
-    // Used by tests only.
-    void awaitManagementTasksCompletion()
-    {
-        while (!segmentManagementTasks.isEmpty())
-            Thread.yield();
-        // The last management task is not yet complete. Wait a while for it.
-        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-        // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure
-        // waiting completes correctly.
-    }
-
-    private static void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
-    {
-        try
-        {
-            segment.discard(delete);
-        }
-        catch (AssertionError ignored)
-        {
-            // segment file does not exist
-        }
-    }
-
-    /**
-     * Initiates the shutdown process for the management thread.
-     */
-    public void shutdown()
-    {
-        run = false;
-        wakeManager();
-    }
-
-    /**
-     * Returns when the management thread terminates.
-     */
-    public void awaitTermination() throws InterruptedException
-    {
-        managerThread.join();
-
-        for (CommitLogSegment segment : activeSegments)
-            segment.close();
-
-        for (CommitLogSegment segment : availableSegments)
-            segment.close();
-
-        FileDirectSegment.shutdown();
-    }
-
-    /**
-     * @return a read-only collection of the active commit log segments
-     */
-    @VisibleForTesting
-    public Collection<CommitLogSegment> getActiveSegments()
-    {
-        return Collections.unmodifiableCollection(activeSegments);
-    }
-
-}
-
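
The trickiest part of the deleted advanceAllocatingFrom is the signal ordering: register with the WaitQueue first, then re-check availableSegments, and only block if the check still fails, so a segment published in between can never be missed. A standalone sketch of that lost-wakeup guard using plain monitors (hypothetical names, not Cassandra's WaitQueue):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    /** Lost-wakeup-safe hand-off, mirroring the structure of advanceAllocatingFrom. */
    class SignalSketch
    {
        private final Queue<String> available = new ConcurrentLinkedQueue<>();
        private final Object hasAvailable = new Object();

        String take() throws InterruptedException
        {
            while (true)
            {
                String next = available.poll();
                if (next != null)
                    return next;

                synchronized (hasAvailable)
                {
                    // Re-check while holding the monitor: a producer may have
                    // published between poll() and this point. Blocking without
                    // this check could sleep through that publication forever.
                    if (available.isEmpty())
                        hasAvailable.wait();
                }
            }
        }

        void publish(String segment)
        {
            available.add(segment);
            synchronized (hasAvailable)
            {
                hasAvailable.notifyAll();
            }
        }
    }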

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
new file mode 100644
index 0000000..5c6fd3f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.DirectorySizeCalculator;
+
+public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
+{
+    static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManagerCDC.class);
+    private final CDCSizeTracker cdcSizeTracker;
+
+    public CommitLogSegmentManagerCDC(final CommitLog commitLog, String storageDirectory)
+    {
+        super(commitLog, storageDirectory);
+        cdcSizeTracker = new CDCSizeTracker(this, new File(DatabaseDescriptor.getCDCLogLocation()));
+    }
+
+    @Override
+    void start()
+    {
+        super.start();
+        cdcSizeTracker.start();
+    }
+
+    public void discard(CommitLogSegment segment, boolean delete)
+    {
+        segment.close();
+        addSize(-segment.onDiskSize());
+
+        cdcSizeTracker.processDiscardedSegment(segment);
+
+        if (segment.getCDCState() == CDCState.CONTAINS)
+            FileUtils.renameWithConfirm(segment.logFile.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + segment.logFile.getName());
+        else
+        {
+            if (delete)
+                FileUtils.deleteWithConfirm(segment.logFile);
+        }
+    }
+
+    /**
+     * Initiates the shutdown process for the management thread. Also stops the CDC on-disk size calculator executor.
+     */
+    public void shutdown()
+    {
+        run = false;
+        cdcSizeTracker.shutdown();
+        wakeManager();
+    }
+
+    /**
+     * Reserve space in the current segment for the provided mutation or, if there isn't space available,
+     * create a new segment. For CDC mutations, allocation is expected to throw WTE if the segment disallows CDC mutations.
+     *
+     * @param mutation Mutation to allocate in segment manager
+     * @param size total size (overhead + serialized) of mutation
+     * @return the created Allocation object
+     * @throws WriteTimeoutException if the segment disallows CDC mutations
+     */
+    @Override
+    public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws WriteTimeoutException
+    {
+        CommitLogSegment segment = allocatingFrom();
+        CommitLogSegment.Allocation alloc;
+
+        throwIfForbidden(mutation, segment);
+        while ( null == (alloc = segment.allocate(mutation, size)) )
+        {
+            // Failed to allocate, so move to a new segment with enough room if possible.
+            advanceAllocatingFrom(segment);
+            segment = allocatingFrom;
+
+            throwIfForbidden(mutation, segment);
+        }
+
+        if (mutation.trackedByCDC())
+            segment.setCDCState(CDCState.CONTAINS);
+
+        return alloc;
+    }
+
+    private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws WriteTimeoutException
+    {
+        if (mutation.trackedByCDC() && segment.getCDCState() == CDCState.FORBIDDEN)
+        {
+            cdcSizeTracker.submitOverflowSizeRecalculation();
+            throw new WriteTimeoutException(WriteType.CDC, ConsistencyLevel.LOCAL_ONE, 0, 1);
+        }
+    }
+
+    /**
+     * Move files to cdc_raw after replay, since recovery will flush to SSTable and these mutations won't be available
+     * in the CL subsystem otherwise.
+     */
+    void handleReplayedSegment(final File file)
+    {
+        logger.trace("Moving (Unopened) segment {} to cdc_raw directory after replay", file);
+        FileUtils.renameWithConfirm(file.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + file.getName());
+        cdcSizeTracker.addFlushedSize(file.length());
+    }
+
+    /**
+     * On segment creation, flag whether the segment should accept CDC mutations based on the total of currently
+     * allocated unflushed CDC segments and the contents of cdc_raw.
+     */
+    public CommitLogSegment createSegment()
+    {
+        CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this, () -> wakeManager());
+        cdcSizeTracker.processNewSegment(segment);
+        return segment;
+    }
+
+    /**
+     * Tracks the total disk usage of the CDC subsystem, defined as the sum of all unflushed CommitLogSegments with CDC
+     * data in them plus all segments archived into cdc_raw.
+     *
+     * Allows atomic increment/decrement of the unflushed size; the flushed size, however, can only be incremented, and
+     * a full directory walk is required to account for any deletions by the CDC consumer.
+     *
+     * TODO: Linux performs approximately 25% better with the following one-liner instead of this walker:
+     *      Arrays.stream(path.listFiles()).mapToLong(File::length).sum();
+     * However, this solution is 375% slower on Windows. Revisit this and split the logic per-OS.
+     */
+    private class CDCSizeTracker extends DirectorySizeCalculator
+    {
+        private final RateLimiter rateLimiter = RateLimiter.create(1000 / DatabaseDescriptor.getCDCDiskCheckInterval());
+        private ExecutorService cdcSizeCalculationExecutor;
+        private CommitLogSegmentManagerCDC segmentManager;
+        private AtomicLong unflushedCDCSize = new AtomicLong(0);
+
+        CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path)
+        {
+            super(path);
+            this.segmentManager = segmentManager;
+        }
+
+        /**
+         * Needed for stop/restart during unit tests
+         */
+        public void start()
+        {
+            cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy());
+        }
+
+        /**
+         * Synchronous size recalculation on each segment creation/deletion call could lead to very long delays in new
+         * segment allocation, and thus long delays in signaling to wake waiting allocation / writer threads.
+         *
+         * This can be reached either from the segment management thread in AbstractCommitLogSegmentManager or from the
+         * size recalculation executor, so we synchronize on this object to narrow the window in which the tracked size
+         * can drift.
+         *
+         * Reference DirectorySizerBench for more information about performance of the directory size recalc.
+         */
+        void processNewSegment(CommitLogSegment segment)
+        {
+            // See synchronization in CommitLogSegment.setCDCState
+            synchronized(segment)
+            {
+                segment.setCDCState(defaultSegmentSize() + totalCDCSizeOnDisk() > allowableCDCBytes()
+                                    ? CDCState.FORBIDDEN
+                                    : CDCState.PERMITTED);
+                if (segment.getCDCState() == CDCState.PERMITTED)
+                    unflushedCDCSize.addAndGet(defaultSegmentSize());
+            }
+
+            // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
+            submitOverflowSizeRecalculation();
+        }
+
+        void processDiscardedSegment(CommitLogSegment segment)
+        {
+            // See synchronization in CommitLogSegment.setCDCState
+            synchronized(segment)
+            {
+                // Add to flushed size before decrementing unflushed so we don't have a window of false generosity
+                if (segment.getCDCState() == CDCState.CONTAINS)
+                    size.addAndGet(segment.onDiskSize());
+                if (segment.getCDCState() != CDCState.FORBIDDEN)
+                    unflushedCDCSize.addAndGet(-defaultSegmentSize());
+            }
+
+            // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
+            submitOverflowSizeRecalculation();
+        }
+
+        private long allowableCDCBytes()
+        {
+            return (long)DatabaseDescriptor.getCDCSpaceInMB() * 1024 * 1024;
+        }
+
+        public void submitOverflowSizeRecalculation()
+        {
+            try
+            {
+                cdcSizeCalculationExecutor.submit(() -> recalculateOverflowSize());
+            }
+            catch (RejectedExecutionException e)
+            {
+                // Do nothing. A recalculation is already in flight, so this request will be satisfied when it completes.
+            }
+        }
+
+        private void recalculateOverflowSize()
+        {
+            rateLimiter.acquire();
+            calculateSize();
+            CommitLogSegment allocatingFrom = segmentManager.allocatingFrom;
+            if (allocatingFrom.getCDCState() == CDCState.FORBIDDEN)
+                processNewSegment(allocatingFrom);
+        }
+
+        private int defaultSegmentSize()
+        {
+            return DatabaseDescriptor.getCommitLogSegmentSize();
+        }
+
+        private void calculateSize()
+        {
+            try
+            {
+                // Since we don't synchronize around either rebuilding our file list or walking the tree and adding to
+                // size, it's possible we could have changes take place underneath us and end up with a slightly incorrect
+                // view of our flushed size by the time this walking completes. Given that runtime grows linearly across
+                // both rebuildFileList and walkFileTree (each accounts for roughly half of it), and that the
+                // window for this race should be very small, this is an acceptable trade-off since it will be resolved
+                // on the next segment creation / deletion with a subsequent call to submitOverflowSizeRecalculation.
+                rebuildFileList();
+                Files.walkFileTree(path.toPath(), this);
+            }
+            catch (IOException ie)
+            {
+                CommitLog.instance.handleCommitError("Failed CDC Size Calculation", ie);
+            }
+        }
+
+        private long addFlushedSize(long toAdd)
+        {
+            return size.addAndGet(toAdd);
+        }
+
+        private long totalCDCSizeOnDisk()
+        {
+            return unflushedCDCSize.get() + size.get();
+        }
+
+        public void shutdown()
+        {
+            cdcSizeCalculationExecutor.shutdown();
+        }
+    }
+
+    /**
+     * Only used for testing / validating that the size tracker is working. Not for production use.
+     */
+    @VisibleForTesting
+    public long updateCDCTotalSize()
+    {
+        cdcSizeTracker.submitOverflowSizeRecalculation();
+
+        // Give the update time to run
+        try
+        {
+            Thread.sleep(DatabaseDescriptor.getCDCDiskCheckInterval() + 10);
+        }
+        catch (InterruptedException e) {}
+
+        return cdcSizeTracker.totalCDCSizeOnDisk();
+    }
+}
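
The upshot of throwIfForbidden for the write path: once cdc space is exhausted and the consumer has not drained cdc_raw, CDC-tracked mutations fail fast instead of queueing. A hypothetical caller-side sketch; it assumes CommitLog.instance.add surfaces the WriteTimeoutException constructed above, and that the exception's writeType field carries the WriteType.CDC passed to its constructor:

    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.db.WriteType;
    import org.apache.cassandra.db.commitlog.CommitLog;
    import org.apache.cassandra.exceptions.WriteTimeoutException;

    /** Hypothetical write-path wrapper showing the new CDC failure mode. */
    class CdcWritePathSketch
    {
        /** @return true if appended, false if rejected because cdc space is full. */
        boolean tryAppend(Mutation mutation) throws WriteTimeoutException
        {
            try
            {
                CommitLog.instance.add(mutation);
                return true;
            }
            catch (WriteTimeoutException e)
            {
                if (e.writeType != WriteType.CDC)
                    throw e;
                // The active segment is FORBIDDEN: cdc space is exhausted, and
                // it is reclaimed only when the CDC consumer deletes files from
                // cdc_raw, so an immediate retry is pointless. Report instead.
                return false;
            }
        }
    }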

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
new file mode 100644
index 0000000..333077c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.util.FileUtils;
+
+public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentManager
+{
+    public CommitLogSegmentManagerStandard(final CommitLog commitLog, String storageDirectory)
+    {
+        super(commitLog, storageDirectory);
+    }
+
+    public void discard(CommitLogSegment segment, boolean delete)
+    {
+        segment.close();
+        if (delete)
+            FileUtils.deleteWithConfirm(segment.logFile);
+        addSize(-segment.onDiskSize());
+    }
+
+    /**
+     * Initiates the shutdown process for the management thread.
+     */
+    public void shutdown()
+    {
+        run = false;
+        wakeManager();
+    }
+
+    /**
+     * Reserve space in the current segment for the provided mutation or, if there isn't space available,
+     * create a new segment. allocate() blocks until allocation succeeds, as it waits on a signal in advanceAllocatingFrom.
+     *
+     * @param mutation mutation to allocate space for
+     * @param size total size of mutation (overhead + serialized size)
+     * @return the provided Allocation object
+     */
+    public CommitLogSegment.Allocation allocate(Mutation mutation, int size)
+    {
+        CommitLogSegment segment = allocatingFrom();
+
+        CommitLogSegment.Allocation alloc;
+        while ( null == (alloc = segment.allocate(mutation, size)) )
+        {
+            // failed to allocate, so move to a new segment with enough room
+            advanceAllocatingFrom(segment);
+            segment = allocatingFrom;
+        }
+
+        return alloc;
+    }
+
+    /**
+     * Simply delete untracked segment files in the standard manager, as their data will be flushed to SSTables during recovery.
+     *
+     * @param file segment file that is no longer in use.
+     */
+    void handleReplayedSegment(final File file)
+    {
+        // (don't decrease managed size, since this was never a "live" segment)
+        logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file);
+        FileUtils.deleteWithConfirm(file);
+    }
+
+    public CommitLogSegment createSegment()
+    {
+        return CommitLogSegment.createSegment(commitLog, this, () -> wakeManager());
+    }
+}
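
Both concrete managers share the same allocate-until-success loop; the CDC variant merely wraps it with the forbidden-state check. Reduced to a skeleton (stand-in types, not the committed classes), the pattern is:

    /** Skeleton of the allocate-retry loop shared by both segment managers. */
    class AllocateLoopSketch
    {
        interface Segment { Allocation allocate(int size); }
        static final class Allocation {}

        private volatile Segment allocatingFrom = size -> null; // starts "full"

        Allocation allocate(int size)
        {
            Segment segment = allocatingFrom;
            Allocation alloc;
            while (null == (alloc = segment.allocate(size)))
            {
                // No room in this segment: advance to a fresh one and retry.
                // In the patch this may block while a new segment is created.
                advanceAllocatingFrom(segment);
                segment = allocatingFrom;
            }
            return alloc;
        }

        private void advanceAllocatingFrom(Segment full)
        {
            // Stand-in for the real method, which atomically swaps in a new
            // segment and signals any writers waiting for one.
            if (allocatingFrom == full)
                allocatingFrom = size -> new Allocation();
        }
    }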

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
new file mode 100644
index 0000000..b547131
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.zip.CRC32;
+import javax.crypto.Cipher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
+import org.apache.cassandra.db.commitlog.CommitLogReadHandler.*;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.SYNC_MARKER_SIZE;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Read each sync section of a commit log, iteratively.
+ */
+public class CommitLogSegmentReader implements Iterable<CommitLogSegmentReader.SyncSegment>
+{
+    private final CommitLogReadHandler handler;
+    private final CommitLogDescriptor descriptor;
+    private final RandomAccessReader reader;
+    private final Segmenter segmenter;
+    private final boolean tolerateTruncation;
+
+    /**
+     * ending position of the current sync section.
+     */
+    protected int end;
+
+    protected CommitLogSegmentReader(CommitLogReadHandler handler,
+                                     CommitLogDescriptor descriptor,
+                                     RandomAccessReader reader,
+                                     boolean tolerateTruncation)
+    {
+        this.handler = handler;
+        this.descriptor = descriptor;
+        this.reader = reader;
+        this.tolerateTruncation = tolerateTruncation;
+
+        end = (int) reader.getFilePointer();
+        if (descriptor.getEncryptionContext().isEnabled())
+            segmenter = new EncryptedSegmenter(descriptor, reader);
+        else if (descriptor.compression != null)
+            segmenter = new CompressedSegmenter(descriptor, reader);
+        else
+            segmenter = new NoOpSegmenter(reader);
+    }
+
+    public Iterator<SyncSegment> iterator()
+    {
+        return new SegmentIterator();
+    }
+
+    protected class SegmentIterator extends AbstractIterator<CommitLogSegmentReader.SyncSegment>
+    {
+        protected SyncSegment computeNext()
+        {
+            while (true)
+            {
+                try
+                {
+                    final int currentStart = end;
+                    end = readSyncMarker(descriptor, currentStart, reader);
+                    if (end == -1)
+                    {
+                        return endOfData();
+                    }
+                    if (end > reader.length())
+                    {
+                        // the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
+                        // try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
+                        end = (int) reader.length();
+                    }
+                    return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end);
+                }
+                catch(CommitLogSegmentReader.SegmentReadException e)
+                {
+                    try
+                    {
+                        handler.handleUnrecoverableError(new CommitLogReadException(
+                                                    e.getMessage(),
+                                                    CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
+                                                    !e.invalidCrc && tolerateTruncation));
+                    }
+                    catch (IOException ioe)
+                    {
+                        throw new RuntimeException(ioe);
+                    }
+                }
+                catch (IOException e)
+                {
+                    try
+                    {
+                        boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length());
+                        // if no exception is thrown, the while loop will continue
+                        handler.handleUnrecoverableError(new CommitLogReadException(
+                                                    e.getMessage(),
+                                                    CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
+                                                    tolerateErrorsInSection));
+                    }
+                    catch (IOException ioe)
+                    {
+                        throw new RuntimeException(ioe);
+                    }
+                }
+            }
+        }
+    }
+
+    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
+    {
+        if (offset > reader.length() - SYNC_MARKER_SIZE)
+        {
+            // There was no room in the segment to write a final header. No data could be present here.
+            return -1;
+        }
+        reader.seek(offset);
+        CRC32 crc = new CRC32();
+        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
+        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
+        updateChecksumInt(crc, (int) reader.getPosition());
+        final int end = reader.readInt();
+        long filecrc = reader.readInt() & 0xffffffffL;
+        if (crc.getValue() != filecrc)
+        {
+            if (end != 0 || filecrc != 0)
+            {
+                String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. " +
+                             "The end of segment marker should be zero.", offset, reader.getPath());
+                throw new SegmentReadException(msg, true);
+            }
+            return -1;
+        }
+        else if (end < offset || end > reader.length())
+        {
+            String msg = String.format("Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath());
+            throw new SegmentReadException(msg, false);
+        }
+        return end;
+    }
+
+    public static class SegmentReadException extends IOException
+    {
+        public final boolean invalidCrc;
+
+        public SegmentReadException(String msg, boolean invalidCrc)
+        {
+            super(msg);
+            this.invalidCrc = invalidCrc;
+        }
+    }
+
+    public static class SyncSegment
+    {
+        /** the 'buffer' to replay commit log data from */
+        public final FileDataInput input;
+
+        /** offset in file where this section begins. */
+        public final int fileStartPosition;
+
+        /** offset in file where this section ends. */
+        public final int fileEndPosition;
+
+        /** the logical ending position of the buffer */
+        public final int endPosition;
+
+        public final boolean toleratesErrorsInSection;
+
+        public SyncSegment(FileDataInput input, int fileStartPosition, int fileEndPosition, int endPosition, boolean toleratesErrorsInSection)
+        {
+            this.input = input;
+            this.fileStartPosition = fileStartPosition;
+            this.fileEndPosition = fileEndPosition;
+            this.endPosition = endPosition;
+            this.toleratesErrorsInSection = toleratesErrorsInSection;
+        }
+    }
+
+    /**
+     * Derives the next section of the commit log to be replayed. Section boundaries are derived from the commit log sync markers.
+     */
+    interface Segmenter
+    {
+        /**
+         * Get the next section of the commit log to replay.
+         *
+         * @param startPosition the position in the file to begin reading at
+         * @param nextSectionStartPosition the file position of the beginning of the next section
+         * @return the buffer and its logical end position
+         * @throws IOException
+         */
+        SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException;
+
+        /**
+         * Determine if we tolerate errors in the current segment.
+         */
+        default boolean tolerateSegmentErrors(int segmentEndPosition, long fileLength)
+        {
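+            // An end position at or beyond EOF, or a negative one, means the final sync of
+            // this section never completed, so errors there amount to tolerable truncation.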
+            return segmentEndPosition >= fileLength || segmentEndPosition < 0;
+        }
+    }
+
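+    /** Segmenter for plain (uncompressed, unencrypted) segments: file bytes are replayed as-is. */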
+    static class NoOpSegmenter implements Segmenter
+    {
+        private final RandomAccessReader reader;
+
+        public NoOpSegmenter(RandomAccessReader reader)
+        {
+            this.reader = reader;
+        }
+
+        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition)
+        {
+            reader.seek(startPosition);
+            return new SyncSegment(reader, startPosition, nextSectionStartPosition, nextSectionStartPosition, true);
+        }
+
+        public boolean tolerateSegmentErrors(int end, long length)
+        {
+            return true;
+        }
+    }
+
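+    /** Segmenter for compressed segments: each sync section is decompressed into reusable byte arrays before replay. */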
+    static class CompressedSegmenter implements Segmenter
+    {
+        private final ICompressor compressor;
+        private final RandomAccessReader reader;
+        private byte[] compressedBuffer;
+        private byte[] uncompressedBuffer;
+        private long nextLogicalStart;
+
+        public CompressedSegmenter(CommitLogDescriptor desc, RandomAccessReader reader)
+        {
+            this(CompressionParams.createCompressor(desc.compression), reader);
+        }
+
+        public CompressedSegmenter(ICompressor compressor, RandomAccessReader reader)
+        {
+            this.compressor = compressor;
+            this.reader = reader;
+            compressedBuffer = new byte[0];
+            uncompressedBuffer = new byte[0];
+            nextLogicalStart = reader.getFilePointer();
+        }
+
+        public SyncSegment nextSegment(final int startPosition, final int nextSectionStartPosition) throws IOException
+        {
+            reader.seek(startPosition);
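+            // section layout past the sync marker: the uncompressed length, then the
+            // compressed payload running up to the next marker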
+            int uncompressedLength = reader.readInt();
+
+            int compressedLength = nextSectionStartPosition - (int)reader.getPosition();
+            if (compressedLength > compressedBuffer.length)
+                compressedBuffer = new byte[(int) (1.2 * compressedLength)];
+            reader.readFully(compressedBuffer, 0, compressedLength);
+
+            if (uncompressedLength > uncompressedBuffer.length)
+                uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
+            int count = compressor.uncompress(compressedBuffer, 0, compressedLength, uncompressedBuffer, 0);
+            nextLogicalStart += SYNC_MARKER_SIZE;
+            FileDataInput input = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer, 0, count), reader.getPath(), nextLogicalStart);
+            nextLogicalStart += uncompressedLength;
+            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+        }
+    }
+
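+    /** Segmenter for encrypted segments: sections are decrypted and uncompressed chunk by chunk via a ChunkProvider. */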
+    static class EncryptedSegmenter implements Segmenter
+    {
+        private final RandomAccessReader reader;
+        private final ICompressor compressor;
+        private final Cipher cipher;
+
+        /**
+         * the result of the decryption is written into this buffer.
+         */
+        private ByteBuffer decryptedBuffer;
+
+        /**
+         * the result of the decompression is written into this buffer.
+         */
+        private ByteBuffer uncompressedBuffer;
+
+        private final ChunkProvider chunkProvider;
+
+        private long currentSegmentEndPosition;
+        private long nextLogicalStart;
+
+        public EncryptedSegmenter(CommitLogDescriptor descriptor, RandomAccessReader reader)
+        {
+            this(reader, descriptor.getEncryptionContext());
+        }
+
+        @VisibleForTesting
+        EncryptedSegmenter(final RandomAccessReader reader, EncryptionContext encryptionContext)
+        {
+            this.reader = reader;
+            decryptedBuffer = ByteBuffer.allocate(0);
+            compressor = encryptionContext.getCompressor();
+            nextLogicalStart = reader.getFilePointer();
+
+            try
+            {
+                cipher = encryptionContext.getDecryptor();
+            }
+            catch (IOException ioe)
+            {
+                throw new FSReadError(ioe, reader.getPath());
+            }
+
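+            // chunks are compressed and then encrypted on write, so reads reverse the
+            // order: decrypt first, then uncompress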
+            chunkProvider = () -> {
+                if (reader.getFilePointer() >= currentSegmentEndPosition)
+                    return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                try
+                {
+                    decryptedBuffer = EncryptionUtils.decrypt(reader, decryptedBuffer, true, cipher);
+                    uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, uncompressedBuffer, true, compressor);
+                    return uncompressedBuffer;
+                }
+                catch (IOException e)
+                {
+                    throw new FSReadError(e, reader.getPath());
+                }
+            };
+        }
+
+        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException
+        {
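+            // the section's total plaintext length is stored just past the sync marker
+            // (cf. EncryptedSegment.write, which puts it at SYNC_MARKER_SIZE)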
+            int totalPlainTextLength = reader.readInt();
+            currentSegmentEndPosition = nextSectionStartPosition - 1;
+
+            nextLogicalStart += SYNC_MARKER_SIZE;
+            FileDataInput input = new EncryptedFileSegmentInputStream(reader.getPath(), nextLogicalStart, 0, totalPlainTextLength, chunkProvider);
+            nextLogicalStart += totalPlainTextLength;
+            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+        }
+    }
+}
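
For readers tracing readSyncMarker above, a minimal stand-alone sketch of the same check,
assuming only the layout visible in the code: two big-endian ints, the section end position
followed by a CRC32 over the segment id halves and the marker's own offset. All names below
are illustrative and not part of the patch.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    class SyncMarkerSketch
    {
        // Feed a big-endian int into the CRC, mirroring updateChecksumInt.
        static void updateInt(CRC32 crc, int v)
        {
            crc.update(new byte[] { (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v });
        }

        // Returns the section end position, or -1 for a zeroed end-of-segment marker;
        // throws on a corrupt marker, as readSyncMarker does.
        static int readMarker(long segmentId, int markerOffset, long fileLength, ByteBuffer marker) throws IOException
        {
            CRC32 crc = new CRC32();
            updateInt(crc, (int) (segmentId & 0xFFFFFFFFL));
            updateInt(crc, (int) (segmentId >>> 32));
            updateInt(crc, markerOffset);

            int end = marker.getInt();
            long fileCrc = marker.getInt() & 0xffffffffL;

            if (crc.getValue() != fileCrc)
            {
                if (end != 0 || fileCrc != 0)
                    throw new IOException("invalid CRC at " + markerOffset);
                return -1;    // zeroed marker: the segment ended cleanly here
            }
            if (end < markerOffset || end > fileLength)
                throw new IOException("valid CRC but bad end position at " + markerOffset);
            return end;
        }
    }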

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 684fc2c..e44dfdf 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -44,9 +44,9 @@ public class CompressedSegment extends FileDirectSegment
     /**
      * Constructs a new segment file.
      */
-    CompressedSegment(CommitLog commitLog, Runnable onClose)
+    CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
     {
-        super(commitLog, onClose);
+        super(commitLog, manager, onClose);
         this.compressor = commitLog.configuration.getCompressor();
     }
 
@@ -57,7 +57,7 @@ public class CompressedSegment extends FileDirectSegment
 
     ByteBuffer createBuffer(CommitLog commitLog)
     {
-        return createBuffer(commitLog.configuration.getCompressor().preferredBufferType());
+        return manager.getBufferPool().createBuffer(commitLog.configuration.getCompressor().preferredBufferType());
     }
 
     @Override
@@ -71,13 +71,13 @@ public class CompressedSegment extends FileDirectSegment
         try
         {
             int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
-            ByteBuffer compressedBuffer = reusableBufferHolder.get();
+            ByteBuffer compressedBuffer = manager.getBufferPool().getThreadLocalReusableBuffer();
             if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) ||
                 compressedBuffer.capacity() < neededBufferSize)
             {
                 FileUtils.clean(compressedBuffer);
                 compressedBuffer = allocate(neededBufferSize);
-                reusableBufferHolder.set(compressedBuffer);
+                manager.getBufferPool().setThreadLocalReusableBuffer(compressedBuffer);
             }
 
             ByteBuffer inputBuffer = buffer.duplicate();
@@ -91,7 +91,7 @@ public class CompressedSegment extends FileDirectSegment
             // Only one thread can be here at a given time.
             // Protected by synchronization on CommitLogSegment.sync().
             writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
-            commitLog.allocator.addSize(compressedBuffer.limit());
+            manager.addSize(compressedBuffer.limit());
             channel.write(compressedBuffer);
             assert channel.position() - lastWrittenPos == compressedBuffer.limit();
             lastWrittenPos = channel.position();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index c34a365..e13b20a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -65,9 +65,9 @@ public class EncryptedSegment extends FileDirectSegment
     private final EncryptionContext encryptionContext;
     private final Cipher cipher;
 
-    public EncryptedSegment(CommitLog commitLog, Runnable onClose)
+    public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
     {
-        super(commitLog, onClose);
+        super(commitLog, manager, onClose);
         this.encryptionContext = commitLog.configuration.getEncryptionContext();
 
         try
@@ -90,9 +90,9 @@ public class EncryptedSegment extends FileDirectSegment
 
     ByteBuffer createBuffer(CommitLog commitLog)
     {
-        //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
+        // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
         // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
-        return createBuffer(BufferType.ON_HEAP);
+        return manager.getBufferPool().createBuffer(BufferType.ON_HEAP);
     }
 
     void write(int startMarker, int nextMarker)
@@ -108,7 +108,7 @@ public class EncryptedSegment extends FileDirectSegment
         {
             ByteBuffer inputBuffer = buffer.duplicate();
             inputBuffer.limit(contentStart + length).position(contentStart);
-            ByteBuffer buffer = reusableBufferHolder.get();
+            ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer();
 
             // save space for the sync marker at the beginning of this section
             final long syncMarkerPosition = lastWrittenPos;
@@ -127,7 +127,7 @@ public class EncryptedSegment extends FileDirectSegment
                 buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
 
                 contentStart += nextBlockSize;
-                commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
+                manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
             }
 
             lastWrittenPos = channel.position();
@@ -138,15 +138,15 @@ public class EncryptedSegment extends FileDirectSegment
             writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
             buffer.putInt(SYNC_MARKER_SIZE, length);
             buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
-            commitLog.allocator.addSize(buffer.limit());
+            manager.addSize(buffer.limit());
 
             channel.position(syncMarkerPosition);
             channel.write(buffer);
 
             SyncUtil.force(channel, true);
 
-            if (reusableBufferHolder.get().capacity() < buffer.capacity())
-                reusableBufferHolder.set(buffer);
+            if (manager.getBufferPool().getThreadLocalReusableBuffer().capacity() < buffer.capacity())
+                manager.getBufferPool().setThreadLocalReusableBuffer(buffer);
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index 50f9efd..d4160e4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -19,15 +19,8 @@ package org.apache.cassandra.db.commitlog;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import io.netty.util.concurrent.FastThreadLocal;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.util.FileUtils;
 
 /**
  * Writes to the backing commit log file only on sync, allowing transformations of the mutations,
@@ -35,45 +28,23 @@ import org.apache.cassandra.io.util.FileUtils;
  */
 public abstract class FileDirectSegment extends CommitLogSegment
 {
-    protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>()
-    {
-        protected ByteBuffer initialValue()
-        {
-            return ByteBuffer.allocate(0);
-        }
-    };
-
-    static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
-
-    /**
-     * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
-     * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
-     * more, depending on how soon the sync policy stops all writing threads.
-     */
-    static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
-
-    /**
-     * The number of buffers in use
-     */
-    private static AtomicInteger usedBuffers = new AtomicInteger(0);
-
     volatile long lastWrittenPos = 0;
-
     private final Runnable onClose;
 
-    FileDirectSegment(CommitLog commitLog, Runnable onClose)
+    FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
     {
-        super(commitLog);
+        super(commitLog, manager);
         this.onClose = onClose;
     }
 
+    @Override
     void writeLogHeader()
     {
         super.writeLogHeader();
         try
         {
             channel.write((ByteBuffer) buffer.duplicate().flip());
-            commitLog.allocator.addSize(lastWrittenPos = buffer.position());
+            manager.addSize(lastWrittenPos = buffer.position());
         }
         catch (IOException e)
         {
@@ -81,30 +52,12 @@ public abstract class FileDirectSegment extends CommitLogSegment
         }
     }
 
-    ByteBuffer createBuffer(BufferType bufferType)
-    {
-        usedBuffers.incrementAndGet();
-        ByteBuffer buf = bufferPool.poll();
-        if (buf != null)
-        {
-            buf.clear();
-            return buf;
-        }
-
-        return bufferType.allocate(DatabaseDescriptor.getCommitLogSegmentSize());
-    }
-
     @Override
     protected void internalClose()
     {
-        usedBuffers.decrementAndGet();
-
         try
         {
-            if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
-                bufferPool.add(buffer);
-            else
-                FileUtils.clean(buffer);
+            manager.getBufferPool().releaseBuffer(buffer);
             super.internalClose();
         }
         finally
@@ -112,20 +65,4 @@ public abstract class FileDirectSegment extends CommitLogSegment
             onClose.run();
         }
     }
-
-    static void shutdown()
-    {
-        bufferPool.clear();
-    }
-
-    /**
-     * Checks if the number of buffers in use is greater or equals to the maximum number of buffers allowed in the pool.
-     *
-     * @return <code>true</code> if the number of buffers in use is greater or equals to the maximum number of buffers
-     * allowed in the pool, <code>false</code> otherwise.
-     */
-    static boolean hasReachedPoolLimit()
-    {
-        return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE;
-    }
 }
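
The static buffer pool removed above now lives behind manager.getBufferPool(); the pool class
itself is not part of this excerpt. A minimal sketch of the four operations the segments rely
on, reconstructed from the removed logic (the class name and constructor are illustrative, and
a plain ThreadLocal stands in for the original FastThreadLocal):

    import java.nio.ByteBuffer;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.apache.cassandra.io.compress.BufferType;
    import org.apache.cassandra.io.util.FileUtils;

    class BufferPoolSketch
    {
        private final Queue<ByteBuffer> pool = new ConcurrentLinkedQueue<>();
        private final ThreadLocal<ByteBuffer> reusable = ThreadLocal.withInitial(() -> ByteBuffer.allocate(0));
        private final int maxPoolSize;   // cf. the removed MAX_BUFFERPOOL_SIZE
        private final int bufferSize;    // cf. DatabaseDescriptor.getCommitLogSegmentSize()

        BufferPoolSketch(int maxPoolSize, int bufferSize)
        {
            this.maxPoolSize = maxPoolSize;
            this.bufferSize = bufferSize;
        }

        // Reuse a pooled buffer when available, otherwise allocate a fresh one.
        ByteBuffer createBuffer(BufferType type)
        {
            ByteBuffer buf = pool.poll();
            if (buf != null)
            {
                buf.clear();
                return buf;
            }
            return type.allocate(bufferSize);
        }

        // Return a buffer to the pool, or free it once the pool is full.
        void releaseBuffer(ByteBuffer buffer)
        {
            if (pool.size() < maxPoolSize)
                pool.add(buffer);
            else
                FileUtils.clean(buffer);
        }

        ByteBuffer getThreadLocalReusableBuffer()
        {
            return reusable.get();
        }

        void setThreadLocalReusableBuffer(ByteBuffer buffer)
        {
            reusable.set(buffer);
        }
    }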

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3fdf886..2bbd12d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -41,9 +41,9 @@ public class MemoryMappedSegment extends CommitLogSegment
      *
      * @param commitLog the commit log it will be used with.
      */
-    MemoryMappedSegment(CommitLog commitLog)
+    MemoryMappedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
     {
-        super(commitLog);
+        super(commitLog, manager);
         // mark the initial sync marker as uninitialised
         int firstSync = buffer.position();
         buffer.putInt(firstSync + 0, 0);
@@ -66,7 +66,7 @@ public class MemoryMappedSegment extends CommitLogSegment
             {
                 throw new FSWriteError(e, logFile);
             }
-            commitLog.allocator.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
+            manager.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
 
             return channel.map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
deleted file mode 100644
index 0b21763..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import com.google.common.collect.Ordering;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-
-public class ReplayPosition implements Comparable<ReplayPosition>
-{
-    public static final ReplayPositionSerializer serializer = new ReplayPositionSerializer();
-
-    // NONE is used for SSTables that are streamed from other nodes and thus have no relationship
-    // with our local commitlog. The values satisfy the criteria that
-    //  - no real commitlog segment will have the given id
-    //  - it will sort before any real replayposition, so it will be effectively ignored by getReplayPosition
-    public static final ReplayPosition NONE = new ReplayPosition(-1, 0);
-
-    public final long segment;
-    public final int position;
-
-    /**
-     * A filter of known safe-to-discard commit log replay positions, based on
-     * the range covered by on disk sstables and those prior to the most recent truncation record
-     */
-    public static class ReplayFilter
-    {
-        final NavigableMap<ReplayPosition, ReplayPosition> persisted = new TreeMap<>();
-        public ReplayFilter(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt)
-        {
-            for (SSTableReader reader : onDisk)
-            {
-                ReplayPosition start = reader.getSSTableMetadata().commitLogLowerBound;
-                ReplayPosition end = reader.getSSTableMetadata().commitLogUpperBound;
-                add(persisted, start, end);
-            }
-            if (truncatedAt != null)
-                add(persisted, ReplayPosition.NONE, truncatedAt);
-        }
-
-        private static void add(NavigableMap<ReplayPosition, ReplayPosition> ranges, ReplayPosition start, ReplayPosition end)
-        {
-            // extend ourselves to cover any ranges we overlap
-            // record directly preceding our end may extend past us, so take the max of our end and its
-            Map.Entry<ReplayPosition, ReplayPosition> extend = ranges.floorEntry(end);
-            if (extend != null && extend.getValue().compareTo(end) > 0)
-                end = extend.getValue();
-
-            // record directly preceding our start may extend into us; if it does, we take it as our start
-            extend = ranges.lowerEntry(start);
-            if (extend != null && extend.getValue().compareTo(start) >= 0)
-                start = extend.getKey();
-
-            ranges.subMap(start, end).clear();
-            ranges.put(start, end);
-        }
-
-        public boolean shouldReplay(ReplayPosition position)
-        {
-            // replay ranges are start exclusive, end inclusive
-            Map.Entry<ReplayPosition, ReplayPosition> range = persisted.lowerEntry(position);
-            return range == null || position.compareTo(range.getValue()) > 0;
-        }
-
-        public boolean isEmpty()
-        {
-            return persisted.isEmpty();
-        }
-    }
-
-    public static ReplayPosition firstNotCovered(Iterable<ReplayFilter> ranges)
-    {
-        ReplayPosition min = null;
-        for (ReplayFilter map : ranges)
-        {
-            ReplayPosition first = map.persisted.firstEntry().getValue();
-            if (min == null)
-                min = first;
-            else
-                min = Ordering.natural().min(min, first);
-        }
-        if (min == null)
-            return NONE;
-        return min;
-    }
-
-    public ReplayPosition(long segment, int position)
-    {
-        this.segment = segment;
-        assert position >= 0;
-        this.position = position;
-    }
-
-    public int compareTo(ReplayPosition that)
-    {
-        if (this.segment != that.segment)
-            return Long.compare(this.segment, that.segment);
-
-        return Integer.compare(this.position, that.position);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        ReplayPosition that = (ReplayPosition) o;
-
-        if (position != that.position) return false;
-        return segment == that.segment;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = (int) (segment ^ (segment >>> 32));
-        result = 31 * result + position;
-        return result;
-    }
-
-    @Override
-    public String toString()
-    {
-        return "ReplayPosition(" +
-               "segmentId=" + segment +
-               ", position=" + position +
-               ')';
-    }
-
-    public ReplayPosition clone()
-    {
-        return new ReplayPosition(segment, position);
-    }
-
-    public static class ReplayPositionSerializer implements ISerializer<ReplayPosition>
-    {
-        public void serialize(ReplayPosition rp, DataOutputPlus out) throws IOException
-        {
-            out.writeLong(rp.segment);
-            out.writeInt(rp.position);
-        }
-
-        public ReplayPosition deserialize(DataInputPlus in) throws IOException
-        {
-            return new ReplayPosition(in.readLong(), in.readInt());
-        }
-
-        public long serializedSize(ReplayPosition rp)
-        {
-            return TypeSizes.sizeof(rp.segment) + TypeSizes.sizeof(rp.position);
-        }
-    }
-}