Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/02 20:09:24 UTC

[1/2] Multithreaded commitlog patch by Benedict Elliott Smith; reviewed by jbellis for CASSANDRA-3578

Updated Branches:
  refs/heads/trunk 679ec7e88 -> 22e18f5a3


http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 2c7a848..bc5c7d1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -17,19 +17,28 @@
  */
 package org.apache.cassandra.db.commitlog;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.Checksum;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,11 +48,9 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.ByteBufferOutputStream;
-import org.apache.cassandra.io.util.ChecksummedOutputStream;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.PureJavaCrc32;
+import org.apache.cassandra.utils.WaitQueue;
 
 /*
  * A single commit log file on disk. Manages creation of the file and writing row mutations to disk,
@@ -60,35 +67,54 @@ public class CommitLogSegment
     // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
     static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
 
-    // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we can delete this segment
-    private final HashMap<UUID, Integer> cfLastWrite = new HashMap<UUID, Integer>();
+    // The commit log (chained) sync marker/header size in bytes (int: next marker offset + long: checksum over [segmentId, marker offset])
+    static final int SYNC_MARKER_SIZE = 4 + 8;
+
+    // The current AppendLock object - i.e. the one all threads adding new log records should use to synchronise
+    private final AtomicReference<AppendLock> appendLock = new AtomicReference<>(new AppendLock());
+
+    private final AtomicInteger allocatePosition = new AtomicInteger();
+
+    // Everything before this offset has been synced and written.  The SYNC_MARKER_SIZE bytes after
+    // each sync are reserved, and point forwards to the next such offset.  The final
+    // sync marker in a segment will be zeroed out, or point to EOF.
+    private volatile int lastSyncedOffset;
+
+    // the amount of the tail of the file we have allocated but not used - this is used when we discard a log segment
+    // to ensure nobody writes to it after we've decided we're done with it
+    private int discardedTailFrom;
+
+    // a signal for writers to wait on to confirm the log message they provided has been written to disk
+    private final WaitQueue syncComplete = new WaitQueue();
+
+    // a map of Cf->last written (dirty) position; together with cfClean below, this permits marking Cfs clean whilst the log is still in use
+    private final NonBlockingHashMap<UUID, AtomicInteger> cfDirty = new NonBlockingHashMap<>(1024);
+
+    // a map of Cf->clean position; this is used to permit marking Cfs clean whilst the log is still in use
+    private final ConcurrentHashMap<UUID, AtomicInteger> cfClean = new ConcurrentHashMap<>();
 
     public final long id;
 
     private final File logFile;
     private final RandomAccessFile logFileAccessor;
 
-    private boolean needsSync = false;
-
     private final MappedByteBuffer buffer;
-    private final Checksum checksum;
-    private final DataOutputStream bufferStream;
-    private boolean closed;
 
     public final CommitLogDescriptor descriptor;
 
     /**
      * @return a newly minted segment file
      */
-    public static CommitLogSegment freshSegment()
+    static CommitLogSegment freshSegment()
     {
         return new CommitLogSegment(null);
     }
 
-    public static long getNextId()
+    static long getNextId()
     {
         return idBase + nextId.getAndIncrement();
     }
+
     /**
      * Constructs a new segment file.
      *
@@ -122,16 +148,16 @@ public class CommitLogSegment
             if (isCreating)
                 logger.debug("Creating new commit log segment {}", logFile.getPath());
 
-            // Map the segment, extending or truncating it to the standard segment size
+            // Map the segment, extending or truncating it to the standard segment size.
+            // (We may have restarted after a segment size configuration change, leaving "incorrectly"
+            // sized segments on disk.)
             logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize());
 
             buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
-            checksum = new PureJavaCrc32();
-            bufferStream = new DataOutputStream(new ChecksummedOutputStream(new ByteBufferOutputStream(buffer), checksum));
-            buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
-            buffer.position(0);
-
-            needsSync = true;
+            // mark the initial header as uninitialised
+            buffer.putInt(0, 0);
+            buffer.putLong(4, 0);
+            allocatePosition.set(SYNC_MARKER_SIZE);
         }
         catch (IOException e)
         {
@@ -140,123 +166,184 @@ public class CommitLogSegment
     }
 
     /**
-     * Completely discards a segment file by deleting it. (Potentially blocking operation)
-     */
-    public void discard(boolean deleteFile)
-    {
-        // TODO shouldn't we close the file when we're done writing to it, which comes (potentially) much earlier than it's eligible for recyling?
-        close();
-        if (deleteFile)
-            FileUtils.deleteWithConfirm(logFile);
-    }
-
-    /**
-     * Recycle processes an unneeded segment file for reuse.
-     *
-     * @return a new CommitLogSegment representing the newly reusable segment.
+     * Allocate space in this buffer for the provided row mutation, and populate the provided
+     * Allocation object, returning true on success. False indicates there is not enough room in
+     * this segment, and a new segment is needed.
      */
-    public CommitLogSegment recycle()
+    boolean allocate(RowMutation rowMutation, int size, Allocation alloc)
     {
-        // writes an end-of-segment marker at the very beginning of the file and closes it
-        buffer.position(0);
-        buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
-        buffer.position(0);
-
+        final AppendLock appendLock = lockForAppend();
         try
         {
-            sync();
+            int position = allocate(size);
+            if (position < 0)
+            {
+                appendLock.unlock();
+                return false;
+            }
+            alloc.buffer = (ByteBuffer) buffer.duplicate().position(position).limit(position + size);
+            alloc.position = position;
+            alloc.segment = this;
+            alloc.appendLock = appendLock;
+            markDirty(rowMutation, position);
+            return true;
         }
-        catch (FSWriteError e)
+        catch (Throwable t)
         {
-            logger.error("I/O error flushing {} {}", this, e.getMessage());
-            throw e;
+            appendLock.unlock();
+            throw t;
         }
+    }
 
-        close();
-
-        return new CommitLogSegment(getPath());
+    // obtain the current AppendLock and lock it for record appending
+    private AppendLock lockForAppend()
+    {
+        while (true)
+        {
+            AppendLock appendLock = this.appendLock.get();
+            if (appendLock.lock())
+                return appendLock;
+        }
     }
 
-    /**
-     * @return true if there is room to write() @param size to this segment
-     */
-    public boolean hasCapacityFor(long size)
+    // allocate bytes in the segment, or return -1 if not enough space
+    private int allocate(int size)
     {
-        return size <= buffer.remaining();
+        while (true)
+        {
+            int prev = allocatePosition.get();
+            int next = prev + size;
+            if (next >= buffer.capacity())
+                return -1;
+            if (allocatePosition.compareAndSet(prev, next))
+                return prev;
+        }
     }
 
-    /**
-     * mark all of the column families we're modifying as dirty at this position
-     */
-    private void markDirty(RowMutation rowMutation, ReplayPosition repPos)
+    // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
+    synchronized void discardUnusedTail()
     {
-        for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+        if (discardedTailFrom > 0)
+            return;
+        while (true)
         {
-            // check for null cfm in case a cl write goes through after the cf is
-            // defined but before a new segment is created.
-            CFMetaData cfm = Schema.instance.getCFMetaData(columnFamily.id());
-            if (cfm == null)
-            {
-                logger.error("Attempted to write commit log entry for unrecognized column family: {}", columnFamily.id());
-            }
-            else
+            int prev = allocatePosition.get();
+            int next = buffer.capacity();
+            if (allocatePosition.compareAndSet(prev, next))
             {
-                markCFDirty(cfm.cfId, repPos.position);
+                discardedTailFrom = prev;
+                return;
             }
         }
     }
 
-   /**
-     * Appends a row mutation onto the commit log.  Requires that hasCapacityFor has already been checked.
-     *
-     * @param   mutation   the mutation to append to the commit log.
-     * @return  the position of the appended mutation
+    /**
+     * Forces a disk flush for this segment file.
      */
-    public ReplayPosition write(RowMutation mutation) throws IOException
+    synchronized void sync()
     {
-        assert !closed;
-        ReplayPosition repPos = getContext();
-        markDirty(mutation, repPos);
+        try
+        {
+            // check we have more work to do
+            if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+                return;
+
+            // allocate a new sync marker; this is necessary in itself, but it also serves to demarcate
+            // the point up to which we can safely consider records to have been completely written
+            int nextMarker = allocate(SYNC_MARKER_SIZE);
+            boolean close = false;
+            if (nextMarker < 0)
+            {
+                // ensure no more of this CLS is writeable, and mark ourselves for closing
+                discardUnusedTail();
+                close = true;
 
-        checksum.reset();
+                if (discardedTailFrom < buffer.capacity() - SYNC_MARKER_SIZE)
+                {
+                    // if there's room in the discard section to write an empty header, use that as the nextMarker
+                    nextMarker = discardedTailFrom;
+                }
+                else
+                {
+                    // not enough space left in the buffer, so mark the next sync marker as the EOF position
+                    nextMarker = buffer.capacity();
+                }
+            }
 
-        // checksummed length
-        int length = (int) RowMutation.serializer.serializedSize(mutation, MessagingService.current_version);
-        bufferStream.writeInt(length);
-        buffer.putLong(checksum.getValue());
+            // swap the append lock
+            AppendLock curAppendLock = appendLock.get();
+            appendLock.set(new AppendLock());
+            curAppendLock.expireAndWaitForCompletion();
+
+            // write previous sync marker to point to next sync marker
+            // we don't chain the crcs here to ensure this method is idempotent if it fails
+            int offset = lastSyncedOffset;
+            final PureJavaCrc32 crc = new PureJavaCrc32();
+            crc.update((int) (id & 0xFFFFFFFFL));
+            crc.update((int) (id >>> 32));
+            crc.update(offset);
+            buffer.putInt(offset, nextMarker);
+            buffer.putLong(offset + 4, crc.getValue());
+
+            // zero out the next sync marker so the replayer can cleanly exit
+            if (nextMarker < buffer.capacity())
+            {
+                buffer.putInt(nextMarker, 0);
+                buffer.putLong(nextMarker + 4, 0);
+            }
 
-        // checksummed mutation
-        RowMutation.serializer.serialize(mutation, bufferStream, MessagingService.current_version);
-        buffer.putLong(checksum.getValue());
+            // actually perform the sync and signal those waiting for it
+            buffer.force();
+            syncComplete.signalAll();
 
-        if (buffer.remaining() >= 4)
+            if (close)
+            {
+                close();
+                nextMarker = buffer.capacity();
+            }
+
+            lastSyncedOffset = nextMarker;
+        }
+        catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
         {
-            // writes end of segment marker and rewinds back to position where it starts
-            buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
-            buffer.position(buffer.position() - CommitLog.END_OF_SEGMENT_MARKER_SIZE);
+            throw new FSWriteError(e, getPath());
         }
+    }
 
-        needsSync = true;
-        return repPos;
+    public boolean isFullySynced()
+    {
+        return lastSyncedOffset == buffer.capacity();
     }
 
     /**
-     * Forces a disk flush for this segment file.
+     * Completely discards a segment file by deleting it. (Potentially blocking operation)
      */
-    public void sync()
+    void delete()
     {
-        if (needsSync)
+        FileUtils.deleteWithConfirm(logFile);
+    }
+
+    /**
+     * Recycle processes an unneeded segment file for reuse.
+     *
+     * @return a new CommitLogSegment representing the newly reusable segment.
+     */
+    CommitLogSegment recycle()
+    {
+        try
         {
-            try
-            {
-                buffer.force();
-            }
-            catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
-            {
-                throw new FSWriteError(e, getPath());
-            }
-            needsSync = false;
+            sync();
+        }
+        catch (FSWriteError e)
+        {
+            logger.error("I/O error flushing {} {}", this, e.getMessage());
+            throw e;
         }
+
+        close();
+
+        return new CommitLogSegment(getPath());
     }
 
     /**
@@ -264,7 +351,7 @@ public class CommitLogSegment
      */
     public ReplayPosition getContext()
     {
-        return new ReplayPosition(id, buffer.position());
+        return new ReplayPosition(id, allocatePosition.get());
     }
 
     /**
@@ -286,16 +373,12 @@ public class CommitLogSegment
     /**
      * Close the segment file.
      */
-    public void close()
+    void close()
     {
-        if (closed)
-            return;
-
         try
         {
             FileUtils.clean(buffer);
             logFileAccessor.close();
-            closed = true;
         }
         catch (IOException e)
         {
@@ -303,15 +386,17 @@ public class CommitLogSegment
         }
     }
 
-    /**
-     * Records the CF as dirty at a certain position.
-     *
-     * @param cfId      the column family ID that is now dirty
-     * @param position  the position the last write for this CF was written at
-     */
-    private void markCFDirty(UUID cfId, Integer position)
+    void markDirty(RowMutation rowMutation, int allocatedPosition)
     {
-        cfLastWrite.put(cfId, position);
+        for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+        {
+            // check for deleted CFs
+            CFMetaData cfm = columnFamily.metadata();
+            if (cfm.isPurged())
+                logger.error("Attempted to write commit log entry for unrecognized column family: {}", columnFamily.id());
+            else
+                ensureAtleast(cfDirty, cfm.cfId, allocatedPosition);
+        }
     }
 
     /**
@@ -324,20 +409,79 @@ public class CommitLogSegment
      */
     public void markClean(UUID cfId, ReplayPosition context)
     {
-        Integer lastWritten = cfLastWrite.get(cfId);
+        if (!cfDirty.containsKey(cfId))
+            return;
+        if (context.segment == id)
+            markClean(cfId, context.position);
+        else if (context.segment > id)
+            markClean(cfId, Integer.MAX_VALUE);
+    }
+
+    private void markClean(UUID cfId, int position)
+    {
+        ensureAtleast(cfClean, cfId, position);
+        removeCleanFromDirty();
+    }
+
+    private static void ensureAtleast(ConcurrentMap<UUID, AtomicInteger> map, UUID cfId, int value)
+    {
+        AtomicInteger i = map.get(cfId);
+        if (i == null)
+        {
+            AtomicInteger i2 = map.putIfAbsent(cfId, i = new AtomicInteger());
+            if (i2 != null)
+                i = i2;
+        }
+        while (true)
+        {
+            int cur = i.get();
+            if (cur > value)
+                break;
+            if (i.compareAndSet(cur, value))
+                break;
+        }
+    }
+
+    private void removeCleanFromDirty()
+    {
+        // if we're still allocating from this segment, don't touch anything since it can't be done thread-safely
+        if (!isFullySynced())
+            return;
 
-        if (lastWritten != null && (!contains(context) || lastWritten < context.position))
+        Iterator<Map.Entry<UUID, AtomicInteger>> iter = cfClean.entrySet().iterator();
+        while (iter.hasNext())
         {
-            cfLastWrite.remove(cfId);
+            Map.Entry<UUID, AtomicInteger> clean = iter.next();
+            UUID cfId = clean.getKey();
+            AtomicInteger cleanPos = clean.getValue();
+            AtomicInteger dirtyPos = cfDirty.get(cfId);
+            if (dirtyPos != null && dirtyPos.intValue() < cleanPos.intValue())
+            {
+                cfDirty.remove(cfId);
+                iter.remove();
+            }
         }
     }
 
+
     /**
      * @return a collection of dirty CFIDs for this segment file.
      */
     public Collection<UUID> getDirtyCFIDs()
     {
-        return cfLastWrite.keySet();
+        removeCleanFromDirty();
+        if (cfClean.isEmpty() || cfDirty.isEmpty())
+            return cfDirty.keySet();
+        List<UUID> r = new ArrayList<>(cfDirty.size());
+        for (Map.Entry<UUID, AtomicInteger> dirty : cfDirty.entrySet())
+        {
+            UUID cfId = dirty.getKey();
+            AtomicInteger dirtyPos = dirty.getValue();
+            AtomicInteger cleanPos = cfClean.get(cfId);
+            if (cleanPos == null || cleanPos.intValue() < dirtyPos.intValue())
+                r.add(dirty.getKey());
+        }
+        return r;
     }
 
     /**
@@ -345,7 +489,12 @@ public class CommitLogSegment
      */
     public boolean isUnused()
     {
-        return cfLastWrite.isEmpty();
+        // if it's not fully synced, we assume it is still in use as the active allocatingFrom segment
+        if (!isFullySynced())
+            return false;
+
+        removeCleanFromDirty();
+        return cfDirty.isEmpty();
     }
 
     /**
@@ -363,7 +512,7 @@ public class CommitLogSegment
     public String dirtyString()
     {
         StringBuilder sb = new StringBuilder();
-        for (UUID cfId : cfLastWrite.keySet())
+        for (UUID cfId : getDirtyCFIDs())
         {
             CFMetaData m = Schema.instance.getCFMetaData(cfId);
             sb.append(m == null ? "<deleted>" : m.cfName).append(" (").append(cfId).append("), ");
@@ -377,9 +526,9 @@ public class CommitLogSegment
         return "CommitLogSegment(" + getPath() + ')';
     }
 
-    public int position()
+    public boolean equals(Object that)
     {
-        return buffer.position();
+        return super.equals(that);
     }
 
 
@@ -392,4 +541,92 @@ public class CommitLogSegment
             return (int) (desc.id - desc2.id);
         }
     }
+
+    /**
+     * A relatively simple class for synchronising sync() calls with log record writers:
+     * writers take the read lock prior to allocating themselves space in the segment,
+     * and release it once they have finished writing the record. A call to sync() first
+     * notes the position up to which space has been allocated, then swaps in a new AppendLock
+     * object, takes the write lock of the previous AppendLock, and marks it expired so it
+     * accepts no further log writes. New appends are redirected to the new AppendLock, so they
+     * never block; only sync() blocks, waiting to obtain the write lock. Once it holds the lock,
+     * all writes up to the allocation position it noted at the start are guaranteed to have completed.
+     */
+    private static final class AppendLock
+    {
+        final ReadWriteLock syncLock = new ReentrantReadWriteLock();
+        final Lock logLock = syncLock.readLock();
+
+        boolean expired;
+
+        // returns false if the lock could not be acquired for adding a log record;
+        // a new AppendLock object will already be available, so fetch appendLock.get()
+        // and retry
+        boolean lock()
+        {
+            if (!logLock.tryLock())
+                return false;
+            if (expired)
+            {
+                logLock.unlock();
+                return false;
+            }
+            return true;
+        }
+
+        // release the read lock so that a pending sync() may proceed
+        void unlock()
+        {
+            logLock.unlock();
+        }
+
+        void expireAndWaitForCompletion()
+        {
+            // wait for log records to complete (take writeLock)
+            syncLock.writeLock().lock();
+            expired = true;
+            // release lock immediately, though effectively a NOOP since we use tryLock() for log record appends
+            syncLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * A simple class for tracking information about the portion of a segment that has been allocated to a log write.
+     * The constructor leaves the fields uninitialized for population by CommitLogSegmentManager, so that it can be
+     * stack-allocated by escape analysis in CommitLog.add.
+     */
+    static final class Allocation
+    {
+        private CommitLogSegment segment;
+        private AppendLock appendLock;
+        private int position;
+        private ByteBuffer buffer;
+
+        CommitLogSegment getSegment()
+        {
+            return segment;
+        }
+
+        ByteBuffer getBuffer()
+        {
+            return buffer;
+        }
+
+        // markWritten() MUST be called once we have finished writing to the allocated buffer, or the CL will never sync
+        void markWritten()
+        {
+            appendLock.unlock();
+        }
+
+        void awaitDiskSync()
+        {
+            while (segment.lastSyncedOffset < position)
+            {
+                WaitQueue.Signal signal = segment.syncComplete.register();
+                if (segment.lastSyncedOffset < position)
+                    signal.awaitUninterruptibly();
+            }
+        }
+    }
 }
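As an aside on the new on-disk format: the chained sync markers above can be walked by a reader to find the replayable extent of a segment. Below is a minimal sketch of that walk, assuming only the layout written by sync() (an int pointing to the next marker, followed by a long CRC computed exactly as sync() does). The class and method names are hypothetical; the real replay logic lives in CommitLogReplayer, which is not part of this excerpt.

    import java.nio.MappedByteBuffer;

    import org.apache.cassandra.utils.PureJavaCrc32;

    // Hypothetical sketch: walk the chained sync markers of a mapped segment.
    final class SyncMarkerWalker
    {
        // returns the offset up to which records were fully synced when the
        // segment was last written; everything past it should not be replayed
        static int replayableExtent(MappedByteBuffer buffer, long segmentId)
        {
            int offset = 0;
            while (offset < buffer.capacity())
            {
                int next = buffer.getInt(offset);
                long crc = buffer.getLong(offset + 4);

                // recompute the marker checksum with the same update calls as sync() above
                PureJavaCrc32 expected = new PureJavaCrc32();
                expected.update((int) (segmentId & 0xFFFFFFFFL));
                expected.update((int) (segmentId >>> 32));
                expected.update(offset);

                // a zeroed, backwards-pointing or corrupt marker terminates the chain
                if (next <= offset || crc != expected.getValue())
                    return offset;

                offset = next;
            }
            return offset;
        }
    }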

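The ensureAtleast helper used by markDirty/markClean above is a lock-free "monotonic max": it only ever raises the stored position, whichever thread wins the CAS. The same idiom in isolation, as a self-contained sketch with hypothetical names:

    import java.util.concurrent.atomic.AtomicInteger;

    final class MonotonicMax
    {
        // raise 'counter' to at least 'target', retrying on CAS failure;
        // concurrent callers can never lower the value
        static void advanceTo(AtomicInteger counter, int target)
        {
            while (true)
            {
                int cur = counter.get();
                if (cur >= target)
                    return;                      // already high enough
                if (counter.compareAndSet(cur, target))
                    return;                      // our update won the race
            }
        }
    }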
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
new file mode 100644
index 0000000..dd96f35
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WaitQueue;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+/**
+ * Performs eager-creation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public class CommitLogSegmentManager
+{
+    static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManager.class);
+
+    /**
+     * Queue of work to be done by the manager thread.  This is usually a recycle operation, which returns
+     * a CommitLogSegment, or a delete operation, which returns null.
+     */
+    private final BlockingQueue<Callable<CommitLogSegment>> segmentManagementTasks = new LinkedBlockingQueue<>();
+
+    /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
+    private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
+
+    /** Active segments, containing unflushed data */
+    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
+
+    /** The segment we are currently allocating commit log records to */
+    private volatile CommitLogSegment allocatingFrom = null;
+
+    private final WaitQueue hasAvailableSegments = new WaitQueue();
+
+    private static final AtomicReferenceFieldUpdater<CommitLogSegmentManager, CommitLogSegment> allocatingFromUpdater = AtomicReferenceFieldUpdater.newUpdater(CommitLogSegmentManager.class, CommitLogSegment.class, "allocatingFrom");
+
+    /**
+     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
+     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+     * on the manager thread, which will take a ms or two).
+     */
+    private final AtomicLong size = new AtomicLong();
+
+    /**
+     * New segment creation is initially disabled because we'll typically get some "free" segments
+     * recycled after log replay.
+     */
+    private volatile boolean createReserveSegments = false;
+
+    private final Thread managerThread;
+    private volatile boolean run = true;
+
+    public CommitLogSegmentManager()
+    {
+        // The run loop for the manager thread
+        Runnable runnable = new WrappedRunnable()
+        {
+            public void runMayThrow() throws Exception
+            {
+                while (run)
+                {
+                    Callable<CommitLogSegment> task = segmentManagementTasks.poll();
+                    if (task == null)
+                    {
+                        // if we have no more work to do, check if we should create a new segment
+                        if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                        {
+                            logger.debug("No segments in reserve; creating a fresh one");
+                            size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+                            // TODO : some error handling in case we fail to create a new segment
+                            availableSegments.add(CommitLogSegment.freshSegment());
+                            hasAvailableSegments.signalAll();
+                        }
+
+                        // flush old Cfs if we're full
+                        long unused = unusedCapacity();
+                        if (unused < 0)
+                        {
+                            List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+                            long spaceToReclaim = 0;
+                            for (CommitLogSegment segment : activeSegments)
+                            {
+                                if (segment == allocatingFrom)
+                                    break;
+                                segmentsToRecycle.add(segment);
+                                spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+                                if (spaceToReclaim + unused >= 0)
+                                    break;
+                            }
+                            flushDataFrom(segmentsToRecycle);
+                        }
+
+                        try
+                        {
+                            // wait for new work to be provided
+                            task = segmentManagementTasks.take();
+                        }
+                        catch (InterruptedException e)
+                        {
+                            // shutdown signal; exit cleanly
+                            continue;
+                        }
+                    }
+
+                    // TODO : some error handling in case we fail on executing call (e.g. recycling)
+                    CommitLogSegment recycled = task.call();
+                    if (recycled != null)
+                    {
+                        // if the work resulted in a segment to recycle, publish it
+                        availableSegments.add(recycled);
+                        hasAvailableSegments.signalAll();
+                    }
+                }
+            }
+        };
+
+        managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+        managerThread.start();
+    }
+
+    /**
+     * Reserve space in the current segment for the provided row mutation or, if there isn't space available,
+     * create a new segment.
+     *
+     * @return the provided Allocation object
+     */
+    public Allocation allocate(RowMutation rowMutation, int size, Allocation alloc)
+    {
+        CommitLogSegment segment = allocatingFrom();
+
+        while (!segment.allocate(rowMutation, size, alloc))
+        {
+            // failed to allocate, so move to a new segment with enough room
+            advanceAllocatingFrom(segment);
+            segment = allocatingFrom;
+        }
+
+        return alloc;
+    }
+
+    // simple wrapper to ensure non-null value for allocatingFrom; only necessary on first call
+    CommitLogSegment allocatingFrom()
+    {
+        CommitLogSegment r = allocatingFrom;
+        if (r == null)
+        {
+            advanceAllocatingFrom(null);
+            r = allocatingFrom;
+        }
+        return r;
+    }
+
+    /**
+     * Fetches a new segment from the queue, creating a new one if necessary, and activates it
+     */
+    private void advanceAllocatingFrom(CommitLogSegment old)
+    {
+        while (true)
+        {
+            Iterator<CommitLogSegment> iter = availableSegments.iterator();
+            if (iter.hasNext())
+            {
+                CommitLogSegment next;
+                if (!allocatingFromUpdater.compareAndSet(this, old, next = iter.next()))
+                    // failed to swap: another thread has already advanced allocatingFrom, so we can return
+                    return;
+
+                iter.remove();
+                activeSegments.add(next);
+
+                if (availableSegments.isEmpty())
+                {
+                    // if we've emptied the queue of available segments, trigger the manager to maybe add another
+                    wakeManager();
+                }
+
+                if (old != null)
+                {
+                    // Now we can run the user defined command just after switching to the new commit log.
+                    // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+                    CommitLog.instance.archiver.maybeArchive(old.getPath(), old.getName());
+                }
+
+                // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+                if (old != null)
+                    old.discardUnusedTail();
+
+                // request that the CL be synced out-of-band, as we've finished a segment
+                CommitLog.instance.requestExtraSync();
+                return;
+            }
+
+            // no more segments, so register to receive a signal when not empty
+            WaitQueue.Signal signal = hasAvailableSegments.register();
+
+            // trigger the management thread; this must occur after registering
+            // the signal to ensure we are woken by any new segment creation
+            wakeManager();
+
+            // check if the queue has already been added to before waiting on the signal, to catch modifications
+            // that happened prior to registering the signal
+            if (availableSegments.isEmpty())
+            {
+                // check to see if we've been beaten to it
+                if (allocatingFrom != old)
+                    return;
+
+                // can only reach here if the queue hasn't been inserted into
+                // before we registered the signal, as we only remove items from the queue
+                // after updating allocatingFrom. Can safely block until we are signalled
+                // by the allocator that new segments have been published
+                signal.awaitUninterruptibly();
+            }
+        }
+    }
+
+    private void wakeManager()
+    {
+        // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
+        segmentManagementTasks.add(new Callable<CommitLogSegment>()
+        {
+            public CommitLogSegment call()
+            {
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Switch to a new segment, regardless of how much is left in the current one.
+     *
+     * Flushes any dirty CFs for this segment and any older segments, and then recycles
+     * the segments
+     */
+    void forceRecycleAll()
+    {
+        CommitLogSegment last = allocatingFrom;
+        last.discardUnusedTail();
+        List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
+        advanceAllocatingFrom(last);
+
+        // flush and wait for all CFs that are dirty in segments up-to and including 'last'
+        Future<?> future = flushDataFrom(segmentsToRecycle);
+        try
+        {
+            future.get();
+
+            // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
+            // if the previous active segment was the only one to recycle (since an active segment isn't
+            // necessarily dirty, and we only call dCS after a flush).
+            for (CommitLogSegment segment : activeSegments)
+                if (segment.isUnused())
+                    recycleSegment(segment);
+
+            CommitLogSegment first;
+            assert (first = activeSegments.peek()) == null || first.id > last.id;
+        }
+        catch (Throwable t)
+        {
+            // for now just log the error; the segments can be recycled later
+            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
+        }
+    }
+
+    /**
+     * Indicates that a segment is no longer in use and that it should be recycled.
+     *
+     * @param segment segment that is no longer in use
+     */
+    void recycleSegment(final CommitLogSegment segment)
+    {
+        activeSegments.remove(segment);
+        if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
+        {
+            // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+            discardSegment(segment, false);
+            return;
+        }
+        if (isCapExceeded())
+        {
+            discardSegment(segment, true);
+            return;
+        }
+
+        logger.debug("Recycling {}", segment);
+        segmentManagementTasks.add(new Callable<CommitLogSegment>()
+        {
+            public CommitLogSegment call()
+            {
+                return segment.recycle();
+            }
+        });
+    }
+
+    /**
+     * Differs from the above because it can work on any file instead of just existing
+     * commit log segments managed by this manager.
+     *
+     * @param file segment file that is no longer in use.
+     */
+    void recycleSegment(final File file)
+    {
+        if (isCapExceeded()
+            || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
+        {
+            // (don't decrease managed size, since this was never a "live" segment)
+            logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
+            FileUtils.deleteWithConfirm(file);
+            return;
+        }
+
+        logger.debug("Recycling {}", file);
+        // this wasn't previously a live segment, so add it to the managed size when we make it live
+        size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+        segmentManagementTasks.add(new Callable<CommitLogSegment>()
+        {
+            public CommitLogSegment call()
+            {
+                return new CommitLogSegment(file.getPath());
+            }
+        });
+    }
+
+    /**
+     * Indicates that a segment file should be deleted.
+     *
+     * @param segment segment to be discarded
+     */
+    private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
+    {
+        logger.debug("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
+        size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
+
+        segmentManagementTasks.add(new Callable<CommitLogSegment>()
+        {
+            public CommitLogSegment call()
+            {
+                segment.close();
+                if (deleteFile)
+                    segment.delete();
+                return null;
+            }
+        });
+    }
+
+    /**
+     * @return the space (in bytes) used by all segment files.
+     */
+    public long bytesUsed()
+    {
+        return size.get();
+    }
+
+    /**
+     * @param name the filename to check
+     * @return true if file is managed by this manager.
+     */
+    public boolean manages(String name)
+    {
+        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
+            if (segment.getName().equals(name))
+                return true;
+        return false;
+    }
+
+    /**
+     * Check to see if the speculative current size exceeds the cap.
+     *
+     * @return true if cap is exceeded
+     */
+    private boolean isCapExceeded()
+    {
+        return unusedCapacity() < 0;
+    }
+
+    private long unusedCapacity()
+    {
+        long currentSize = size.get();
+        logger.debug("Total active commitlog segment space used is {}", currentSize);
+        return DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024 - currentSize;
+    }
+
+    /**
+     * Sets a flag that enables the behavior of keeping at least one spare segment
+     * available at all times.
+     */
+    public void enableReserveSegmentCreation()
+    {
+        createReserveSegments = true;
+        wakeManager();
+    }
+
+    /**
+     * Force a flush on all CFs that are still dirty in @param segments.
+     *
+     * @return a Future that will finish when all the flushes are complete.
+     */
+    private Future<?> flushDataFrom(Collection<CommitLogSegment> segments)
+    {
+        // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
+        final Map<UUID, Future<?>> flushes = new LinkedHashMap<>();
+
+        for (CommitLogSegment segment : segments)
+        {
+            for (UUID dirtyCFId : segment.getDirtyCFIDs())
+            {
+                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
+                if (pair == null)
+                {
+                    // even though we remove the schema entry before a final flush when dropping a CF,
+                    // it's still possible for a writer to race and finish its append after the flush.
+                    logger.debug("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
+                    segment.markClean(dirtyCFId, segment.getContext());
+                }
+                else if (!flushes.containsKey(dirtyCFId))
+                {
+                    String keyspace = pair.left;
+                    final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+                    // Push the flush out to another thread to avoid potential deadlock: Table.add
+                    // acquires switchlock, and could be blocking for the manager thread.  So if the manager
+                    // thread itself tries to acquire switchlock (via flush -> switchMemtable) we'd have a problem.
+                    Runnable runnable = new Runnable()
+                    {
+                        public void run()
+                        {
+                            cfs.forceFlush();
+                        }
+                    };
+                    flushes.put(dirtyCFId, StorageService.optionalTasks.submit(runnable));
+                }
+            }
+        }
+
+        // submit (rather than merely construct) the waiting task: a FutureTask that is never
+        // run() would leave callers blocking on get() forever
+        return StorageService.optionalTasks.submit(new Callable<Object>()
+        {
+            public Object call()
+            {
+                FBUtilities.waitOnFutures(flushes.values());
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Resets all the segments, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+     */
+    public void resetUnsafe()
+    {
+        logger.debug("Closing and clearing existing commit log segments...");
+
+        while (!segmentManagementTasks.isEmpty())
+            Thread.yield();
+
+        activeSegments.clear();
+        availableSegments.clear();
+        allocatingFrom = null;
+    }
+
+    /**
+     * Initiates the shutdown process for the management thread.
+     */
+    public void shutdown()
+    {
+        run = false;
+        managerThread.interrupt();
+    }
+
+    /**
+     * Returns when the management thread terminates.
+     */
+    public void awaitTermination() throws InterruptedException
+    {
+        managerThread.join();
+    }
+
+    /**
+     * @return a read-only collection of the active commit log segments
+     */
+    Collection<CommitLogSegment> getActiveSegments()
+    {
+        return Collections.unmodifiableCollection(activeSegments);
+    }
+}
+
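Putting the two new classes together, the write path a caller such as CommitLog.add would follow against this API looks roughly like the sketch below. This is an illustration only: CommitLog itself is not part of this excerpt, the method name is hypothetical, and serialization into the buffer is elided.

    // Sketch: allocate, write, release the append lock, then wait for durability.
    void appendSketch(CommitLogSegmentManager manager, RowMutation rm, int totalSize)
    {
        CommitLogSegment.Allocation alloc = new CommitLogSegment.Allocation();
        // retries across segments internally until space is found
        manager.allocate(rm, totalSize, alloc);
        try
        {
            java.nio.ByteBuffer buffer = alloc.getBuffer();
            // ... write length, serialized mutation and checksums into buffer ...
        }
        finally
        {
            // MUST be called, or sync() would block forever on this AppendLock
            alloc.markWritten();
        }
        // a batch-mode commit log would block here until the record hits disk
        alloc.awaitDiskSync();
    }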

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
deleted file mode 100644
index e2d0b0f..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-
-/**
- * Like ExecutorService, but customized for batch and periodic commitlog execution.
- */
-public interface ICommitLogExecutorService
-{
-    /**
-     * Get the number of completed tasks
-     */
-    public long getCompletedTasks();
-
-    /**
-     * Get the number of tasks waiting to be executed
-     */
-    public long getPendingTasks();
-
-
-    public <T> Future<T> submit(Callable<T> task);
-
-    /**
-     * submits the adder for execution and blocks for it to be synced, if necessary
-     */
-    public void add(CommitLog.LogRecordAdder adder);
-
-    /** shuts down the CommitLogExecutor in an orderly fashion */
-    public void shutdown();
-
-    /** Blocks until shutdown is complete. */
-    public void awaitTermination() throws InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
deleted file mode 100644
index 30f33b6..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-import java.util.concurrent.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-
-class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
-{
-    private final BlockingQueue<Runnable> queue;
-    protected volatile long completedTaskCount = 0;
-    private final Thread appendingThread;
-    private volatile boolean run = true;
-
-    public PeriodicCommitLogExecutorService(final CommitLog commitLog)
-    {
-        queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                while (run)
-                {
-                    Runnable r = queue.poll(100, TimeUnit.MILLISECONDS);
-                    if (r == null)
-                        continue;
-                    r.run();
-                    completedTaskCount++;
-                }
-                commitLog.sync();
-            }
-        };
-        appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
-        appendingThread.start();
-
-        final Callable syncer = new Callable()
-        {
-            public Object call() throws Exception
-            {
-                commitLog.sync();
-                return null;
-            }
-        };
-
-        new Thread(new Runnable()
-        {
-            public void run()
-            {
-                while (run)
-                {
-                    FBUtilities.waitOnFuture(submit(syncer));
-                    Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS);
-                }
-            }
-        }, "PERIODIC-COMMIT-LOG-SYNCER").start();
-
-    }
-
-    public void add(CommitLog.LogRecordAdder adder)
-    {
-        try
-        {
-            queue.put(adder);
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public <T> Future<T> submit(Callable<T> task)
-    {
-        FutureTask<T> ft = new FutureTask<T>(task);
-        try
-        {
-            queue.put(ft);
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return ft;
-    }
-
-    public void shutdown()
-    {
-        new Thread(new WrappedRunnable()
-        {
-            public void runMayThrow() throws InterruptedException, IOException
-            {
-                while (!queue.isEmpty())
-                    Thread.sleep(100);
-                run = false;
-                appendingThread.join();
-            }
-        }, "Commitlog Shutdown").start();
-    }
-
-    public void awaitTermination() throws InterruptedException
-    {
-        appendingThread.join();
-    }
-
-    public long getPendingTasks()
-    {
-        return queue.size();
-    }
-
-    public long getCompletedTasks()
-    {
-        return completedTaskCount;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
new file mode 100644
index 0000000..19d2770
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.WaitQueue;
+
+class PeriodicCommitLogService extends AbstractCommitLogService
+{
+
+    private static final int blockWhenSyncLagsMillis = (int) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5);
+
+    public PeriodicCommitLogService(final CommitLog commitLog)
+    {
+        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod());
+    }
+
+    protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
+    {
+        if (waitForSyncToCatchUp(Long.MAX_VALUE))
+        {
+            // wait until periodic sync() catches up with its schedule
+            long started = System.currentTimeMillis();
+            pending.incrementAndGet();
+            while (waitForSyncToCatchUp(started))
+            {
+                WaitQueue.Signal signal = syncComplete.register();
+                if (waitForSyncToCatchUp(started))
+                    signal.awaitUninterruptibly();
+            }
+            pending.decrementAndGet();
+        }
+    }
+
+    /**
+     * @return true if sync is currently lagging behind inserts
+     */
+    private boolean waitForSyncToCatchUp(long started)
+    {
+        return started > lastSyncedAt + blockWhenSyncLagsMillis;
+    }
+}
\ No newline at end of file
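The register-then-recheck shape of maybeWaitForSync above, and of Allocation.awaitDiskSync earlier, is the lost-wakeup guard this WaitQueue requires: a signal fired between a plain condition check and the wait would otherwise be missed. The idiom in isolation, assuming only WaitQueue.register() and Signal.awaitUninterruptibly() as used throughout this patch:

    // Sketch: wait until 'ready' becomes true without losing a wakeup.
    while (!ready())
    {
        WaitQueue.Signal signal = waitQueue.register();
        // re-check after registering: the condition may have flipped already
        if (!ready())
            signal.awaitUninterruptibly();
    }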

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 9a03480..8347cd9 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -365,6 +365,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return fileLength;
     }
 
+    public long getPosition()
+    {
+        return current;
+    }
+
     @Override
     public void write(int value)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
index c18b3a2..7c8ca61 100644
--- a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.metrics;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Gauge;
 
-import org.apache.cassandra.db.commitlog.CommitLogAllocator;
-import org.apache.cassandra.db.commitlog.ICommitLogExecutorService;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogService;
+import org.apache.cassandra.db.commitlog.CommitLogSegmentManager;
 
 /**
  * Metrics for commit log
@@ -37,20 +37,20 @@ public class CommitLogMetrics
     /** Current size used by all the commit log segments */
     public final Gauge<Long> totalCommitLogSize;
 
-    public CommitLogMetrics(final ICommitLogExecutorService executor, final CommitLogAllocator allocator)
+    public CommitLogMetrics(final AbstractCommitLogService service, final CommitLogSegmentManager allocator)
     {
         completedTasks = Metrics.newGauge(factory.createMetricName("CompletedTasks"), new Gauge<Long>()
         {
             public Long value()
             {
-                return executor.getCompletedTasks();
+                return service.getCompletedTasks();
             }
         });
         pendingTasks = Metrics.newGauge(factory.createMetricName("PendingTasks"), new Gauge<Long>()
         {
             public Long value()
             {
-                return executor.getPendingTasks();
+                return service.getPendingTasks();
             }
         });
         totalCommitLogSize = Metrics.newGauge(factory.createMetricName("TotalCommitLogSize"), new Gauge<Long>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 932829f..c16d906 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -29,7 +29,6 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
 
 import javax.management.MBeanServer;
 import javax.management.Notification;
@@ -40,7 +39,6 @@ import static java.nio.charset.StandardCharsets.ISO_8859_1;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
-import com.google.common.util.concurrent.AtomicDouble;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -3254,6 +3252,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         FBUtilities.waitOnFutures(flushes);
 
+        // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
+        // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
+        CommitLog.instance.forceRecycleAllSegments();
+
         ColumnFamilyStore.postFlushExecutor.shutdown();
         ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java b/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
new file mode 100644
index 0000000..69cdca2
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/AtomicLongArrayUpdater.java
@@ -0,0 +1,74 @@
+package org.apache.cassandra.utils;
+
+
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+public final class AtomicLongArrayUpdater {
+
+    private static final long offset;
+    private static final int shift;
+
+    static final Unsafe theUnsafe;
+
+    static {
+        theUnsafe = (Unsafe) AccessController.doPrivileged(
+                new PrivilegedAction<Object>()
+                {
+                    @Override
+                    public Object run()
+                    {
+                        try
+                        {
+                            Field f = Unsafe.class.getDeclaredField("theUnsafe");
+                            f.setAccessible(true);
+                            return f.get(null);
+                        } catch (NoSuchFieldException e)
+                        {
+                            // should never happen: theUnsafe is a well-known field
+                            throw new Error(e);
+                        } catch (IllegalAccessException e)
+                        {
+                            throw new Error(e);
+                        }
+                    }
+                });
+        Class<?> clazz = long[].class;
+        offset = theUnsafe.arrayBaseOffset(clazz);
+        shift = shift(theUnsafe.arrayIndexScale(clazz));
+    }
+
+    private static int shift(int scale)
+    {
+        if (Integer.bitCount(scale) != 1)
+            throw new IllegalStateException();
+        return Integer.bitCount(scale - 1);
+    }
+
+    public AtomicLongArrayUpdater() { }
+
+    public final boolean compareAndSet(Object trg, int i, long exp, long upd) {
+        return theUnsafe.compareAndSwapLong(trg, offset + (i << shift), exp, upd);
+    }
+
+    public final void putVolatile(Object trg, int i, long val) {
+        theUnsafe.putLongVolatile(trg, offset + (i << shift), val);
+    }
+
+    public final void putOrdered(Object trg, int i, long val) {
+        theUnsafe.putOrderedLong(trg, offset + (i << shift), val);
+    }
+
+    public final long get(Object trg, int i) {
+        return theUnsafe.getLong(trg, offset + (i << shift));
+    }
+
+    public final long getVolatile(Object trg, int i) {
+        return theUnsafe.getLongVolatile(trg, offset + (i << shift));
+    }
+
+}
\ No newline at end of file
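
A hedged usage sketch of AtomicLongArrayUpdater (standalone, not part of the patch): it performs atomic
operations directly on long[] elements, avoiding the indirection of AtomicLongArray. Note that
sun.misc.Unsafe is a JDK-internal API, so this only compiles and runs where that class is accessible.

    import org.apache.cassandra.utils.AtomicLongArrayUpdater;

    public class AtomicLongArrayUpdaterDemo
    {
        public static void main(String[] args)
        {
            long[] slots = new long[16];
            AtomicLongArrayUpdater updater = new AtomicLongArrayUpdater();
            updater.putOrdered(slots, 3, 42L);                       // store-store ordered write, cheaper than volatile
            boolean won = updater.compareAndSet(slots, 3, 42L, 43L); // CAS directly on slots[3]
            System.out.println(won + " " + updater.getVolatile(slots, 3)); // prints: true 43
        }
    }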

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index 7df94aa..66ff985 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,8 +19,6 @@ package org.apache.cassandra.utils;
 
 import java.lang.management.ManagementFactory;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import javax.management.JMX;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -36,9 +34,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.CacheService;
@@ -78,8 +74,6 @@ public class StatusLogger
         // one offs
         logger.info(String.format("%-25s%10s%10s",
                                   "CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks()));
-        logger.info(String.format("%-25s%10s%10s",
-                                  "Commitlog", "n/a", CommitLog.instance.getPendingTasks()));
         int pendingCommands = 0;
         for (int n : MessagingService.instance().getCommandPendingTasks().values())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/utils/WaitQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/WaitQueue.java b/src/java/org/apache/cassandra/utils/WaitQueue.java
new file mode 100644
index 0000000..01b2559
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/WaitQueue.java
@@ -0,0 +1,264 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * <p>A relatively easy to use utility for general purpose thread signalling.</p>
+ * <p>Usage on a thread awaiting a state change using a WaitQueue q is:</p>
+ * <pre>
+ * {@code
+ *      while (!conditionMet())
+ *      {
+ *          WaitQueue.Signal s = q.register();
+ *          if (!conditionMet())    // or, perhaps more correctly, !conditionChanged()
+ *              s.await();
+ *          else
+ *              s.cancel();
+ *      }
+ * }
+ * </pre>
+ * A signalling thread, AFTER changing the state, then calls q.signal() to wake up one, or q.signalAll()
+ * to wake up all, waiting threads.
+ *
+ * <p>A few notes on utilisation:</p>
+ * <p>1. A thread will only exit await() when it has been signalled, but this does
+ * not guarantee the condition has not been altered since it was signalled,
+ * and depending on your design it is likely the outer condition will need to be
+ * checked in a loop, though this is not always the case.</p>
+ * <p>2. Each signal is single use, so must be re-registered after each await(). This is true even if it times out.</p>
+ * <p>3. If you choose not to wait on the signal (because the condition has been met before you waited on it)
+ * you must cancel() the signal if the signalling thread uses signal() to awake waiters; otherwise signals will be
+ * lost</p>
+ * <p>4. Care must be taken when selecting conditionMet() to ensure we are waiting on the condition that actually
+ * indicates progress is possible. In some complex cases it may be tempting to wait on a condition that is only indicative
+ * of local progress, not progress on the task we are aiming to complete, and a race may leave us waiting for a condition
+ * to be met that we no longer need.</p>
+ * <p>5. This scheme is not fair</p>
+ * <p>6. Only the thread that calls register() may call await()</p>
+ * <p>To understand intuitively how this class works, the idea is simply that a thread, once it considers itself
+ * incapable of making progress, registers itself to be awoken once that condition changes. However, that condition
+ * could have changed between checking and registering (in which case a thread updating the state would have been unable to signal it),
+ * so before going to sleep on the signal, it checks the condition again, sleeping only if it hasn't changed.</p>
+ */
+// TODO : switch to a Lock Free queue
+public final class WaitQueue
+{
+    public final class Signal
+    {
+        private final Thread thread = Thread.currentThread();
+        volatile int signalled;
+
+        private boolean isSignalled()
+        {
+            return signalled == 1;
+        }
+
+        public boolean isCancelled()
+        {
+            return signalled == -1;
+        }
+
+        private boolean signal()
+        {
+            if (signalledUpdater.compareAndSet(this, 0, 1))
+            {
+                LockSupport.unpark(thread);
+                return true;
+            }
+            return false;
+        }
+
+        public void awaitUninterruptibly()
+        {
+            assert !isCancelled();
+            if (thread != Thread.currentThread())
+                throw new IllegalStateException();
+            boolean interrupted = false;
+            while (!isSignalled())
+            {
+                if (Thread.interrupted())
+                    interrupted = true;
+                LockSupport.park();
+            }
+            if (interrupted)
+                thread.interrupt();
+        }
+
+        public void await() throws InterruptedException
+        {
+            assert !isCancelled();
+            while (!isSignalled())
+            {
+                if (Thread.interrupted())
+                {
+                    checkAndClear();
+                    throw new InterruptedException();
+                }
+                if (thread != Thread.currentThread())
+                    throw new IllegalStateException();
+                LockSupport.park();
+            }
+        }
+
+        public long awaitNanos(long nanosTimeout) throws InterruptedException
+        {
+            assert !isCancelled();
+            long start = System.nanoTime();
+            long remaining = nanosTimeout;
+            while (!isSignalled() && remaining > 0)
+            {
+                if (Thread.interrupted())
+                {
+                    checkAndClear();
+                    throw new InterruptedException();
+                }
+                LockSupport.parkNanos(remaining);
+                remaining = nanosTimeout - (System.nanoTime() - start);
+            }
+            return remaining;
+        }
+
+        public boolean await(long time, TimeUnit unit) throws InterruptedException
+        {
+            // ignores nanos atm
+            long until = System.currentTimeMillis() + unit.toMillis(time);
+            if (until < 0)
+                until = Long.MAX_VALUE;
+            return awaitUntil(until);
+        }
+
+        public boolean awaitUntil(long until) throws InterruptedException
+        {
+            assert !isCancelled();
+            while (!isSignalled() && until > System.currentTimeMillis())
+            {
+                if (Thread.interrupted())
+                {
+                    checkAndClear();
+                    throw new InterruptedException();
+                }
+                LockSupport.parkUntil(until);
+            }
+            return checkAndClear();
+        }
+
+        private boolean checkAndClear()
+        {
+            if (isSignalled())
+            {
+                signalled = -1;
+                return true;
+            }
+            else if (signalledUpdater.compareAndSet(this, 0, -1))
+            {
+                cleanUpCancelled();
+                return false;
+            }
+            else
+            {
+                // must now be signalled, as checkAndClear() can only be called by
+                // owning thread if used correctly
+                signalled = -1;
+                return true;
+            }
+        }
+
+        public void cancel()
+        {
+            if (signalled < 0)
+                return;
+            if (!signalledUpdater.compareAndSet(this, 0, -1))
+            {
+                // already signalled; mark it consumed and pass the signal on to another waiter
+                signalled = -1;
+                WaitQueue.this.signal();
+            }
+            cleanUpCancelled();
+        }
+
+    }
+
+    private static final AtomicIntegerFieldUpdater<Signal> signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(Signal.class, "signalled");
+
+    // the waiting signals
+    private final ConcurrentLinkedQueue<Signal> queue = new ConcurrentLinkedQueue<>();
+
+    /**
+     * The calling thread MUST be the thread that uses the signal (for now)
+     * @return a Signal registered with this queue, which only the calling thread may await
+     */
+    public Signal register()
+    {
+        Signal signal = new Signal();
+        queue.add(signal);
+        return signal;
+    }
+
+    /**
+     * Signal one waiting thread
+     */
+    public void signal()
+    {
+        if (queue.isEmpty())
+            return;
+        Iterator<Signal> iter = queue.iterator();
+        while (iter.hasNext())
+        {
+            Signal next = iter.next();
+            if (next.signal())
+            {
+                iter.remove();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Signal all waiting threads
+     */
+    public void signalAll()
+    {
+        if (queue.isEmpty())
+            return;
+        Iterator<Signal> iter = queue.iterator();
+        while (iter.hasNext())
+        {
+            Signal next = iter.next();
+            if (next.signal())
+                iter.remove();
+        }
+    }
+
+    private void cleanUpCancelled()
+    {
+        Iterator<Signal> iter = queue.iterator();
+        while (iter.hasNext())
+        {
+            Signal next = iter.next();
+            if (next.isCancelled())
+                iter.remove();
+        }
+    }
+
+    /**
+     * Return how many threads are waiting, pruning any cancelled signals encountered
+     * @return the number of threads currently waiting on this queue
+     */
+    public int getWaiting()
+    {
+        if (queue.isEmpty())
+            return 0;
+        Iterator<Signal> iter = queue.iterator();
+        int count = 0;
+        while (iter.hasNext())
+        {
+            Signal next = iter.next();
+            if (next.isCancelled())
+                iter.remove();
+            else
+                count++;
+        }
+        return count;
+    }
+
+}
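
To make the javadoc protocol concrete, a minimal standalone sketch (not part of the patch) of the
register/recheck/await handshake between one waiter and one signalling thread:

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.cassandra.utils.WaitQueue;

    public class WaitQueueDemo
    {
        static final WaitQueue queue = new WaitQueue();
        static final AtomicBoolean ready = new AtomicBoolean();

        public static void main(String[] args) throws InterruptedException
        {
            Thread waiter = new Thread(new Runnable()
            {
                public void run()
                {
                    while (!ready.get())
                    {
                        WaitQueue.Signal s = queue.register(); // register BEFORE the final condition check
                        if (!ready.get())
                            s.awaitUninterruptibly();          // a signal sent after register() cannot be missed
                        else
                            s.cancel();                        // condition met in the window; release our slot
                    }
                }
            });
            waiter.start();
            Thread.sleep(100);
            ready.set(true);   // change the state first...
            queue.signal();    // ...then signal, per the class contract
            waiter.join();
        }
    }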

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
new file mode 100644
index 0000000..3f65714
--- /dev/null
+++ b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
@@ -0,0 +1,72 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class ComitLogStress
+{
+
+    public static final String format = "%s,%s,%s,%s,%s,%s";
+
+    public static void main(String[] args) throws Exception {
+        int NUM_THREADS = Runtime.getRuntime().availableProcessors();
+        if (args.length >= 1) {
+            NUM_THREADS = Integer.parseInt(args[0]);
+            System.out.println("Setting num threads to: " + NUM_THREADS);
+        }
+        ExecutorService executor = new JMXEnabledThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60,
+                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10 * NUM_THREADS), new NamedThreadFactory(""), "");
+        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+
+        org.apache.cassandra.SchemaLoader.loadSchema();
+        final AtomicLong count = new AtomicLong();
+        final long start = System.currentTimeMillis();
+        System.out.println(String.format(format, "seconds", "max_mb", "allocated_mb", "free_mb", "diffrence", "count"));
+        scheduled.scheduleAtFixedRate(new Runnable() {
+            long lastUpdate = 0;
+
+            public void run() {
+                Runtime runtime = Runtime.getRuntime();
+                long maxMemory = mb(runtime.maxMemory());
+                long allocatedMemory = mb(runtime.totalMemory());
+                long freeMemory = mb(runtime.freeMemory());
+                long temp = count.get();
+                System.out.println(String.format(format, ((System.currentTimeMillis() - start) / 1000),
+                        maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), temp));
+                lastUpdate = temp;
+            }
+        }, 1, 1, TimeUnit.SECONDS);
+
+        while (true) {
+            executor.execute(new CommitlogExecutor());
+            count.incrementAndGet();
+        }
+    }
+
+    private static long mb(long maxMemory) {
+        return maxMemory / (1024 * 1024);
+    }
+
+    static final String keyString = UUIDGen.getTimeUUID().toString();
+    public static class CommitlogExecutor implements Runnable {
+        public void run() {
+            String ks = "Keyspace1";
+            ByteBuffer key = ByteBufferUtil.bytes(keyString);
+            RowMutation mutation = new RowMutation(ks, key);
+            mutation.add("Standard1", ByteBufferUtil.bytes("name"), ByteBufferUtil.bytes("value"),
+                    System.currentTimeMillis());
+            CommitLog.instance.add(mutation);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
new file mode 100644
index 0000000..c2888bc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -0,0 +1,137 @@
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.utils.WaitQueue;
+import org.junit.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.*;
+
+public class WaitQueueTest
+{
+
+    @Test
+    public void testSerial() throws InterruptedException
+    {
+        testSerial(new WaitQueue());
+    }
+    public void testSerial(final WaitQueue queue) throws InterruptedException
+    {
+        Thread[] ts = new Thread[4];
+        for (int i = 0 ; i < ts.length ; i++)
+            ts[i] = new Thread(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                WaitQueue.Signal wait = queue.register();
+                try
+                {
+                    wait.await();
+                } catch (InterruptedException e)
+                {
+                    e.printStackTrace();
+                }
+            }
+        });
+        for (int i = 0 ; i < ts.length ; i++)
+            ts[i].start();
+        Thread.sleep(100);
+        queue.signal();
+        queue.signal();
+        queue.signal();
+        queue.signal();
+        for (int i = 0 ; i < ts.length ; i++)
+        {
+            ts[i].join(100);
+            assertFalse(queue.getClass().getName(), ts[i].isAlive());
+        }
+    }
+
+
+    @Test
+    public void testCondition1() throws InterruptedException
+    {
+        testCondition1(new WaitQueue());
+    }
+
+    public void testCondition1(final WaitQueue queue) throws InterruptedException
+    {
+        final AtomicBoolean cond1 = new AtomicBoolean(false);
+        final AtomicBoolean fail = new AtomicBoolean(false);
+        Thread t1 = new Thread(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    Thread.sleep(200);
+                } catch (InterruptedException e)
+                {
+                    e.printStackTrace();
+                }
+                WaitQueue.Signal wait = queue.register();
+                if (!cond1.get())
+                {
+                    System.err.println("Condition should have already been met");
+                    fail.set(true);
+                }
+            }
+        });
+        t1.start();
+        Thread.sleep(50);
+        cond1.set(true);
+        Thread.sleep(300);
+        queue.signal();
+        t1.join(300);
+        assertFalse(queue.getClass().getName(), t1.isAlive());
+        assertFalse(fail.get());
+    }
+
+    @Test
+    public void testCondition2() throws InterruptedException
+    {
+        testCondition2(new WaitQueue());
+    }
+    public void testCondition2(final WaitQueue queue) throws InterruptedException
+    {
+        final AtomicBoolean condition = new AtomicBoolean(false);
+        final AtomicBoolean fail = new AtomicBoolean(false);
+        Thread t = new Thread(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                WaitQueue.Signal wait = queue.register();
+                if (condition.get())
+                {
+                    System.err.println("");
+                    fail.set(true);
+                }
+
+                try
+                {
+                    Thread.sleep(200);
+                    wait.await();
+                } catch (InterruptedException e)
+                {
+                    e.printStackTrace();
+                }
+                if (!condition.get())
+                {
+                    System.err.println("Woke up when condition not met");
+                    fail.set(true);
+                }
+            }
+        });
+        t.start();
+        Thread.sleep(50);
+        condition.set(true);
+        queue.signal();
+        t.join(300);
+        assertFalse(queue.getClass().getName(), t.isAlive());
+        assertFalse(fail.get());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 8e5f418..6c1b56b 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -125,10 +125,11 @@ public class CommitLogTest extends SchemaLoader
     @Test
     public void testDeleteIfNotDirty() throws Exception
     {
+        DatabaseDescriptor.getCommitLogSegmentSize();
         CommitLog.instance.resetUnsafe();
         // Roughly 32 MB mutation
         RowMutation rm = new RowMutation("Keyspace1", bytes("k"));
-        rm.add("Standard1", bytes("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
+        rm.add("Standard1", bytes("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
 
         // Adding it twice (won't change segment)
         CommitLog.instance.add(rm);
@@ -138,15 +139,17 @@ public class CommitLogTest extends SchemaLoader
 
         // "Flush": this won't delete anything
         UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
+        CommitLog.instance.sync(true);
         CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext().get());
 
         assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
 
         // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
         RowMutation rm2 = new RowMutation("Keyspace1", bytes("k"));
-        rm2.add("Standard2", bytes("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/2), 0);
+        rm2.add("Standard2", bytes("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 100), 0);
+        CommitLog.instance.add(rm2);
+        // also forces a new segment, since each entry-with-overhead is just under half the CL size
         CommitLog.instance.add(rm2);
-        // also forces a new segment, since each entry-with-overhead is just over half the CL size
         CommitLog.instance.add(rm2);
 
         assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();


[2/2] git commit: Multithreaded commitlog patch by Benedict Elliot Smith; reviewed by jbellis for CASSANDRA-3578

Posted by jb...@apache.org.
Multithreaded commitlog
patch by Benedict Elliot Smith; reviewed by jbellis for CASSANDRA-3578


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/22e18f5a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/22e18f5a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/22e18f5a

Branch: refs/heads/trunk
Commit: 22e18f5a348a911f89deed9f9984950de451d28a
Parents: 679ec7e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Dec 2 13:07:28 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Dec 2 13:09:19 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/CFMetaData.java |  11 +
 .../org/apache/cassandra/config/Schema.java     |   1 +
 .../org/apache/cassandra/db/DefsTables.java     |   7 +
 .../AbstractCommitLogExecutorService.java       |  55 --
 .../db/commitlog/AbstractCommitLogService.java  | 153 ++++++
 .../BatchCommitLogExecutorService.java          | 176 ------
 .../db/commitlog/BatchCommitLogService.java     |  36 ++
 .../cassandra/db/commitlog/CommitLog.java       | 278 +++++-----
 .../db/commitlog/CommitLogAllocator.java        | 369 -------------
 .../db/commitlog/CommitLogArchiver.java         |   2 +-
 .../db/commitlog/CommitLogReplayer.java         | 351 +++++++-----
 .../db/commitlog/CommitLogSegment.java          | 487 ++++++++++++-----
 .../db/commitlog/CommitLogSegmentManager.java   | 533 +++++++++++++++++++
 .../db/commitlog/ICommitLogExecutorService.java |  51 --
 .../PeriodicCommitLogExecutorService.java       | 135 -----
 .../db/commitlog/PeriodicCommitLogService.java  |  57 ++
 .../cassandra/io/util/RandomAccessReader.java   |   5 +
 .../cassandra/metrics/CommitLogMetrics.java     |  10 +-
 .../cassandra/service/StorageService.java       |   6 +-
 .../cassandra/utils/AtomicLongArrayUpdater.java |  74 +++
 .../apache/cassandra/utils/StatusLogger.java    |   6 -
 .../org/apache/cassandra/utils/WaitQueue.java   | 264 +++++++++
 .../cassandra/db/commitlog/ComitLogStress.java  |  72 +++
 .../cassandra/concurrent/WaitQueueTest.java     | 137 +++++
 .../org/apache/cassandra/db/CommitLogTest.java  |   9 +-
 26 files changed, 2059 insertions(+), 1227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6efbfbc..bb3a98a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1
+ * Multithreaded commitlog (CASSANDRA-3578)
  * allocate fixed index summary memory pool and resample cold index summaries 
    to use less memory (CASSANDRA-5519)
  * Removed multithreaded compaction (CASSANDRA-6142)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d04dc25..0a33c20 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -405,6 +405,7 @@ public final class CFMetaData
     private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
     private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
     private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
+    private volatile boolean isPurged = false;
 
     /*
      * All CQL3 columns definition are stored in the columnMetadata map.
@@ -1546,6 +1547,16 @@ public final class CFMetaData
         return rm;
     }
 
+    public boolean isPurged()
+    {
+        return isPurged;
+    }
+
+    void markPurged()
+    {
+        isPurged = true;
+    }
+
     public void toSchema(RowMutation rm, long timestamp)
     {
         toSchemaNoColumnsNoTriggers(rm, timestamp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 146b82b..a38c097 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -348,6 +348,7 @@ public class Schema
     public void purge(CFMetaData cfm)
     {
         cfIdMap.remove(Pair.create(cfm.ksName, cfm.cfName));
+        cfm.markPurged();
     }
 
     /* Version control */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 3cd5156..828981e 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -24,6 +24,7 @@ import java.util.*;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -461,6 +462,10 @@ public class DefsTables
         // remove the keyspace from the static instances.
         Keyspace.clear(ksm.name);
         Schema.instance.clearKeyspaceDefinition(ksm);
+
+        // force a new segment in the CL
+        CommitLog.instance.forceRecycleAllSegments();
+
         if (!StorageService.instance.isClientMode())
         {
             MigrationManager.instance.notifyDropKeyspace(ksm);
@@ -482,6 +487,8 @@ public class DefsTables
 
         CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
 
+        CommitLog.instance.forceRecycleAllSegments();
+
         if (!StorageService.instance.isClientMode())
         {
             if (DatabaseDescriptor.isAutoSnapshot())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
deleted file mode 100644
index ec43114..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.List;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public abstract class AbstractCommitLogExecutorService extends AbstractExecutorService implements ICommitLogExecutorService
-{
-    protected volatile long completedTaskCount = 0;
-
-    /**
-     * Get the number of completed tasks
-     */
-    public long getCompletedTasks()
-    {
-        return completedTaskCount;
-    }
-
-    public boolean isTerminated()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean isShutdown()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public List<Runnable> shutdownNow()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
-    {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
new file mode 100644
index 0000000..2f9b236
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.utils.WaitQueue;
+import org.slf4j.*;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+public abstract class AbstractCommitLogService
+{
+    private final Thread thread;
+    private volatile boolean shutdown = false;
+
+    // all Allocations written before this time will be synced
+    protected volatile long lastSyncedAt = System.currentTimeMillis();
+
+    // counts of total written, and pending, log messages
+    private final AtomicLong written = new AtomicLong(0);
+    protected final AtomicLong pending = new AtomicLong(0);
+
+    // signal that writers can wait on to be notified of a completed sync
+    protected final WaitQueue syncComplete = new WaitQueue();
+    private final Semaphore haveWork = new Semaphore(1);
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
+
+    /**
+     * CommitLogService provides an fsync service for Allocations, fulfilling either the
+     * Batch or Periodic contract.
+     *
+     * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+     */
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+    {
+        if (pollIntervalMillis < 1)
+            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                boolean run = true;
+                while (run)
+                {
+                    try
+                    {
+                        // always run once after shutdown signalled
+                        run = !shutdown;
+
+                        // sync and signal
+                        long syncStarted = System.currentTimeMillis();
+                        commitLog.sync(shutdown);
+                        lastSyncedAt = syncStarted;
+                        syncComplete.signalAll();
+
+                        // sleep any time we have left before the next one is due
+                        long sleep = syncStarted + pollIntervalMillis - System.currentTimeMillis();
+                        if (sleep < 0)
+                        {
+                            logger.warn(String.format("Commit log sync took longer than sync interval (by %.2fs), indicating it is a bottleneck", sleep / -1000d));
+                            // don't sleep, as we probably have work to do
+                            continue;
+                        }
+                        try
+                        {
+                            haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError();
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        logger.error("Commit log sync failed", t);
+                        // sleep for full poll-interval after an error, so we don't spam the log file
+                        try
+                        {
+                            haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError();
+                        }
+                    }
+                }
+            }
+        };
+
+        thread = new Thread(runnable, name);
+        thread.start();
+    }
+
+    /**
+     * Block for the given Allocation to be sync'd as necessary, and handle bookkeeping
+     */
+    public void finishWriteFor(Allocation alloc)
+    {
+        maybeWaitForSync(alloc);
+        written.incrementAndGet();
+    }
+
+    protected abstract void maybeWaitForSync(Allocation alloc);
+
+    /**
+     * Sync immediately, but don't block for the sync to complete
+     */
+    public void requestExtraSync()
+    {
+        haveWork.release();
+    }
+
+    public void shutdown()
+    {
+        shutdown = true;
+        haveWork.release(1);
+    }
+
+    public void awaitTermination() throws InterruptedException
+    {
+        thread.join();
+    }
+
+    public long getCompletedTasks()
+    {
+        return written.get();
+    }
+
+    public long getPendingTasks()
+    {
+        return pending.get();
+    }
+}
\ No newline at end of file
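
One design note on the service above: the haveWork Semaphore doubles as an interruptible sleep, so
requestExtraSync() can wake the syncer early by releasing a permit. A standalone sketch of that idiom
(not part of the patch):

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class SemaphoreWakeupDemo
    {
        public static void main(String[] args) throws InterruptedException
        {
            final Semaphore haveWork = new Semaphore(0);
            Thread waker = new Thread(new Runnable()
            {
                public void run()
                {
                    try { Thread.sleep(200); } catch (InterruptedException ignored) {}
                    haveWork.release(); // analogous to requestExtraSync()
                }
            });
            waker.start();
            long start = System.currentTimeMillis();
            haveWork.tryAcquire(10_000, TimeUnit.MILLISECONDS); // sleeps up to 10s, but wakes on release()
            System.out.println("woke after ~" + (System.currentTimeMillis() - start) + "ms");
            waker.join();
        }
    }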

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
deleted file mode 100644
index d985f1f..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.ArrayList;
-import java.util.concurrent.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
-{
-    private final BlockingQueue<CheaterFutureTask> queue;
-    private final Thread appendingThread;
-    private volatile boolean run = true;
-
-    public BatchCommitLogExecutorService()
-    {
-        this(DatabaseDescriptor.getConcurrentWriters());
-    }
-
-    public BatchCommitLogExecutorService(int queueSize)
-    {
-        queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                while (run)
-                {
-                    if (processWithSyncBatch())
-                        completedTaskCount++;
-                }
-            }
-        };
-        appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
-        appendingThread.start();
-
-    }
-
-    public long getPendingTasks()
-    {
-        return queue.size();
-    }
-
-    private final ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
-    private final ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
-    private boolean processWithSyncBatch() throws Exception
-    {
-        CheaterFutureTask firstTask = queue.poll(100, TimeUnit.MILLISECONDS);
-        if (firstTask == null)
-            return false;
-        if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
-        {
-            firstTask.run();
-            return true;
-        }
-
-        // attempt to do a bunch of LogRecordAdder ops before syncing
-        // (this is a little clunky since there is no blocking peek method,
-        //  so we have to break it into firstTask / extra tasks)
-        incompleteTasks.clear();
-        taskValues.clear();
-        long start = System.nanoTime();
-        long window = (long)(1000000 * DatabaseDescriptor.getCommitLogSyncBatchWindow());
-
-        // it doesn't seem worth bothering future-izing the exception
-        // since if a commitlog op throws, we're probably screwed anyway
-        incompleteTasks.add(firstTask);
-        taskValues.add(firstTask.getRawCallable().call());
-        while (!queue.isEmpty()
-               && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
-               && System.nanoTime() - start < window)
-        {
-            CheaterFutureTask task = queue.remove();
-            incompleteTasks.add(task);
-            taskValues.add(task.getRawCallable().call());
-        }
-
-        // now sync and set the tasks' values (which allows thread calling get() to proceed)
-        CommitLog.instance.sync();
-        for (int i = 0; i < incompleteTasks.size(); i++)
-        {
-            incompleteTasks.get(i).set(taskValues.get(i));
-        }
-        return true;
-    }
-
-
-    @Override
-    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
-    {
-        return newTaskFor(Executors.callable(runnable, value));
-    }
-
-    @Override
-    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
-    {
-        return new CheaterFutureTask(callable);
-    }
-
-    public void execute(Runnable command)
-    {
-        try
-        {
-            queue.put((CheaterFutureTask)command);
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void add(CommitLog.LogRecordAdder adder)
-    {
-        FBUtilities.waitOnFuture(submit((Callable)adder));
-    }
-
-    public void shutdown()
-    {
-        new Thread(new WrappedRunnable()
-        {
-            public void runMayThrow() throws InterruptedException
-            {
-                while (!queue.isEmpty())
-                    Thread.sleep(100);
-                run = false;
-                appendingThread.join();
-            }
-        }, "Commitlog Shutdown").start();
-    }
-
-    public void awaitTermination() throws InterruptedException
-    {
-        appendingThread.join();
-    }
-
-    private static class CheaterFutureTask<V> extends FutureTask<V>
-    {
-        private final Callable rawCallable;
-
-        public CheaterFutureTask(Callable<V> callable)
-        {
-            super(callable);
-            rawCallable = callable;
-        }
-
-        public Callable getRawCallable()
-        {
-            return rawCallable;
-        }
-
-        @Override
-        public void set(V v)
-        {
-            super.set(v);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
new file mode 100644
index 0000000..65bee40
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+class BatchCommitLogService extends AbstractCommitLogService
+{
+    public BatchCommitLogService(CommitLog commitLog)
+    {
+        super(commitLog, "COMMIT-LOG-WRITER", (int) DatabaseDescriptor.getCommitLogSyncBatchWindow());
+    }
+
+    protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
+    {
+        // wait until record has been safely persisted to disk
+        pending.incrementAndGet();
+        alloc.awaitDiskSync();
+        pending.decrementAndGet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 706df37..7240aee 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -19,23 +19,30 @@ package org.apache.cassandra.db.commitlog;
 
 import java.io.*;
 import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.ByteBufferOutputStream;
+import org.apache.cassandra.io.util.ChecksummedOutputStream;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.PureJavaCrc32;
+
+import com.google.common.util.concurrent.Futures;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
 
 /*
  * Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
@@ -47,29 +54,24 @@ public class CommitLog implements CommitLogMBean
 
     public static final CommitLog instance = new CommitLog();
 
-    private final ICommitLogExecutorService executor;
-
-    public final CommitLogAllocator allocator;
+    // we only permit records HALF the size of a commit log segment, to ensure we don't spin allocating many mostly
+    // empty segments when writing large records
+    private static final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
 
+    public final CommitLogSegmentManager allocator;
     public final CommitLogArchiver archiver = new CommitLogArchiver();
-
-    public static final int END_OF_SEGMENT_MARKER = 0;          // this is written out at the end of a segment
-    public static final int END_OF_SEGMENT_MARKER_SIZE = 4;     // number of bytes of ^^^
-
-    public CommitLogSegment activeSegment;
-
     private final CommitLogMetrics metrics;
+    final AbstractCommitLogService executor;
 
     private CommitLog()
     {
         DatabaseDescriptor.createAllDirectories();
 
-        allocator = new CommitLogAllocator();
-        activateNextSegment();
+        allocator = new CommitLogSegmentManager();
 
         executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
-                 ? new BatchCommitLogExecutorService()
-                 : new PeriodicCommitLogExecutorService(this);
+                 ? new BatchCommitLogService(this)
+                 : new PeriodicCommitLogService(this);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -86,15 +88,6 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
-     * FOR TESTING PURPOSES. See CommitLogAllocator.
-     */
-    public void resetUnsafe()
-    {
-        allocator.resetUnsafe();
-        activateNextSegment();
-    }
-
-    /**
      * Perform recovery on commit logs located in the directory specified by the config file.
      *
      * @return the number of mutations replayed
@@ -121,7 +114,7 @@ public class CommitLog implements CommitLogMBean
         }
         else
         {
-            Arrays.sort(files, new CommitLogSegment.CommitLogSegmentFileComparator());
+            Arrays.sort(files, new CommitLogSegmentFileComparator());
             logger.info("Replaying {}", StringUtils.join(files, ", "));
             replayed = recover(files);
             logger.info("Log replay complete, {} replayed mutations", replayed);
@@ -157,38 +150,84 @@ public class CommitLog implements CommitLogMBean
 
     /**
      * @return a Future representing a ReplayPosition such that when it is ready,
-     * all commitlog tasks enqueued prior to the getContext call will be complete (i.e., appended to the log)
+     * all Allocations created prior to the getContext call will be written to the log
      */
     public Future<ReplayPosition> getContext()
     {
-        Callable<ReplayPosition> task = new Callable<ReplayPosition>()
+        return Futures.immediateFuture(allocator.allocatingFrom().getContext());
+    }
+
+    /**
+     * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining
+     */
+    public void forceRecycleAllSegments()
+    {
+        allocator.forceRecycleAll();
+    }
+
+    /**
+     * Forces a disk flush on the commit log files that need it.  Blocking.
+     */
+    public void sync(boolean syncAllSegments)
+    {
+        CommitLogSegment current = allocator.allocatingFrom();
+        for (CommitLogSegment segment : allocator.getActiveSegments())
         {
-            public ReplayPosition call()
-            {
-                return activeSegment.getContext();
-            }
-        };
-        return executor.submit(task);
+            if (!syncAllSegments && segment.id > current.id)
+                return;
+            segment.sync();
+        }
     }
 
     /**
-     * Used by tests.
-     *
-     * @return the number of active segments (segments with unflushed data in them)
+     * Preempts the CLExecutor, telling it to sync immediately
      */
-    public int activeSegments()
+    public void requestExtraSync()
     {
-        return allocator.getActiveSegments().size();
+        executor.requestExtraSync();
     }
 
     /**
      * Add a RowMutation to the commit log.
      *
-     * @param rm the RowMutation to add to the log
+     * @param rowMutation the RowMutation to add to the log
      */
-    public void add(RowMutation rm)
+    public void add(RowMutation rowMutation)
     {
-        executor.add(new LogRecordAdder(rm));
+        long size = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version);
+
+        long totalSize = size + ENTRY_OVERHEAD_SIZE;
+        if (totalSize > MAX_MUTATION_SIZE)
+        {
+            logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
+            return;
+        }
+
+        Allocation alloc = allocator.allocate(rowMutation, (int) totalSize, new Allocation());
+        try
+        {
+            PureJavaCrc32 checksum = new PureJavaCrc32();
+            final ByteBuffer buffer = alloc.getBuffer();
+            DataOutputStream dos = new DataOutputStream(new ChecksummedOutputStream(new ByteBufferOutputStream(buffer), checksum));
+
+            // checksummed length
+            dos.writeInt((int) size);
+            buffer.putLong(checksum.getValue());
+
+            // checksummed mutation
+            RowMutation.serializer.serialize(rowMutation, dos, MessagingService.current_version);
+            buffer.putLong(checksum.getValue());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, alloc.getSegment().getPath());
+        }
+        finally
+        {
+            alloc.markWritten();
+        }
+
+        executor.finishWriteFor(alloc);
     }
 
     /**
@@ -200,104 +239,58 @@ public class CommitLog implements CommitLogMBean
      */
     public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
     {
-        Callable task = new Callable()
+        logger.debug("discard completed log segments for {}, column family {}", context, cfId);
+
+        // Go through the active segment files, which are ordered oldest to newest, marking the
+        // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
+        // in the arguments. Any segments that become unused after they are marked clean will be
+        // recycled or discarded.
+        for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
         {
-            public Object call()
+            CommitLogSegment segment = iter.next();
+            segment.markClean(cfId, context);
+
+            if (segment.isUnused())
             {
-                logger.debug("discard completed log segments for {}, column family {}", context, cfId);
-
-                // Go thru the active segment files, which are ordered oldest to newest, marking the
-                // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
-                // in the arguments. Any segments that become unused after they are marked clean will be
-                // recycled or discarded.
-                for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
-                {
-                    CommitLogSegment segment = iter.next();
-                    segment.markClean(cfId, context);
-
-                    // If the segment is no longer needed, and we have another spare segment in the hopper
-                    // (to keep the last segment from getting discarded), pursue either recycling or deleting
-                    // this segment file.
-                    if (iter.hasNext())
-                    {
-                        if (segment.isUnused())
-                        {
-                            logger.debug("Commit log segment {} is unused", segment);
-                            allocator.recycleSegment(segment);
-                        }
-                        else
-                        {
-                            logger.debug("Not safe to delete commit log segment {}; dirty is {}",
-                                         segment, segment.dirtyString());
-                        }
-                    }
-                    else
-                    {
-                        logger.debug("Not deleting active commitlog segment {}", segment);
-                    }
-
-                    // Don't mark or try to delete any newer segments once we've reached the one containing the
-                    // position of the flush.
-                    if (segment.contains(context))
-                        break;
-                }
-
-                return null;
+                logger.debug("Commit log segment {} is unused", segment);
+                allocator.recycleSegment(segment);
+            }
+            else
+            {
+                logger.debug("Not safe to delete{} commit log segment {}; dirty is {}",
+                        (iter.hasNext() ? "" : " active"), segment, segment.dirtyString());
             }
-        };
-
-        FBUtilities.waitOnFuture(executor.submit(task));
-    }
 
-    /**
-     * Forces a disk flush on the commit log files that need it.
-     */
-    public void sync()
-    {
-        for (CommitLogSegment segment : allocator.getActiveSegments())
-        {
-            segment.sync();
+            // Don't mark or try to delete any newer segments once we've reached the one containing the
+            // position of the flush.
+            if (segment.contains(context))
+                break;
         }
     }
 
-    /**
-     * @return the number of tasks completed by the commit log executor
-     */
+    @Override
     public long getCompletedTasks()
     {
         return metrics.completedTasks.value();
     }
 
-    /**
-     * @return the depth of pending commit log executor queue
-     */
+    @Override
     public long getPendingTasks()
     {
         return metrics.pendingTasks.value();
     }
 
     /**
-     * @return the total size occupied by commitlo segments expressed in bytes. (used by MBean)
+     * @return the total size occupied by commitlog segments expressed in bytes. (used by MBean)
      */
     public long getTotalCommitlogSize()
     {
         return metrics.totalCommitLogSize.value();
     }
 
-    /**
-     * Fetches a new segment file from the allocator and activates it.
-     *
-     * @return the newly activated segment
-     */
-    private void activateNextSegment()
-    {
-        activeSegment = allocator.fetchSegment();
-        logger.debug("Active segment is now {}", activeSegment);
-    }
-
     public List<String> getActiveSegmentNames()
     {
-        List<String> segmentNames = new ArrayList<String>();
+        List<String> segmentNames = new ArrayList<>();
         for (CommitLogSegment segment : allocator.getActiveSegments())
             segmentNames.add(segment.getName());
         return segmentNames;
@@ -305,7 +298,7 @@ public class CommitLog implements CommitLogMBean
 
     public List<String> getArchivingSegmentNames()
     {
-        return new ArrayList<String>(archiver.archivePending.keySet());
+        return new ArrayList<>(archiver.archivePending.keySet());
     }
 
     /**
@@ -319,48 +312,21 @@ public class CommitLog implements CommitLogMBean
         allocator.awaitTermination();
     }
 
-    // TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that
-    // without breaking the fragile CheaterFutureTask in BatchCLES.
-    class LogRecordAdder implements Callable, Runnable
+    /**
+     * FOR TESTING PURPOSES. See CommitLogSegmentManager.
+     */
+    public void resetUnsafe()
     {
-        final RowMutation rowMutation;
-
-        LogRecordAdder(RowMutation rm)
-        {
-            this.rowMutation = rm;
-        }
-
-        public void run()
-        {
-            long totalSize = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
-            if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize())
-            {
-                logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
-                return;
-            }
-
-            if (!activeSegment.hasCapacityFor(totalSize))
-            {
-                CommitLogSegment oldSegment = activeSegment;
-                activateNextSegment();
-                // Now we can run the user defined command just before switching to the new commit log.
-                // (Do this here instead of in the recycle call so we can get a head start on the archive.)
-                archiver.maybeArchive(oldSegment.getPath(), oldSegment.getName());
-            }
-            try
-            {
-                activeSegment.write(rowMutation);
-            }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, activeSegment.getPath());
-            }
-        }
+        allocator.resetUnsafe();
+    }
 
-        public Object call()
-        {
-            run();
-            return null;
-        }
+    /**
+     * Used by tests.
+     *
+     * @return the number of active segments (segments with unflushed data in them)
+     */
+    public int activeSegments()
+    {
+        return allocator.getActiveSegments().size();
     }
 }
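
For reference, a minimal self-contained sketch of the entry framing the new add() produces above: a checksummed length followed by the checksummed serialized mutation, matching ENTRY_OVERHEAD_SIZE (int length + long head checksum + long tail checksum). It substitutes java.util.zip.CRC32 for Cassandra's PureJavaCrc32 and a plain byte[] for the serialized RowMutation, so the concrete checksum values are illustrative only.

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public class EntryFramingSketch
    {
        // 4 (length) + 8 (head checksum) + 8 (tail checksum)
        static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;

        static ByteBuffer frame(byte[] mutation)
        {
            ByteBuffer buffer = ByteBuffer.allocate(ENTRY_OVERHEAD_SIZE + mutation.length);
            CRC32 checksum = new CRC32();

            // checksummed length: the head CRC covers the four big-endian length bytes
            buffer.putInt(mutation.length);
            checksum.update(buffer.array(), 0, 4);
            buffer.putLong(checksum.getValue());

            // checksummed mutation: the tail CRC is cumulative over length + payload
            buffer.put(mutation);
            checksum.update(mutation, 0, mutation.length);
            buffer.putLong(checksum.getValue());

            buffer.flip();
            return buffer;
        }

        public static void main(String[] args)
        {
            ByteBuffer entry = frame("example payload".getBytes());
            System.out.println("framed entry occupies " + entry.remaining() + " bytes");
        }
    }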

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
deleted file mode 100644
index 706cf9e..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.File;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-/**
- * Performs the pre-allocation of commit log segments in a background thread. All the
- * public methods are thread safe.
- */
-public class CommitLogAllocator
-{
-    static final Logger logger = LoggerFactory.getLogger(CommitLogAllocator.class);
-
-    /** The (theoretical) max milliseconds between loop runs to perform janitorial tasks */
-    public final static int TICK_CYCLE_TIME = 100;
-
-    /** Segments that are ready to be used */
-    private final BlockingQueue<CommitLogSegment> availableSegments = new LinkedBlockingQueue<CommitLogSegment>();
-
-    /** Allocations to be run by the thread */
-    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
-
-    /** Active segments, containing unflushed data */
-    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>();
-
-    /**
-     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
-     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
-     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
-     * on the allocator thread, which will take a ms or two).
-     */
-    private final AtomicLong size = new AtomicLong();
-
-    /**
-     * New segment creation is initially disabled because we'll typically get some "free" segments
-     * recycled after log replay.
-     */
-    private volatile boolean createReserveSegments = false;
-
-    private final Thread allocationThread;
-    private volatile boolean run = true;
-
-    public CommitLogAllocator()
-    {
-        // The run loop for the allocation thread
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                while (run)
-                {
-                    Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
-
-                    if (r != null)
-                    {
-                        r.run();
-                    }
-                    else
-                    {
-                        // no job, so we're clear to check to see if we're out of segments
-                        // and ready a new one if needed. has the effect of ensuring there's
-                        // almost always a segment available when it's needed.
-                        if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
-                        {
-                            logger.debug("No segments in reserve; creating a fresh one");
-                            createFreshSegment();
-                        }
-                    }
-                }
-            }
-        };
-
-        allocationThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
-        allocationThread.start();
-    }
-
-    /**
-     * Fetches an empty segment file.
-     *
-     * @return the next writable segment
-     */
-    public CommitLogSegment fetchSegment()
-    {
-        CommitLogSegment next;
-        try
-        {
-            next = availableSegments.take();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-
-        assert !activeSegments.contains(next);
-        activeSegments.add(next);
-        if (isCapExceeded())
-            flushOldestKeyspaces();
-
-        return next;
-    }
-
-    /**
-     * Indicates that a segment is no longer in use and that it should be recycled.
-     *
-     * @param segment segment that is no longer in use
-     */
-    public void recycleSegment(final CommitLogSegment segment)
-    {
-        activeSegments.remove(segment);
-        if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
-        {
-            // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
-            discardSegment(segment, false);
-            return;
-        }
-        if (isCapExceeded())
-        {
-            discardSegment(segment, true);
-            return;
-        }
-
-        logger.debug("Recycling {}", segment);
-        queue.add(new Runnable()
-        {
-            public void run()
-            {
-                CommitLogSegment recycled = segment.recycle();
-                internalAddReadySegment(recycled);
-            }
-        });
-    }
-
-    /**
-     * Differs from the above because it can work on any file instead of just existing
-     * commit log segments managed by this allocator.
-     *
-     * @param file segment file that is no longer in use.
-     */
-    public void recycleSegment(final File file)
-    {
-        // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
-        if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize()
-                || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
-        {
-            // (don't decrease managed size, since this was never a "live" segment)
-            logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
-            FileUtils.deleteWithConfirm(file);
-            return;
-        }
-
-        logger.debug("Recycling {}", file);
-        // this wasn't previously a live segment, so add it to the managed size when we make it live
-        size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
-        queue.add(new Runnable()
-        {
-            public void run()
-            {
-                CommitLogSegment segment = new CommitLogSegment(file.getPath());
-                internalAddReadySegment(segment);
-            }
-        });
-    }
-
-    /**
-     * Indicates that a segment file should be deleted.
-     *
-     * @param segment segment to be discarded
-     */
-    private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
-    {
-        logger.debug("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
-        size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
-
-        queue.add(new Runnable()
-        {
-            public void run()
-            {
-                segment.discard(deleteFile);
-            }
-        });
-    }
-
-    /**
-     * @return the space (in bytes) used by all segment files.
-     */
-    public long bytesUsed()
-    {
-        return size.get();
-    }
-
-    /**
-     * @param name the filename to check
-     * @return true if file is managed by this allocator.
-     */
-    public boolean manages(String name)
-    {
-        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
-            if (segment.getName().equals(name))
-                return true;
-
-        return false;
-    }
-
-    /**
-     * Creates and readies a brand new segment.
-     *
-     * @return the newly minted segment
-     */
-    private CommitLogSegment createFreshSegment()
-    {
-        size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
-        return internalAddReadySegment(CommitLogSegment.freshSegment());
-    }
-
-    /**
-     * Adds a segment to our internal tracking list and makes it ready for consumption.
-     *
-     * @param   segment the segment to add
-     * @return  the newly added segment
-     */
-    private CommitLogSegment internalAddReadySegment(CommitLogSegment segment)
-    {
-        assert !activeSegments.contains(segment);
-        assert !availableSegments.contains(segment);
-        availableSegments.add(segment);
-        return segment;
-    }
-
-    /**
-     * Check to see if the speculative current size exceeds the cap.
-     *
-     * @return true if cap is exceeded
-     */
-    private boolean isCapExceeded()
-    {
-        long currentSize = size.get();
-        logger.debug("Total active commitlog segment space used is {}", currentSize);
-        return currentSize > DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
-    }
-
-    /**
-     * Throws a flag that enables the behavior of keeping at least one spare segment
-     * available at all times.
-     */
-    public void enableReserveSegmentCreation()
-    {
-        createReserveSegments = true;
-    }
-
-    /**
-     * Force a flush on all dirty CFs represented in the oldest commitlog segment
-     */
-    private void flushOldestKeyspaces()
-    {
-        CommitLogSegment oldestSegment = activeSegments.peek();
-
-        if (oldestSegment != null)
-        {
-            for (UUID dirtyCFId : oldestSegment.getDirtyCFIDs())
-            {
-                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
-                if (pair == null)
-                {
-                    // even though we remove the schema entry before a final flush when dropping a CF,
-                    // it's still possible for a writer to race and finish his append after the flush.
-                    logger.debug("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
-                    oldestSegment.markClean(dirtyCFId, oldestSegment.getContext());
-                }
-                else
-                {
-                    String keypace = pair.left;
-                    final ColumnFamilyStore cfs = Keyspace.open(keypace).getColumnFamilyStore(dirtyCFId);
-                    // flush shouldn't run on the commitlog executor, since it acquires Table.switchLock,
-                    // which may already be held by a thread waiting for the CL executor (via getContext),
-                    // causing deadlock
-                    Runnable runnable = new Runnable()
-                    {
-                        public void run()
-                        {
-                            cfs.forceFlush();
-                        }
-                    };
-                    StorageService.optionalTasks.execute(runnable);
-                }
-            }
-        }
-    }
-
-    /**
-     * Resets all the segments, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
-     */
-    public void resetUnsafe()
-    {
-        logger.debug("Closing and clearing existing commit log segments...");
-
-        while (!queue.isEmpty())
-            Thread.yield();
-
-        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
-            segment.close();
-
-        activeSegments.clear();
-        availableSegments.clear();
-    }
-
-    /**
-     * Initiates the shutdown process for the allocator thread.
-     */
-    public void shutdown()
-    {
-        run = false;
-    }
-
-    /**
-     * Returns when the allocator thread terminates.
-     */
-    public void awaitTermination() throws InterruptedException
-    {
-        allocationThread.join();
-    }
-
-    /**
-     * @return a read-only collection of the active commit log segments
-     */
-    public Collection<CommitLogSegment> getActiveSegments()
-    {
-        return Collections.unmodifiableCollection(activeSegments);
-    }
-}
-
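
The comment on the size field above is worth a standalone illustration before the class disappears: the AtomicLong "promises" size changes up front, so the evict-oldest logic sees the effect of a recycle or discard immediately even though the file work runs asynchronously. A minimal sketch of that pattern, with illustrative constants in place of the DatabaseDescriptor settings:

    import java.util.concurrent.atomic.AtomicLong;

    public class PromisedSizeSketch
    {
        // illustrative values; the real code reads these from DatabaseDescriptor
        static final long SEGMENT_SIZE = 32L << 20;  // 32 MiB per segment
        static final long CAP_BYTES = 1024L << 20;   // 1 GiB total cap

        private final AtomicLong size = new AtomicLong();

        void promiseNewSegment()
        {
            // grow the accounted size before the segment file is actually created
            size.addAndGet(SEGMENT_SIZE);
        }

        void promiseDiscardedSegment()
        {
            // shrink the accounted size before the asynchronous delete runs
            size.addAndGet(-SEGMENT_SIZE);
        }

        boolean isCapExceeded()
        {
            // sees promised adjustments immediately, ahead of the disk work
            return size.get() > CAP_BYTES;
        }
    }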

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index f020182..1385ea4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -137,7 +137,7 @@ public class CommitLogArchiver
         {
             if (e.getCause() instanceof IOException)
             {
-                logger.info("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
+                logger.error("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
                 return false;
             }
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index ae091bf..c42ba9b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -48,6 +48,7 @@ public class CommitLogReplayer
 {
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
+    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
 
     private final Set<Keyspace> keyspacesRecovered;
     private final List<Future<?>> futures;
@@ -114,6 +115,53 @@ public class CommitLogReplayer
         return replayedCount.get();
     }
 
+    private int readHeader(long segmentId, int offset, RandomAccessReader reader) throws IOException
+    {
+        if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
+        {
+            if (offset != reader.length() && offset != Integer.MAX_VALUE)
+                logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header");
+            // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
+            return -1;
+        }
+        reader.seek(offset);
+        PureJavaCrc32 crc = new PureJavaCrc32();
+        crc.update((int) (segmentId & 0xFFFFFFFFL));
+        crc.update((int) (segmentId >>> 32));
+        crc.update((int) reader.getPosition());
+        int end = reader.readInt();
+        long filecrc = reader.readLong();
+        if (crc.getValue() != filecrc)
+        {
+            if (end != 0 || filecrc != 0)
+            {
+                logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+            }
+            return -1;
+        }
+        else if (end < offset || end > reader.length())
+        {
+            logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
+            return -1;
+        }
+        return end;
+    }
+
+    private int getStartOffset(long segmentId, int version, File file)
+    {
+        if (globalPosition.segment < segmentId)
+        {
+            if (version >= CommitLogDescriptor.VERSION_21)
+                return CommitLogSegment.SYNC_MARKER_SIZE;
+            else
+                return 0;
+        }
+        else if (globalPosition.segment == segmentId)
+            return globalPosition.position;
+        else
+            return -1;
+    }
+
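
To make the marker arithmetic in readHeader() above concrete: each sync marker stores the offset of the next marker plus a CRC seeded with the segment id and the marker's own position, so replay can tell a genuine forward pointer from leftover garbage in a recycled segment. A minimal sketch of that check, with java.util.zip.CRC32 standing in for PureJavaCrc32 and a local updateInt helper; the concrete CRC values are illustrative, and the real readHeader additionally treats an all-zero marker as a clean end of segment.

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public class SyncMarkerSketch
    {
        static void updateInt(CRC32 crc, int v)
        {
            crc.update(new byte[] { (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v });
        }

        /** @return the next-marker offset, or -1 if the marker is not valid */
        static int readMarker(ByteBuffer segment, long segmentId, int offset)
        {
            CRC32 crc = new CRC32();
            crc.update(new byte[0]); // no-op; seed values follow
            updateInt(crc, (int) (segmentId & 0xFFFFFFFFL));
            updateInt(crc, (int) (segmentId >>> 32));
            updateInt(crc, offset);

            int end = segment.getInt(offset);             // pointer to the next marker
            long storedCrc = segment.getLong(offset + 4);
            if (crc.getValue() != storedCrc || end < offset || end > segment.capacity())
                return -1;                                // stale or torn marker
            return end;
        }
    }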
     private abstract static class ReplayFilter
     {
         public abstract Iterable<ColumnFamily> filter(RowMutation rm);
@@ -181,182 +229,199 @@ public class CommitLogReplayer
         final ReplayFilter replayFilter = ReplayFilter.create();
         logger.info("Replaying {}", file.getPath());
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
-        final long segment = desc.id;
+        final long segmentId = desc.id;
         int version = desc.getMessagingVersion();
         RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
+
         try
         {
             assert reader.length() <= Integer.MAX_VALUE;
-            int replayPosition;
-            if (globalPosition.segment < segment)
-            {
-                replayPosition = 0;
-            }
-            else if (globalPosition.segment == segment)
-            {
-                replayPosition = globalPosition.position;
-            }
-            else
+            int offset = getStartOffset(segmentId, version, file);
+            if (offset < 0)
             {
                 logger.debug("skipping replay of fully-flushed {}", file);
                 return;
             }
 
-            if (logger.isDebugEnabled())
-                logger.debug("Replaying {} starting at {}", file, replayPosition);
-            reader.seek(replayPosition);
-
-            /* read the logs populate RowMutation and apply */
-            while (!reader.isEOF())
+            int prevEnd = 0;
+            main: while (true)
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("Reading mutation at {}", reader.getFilePointer());
 
-                long claimedCRC32;
-                int serializedSize;
-                try
+                int end = prevEnd;
+                if (version < CommitLogDescriptor.VERSION_21)
+                    end = Integer.MAX_VALUE;
+                else
                 {
-                    // any of the reads may hit EOF
-                    serializedSize = reader.readInt();
-                    if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
-                    {
-                        logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
-                        break;
-                    }
-
-                    // RowMutation must be at LEAST 10 bytes:
-                    // 3 each for a non-empty Keyspace and Key (including the
-                    // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
-                    // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
-                    if (serializedSize < 10)
-                        break;
-
-                    long claimedSizeChecksum = reader.readLong();
-                    checksum.reset();
-                    if (version < CommitLogDescriptor.VERSION_20)
-                        checksum.update(serializedSize);
-                    else
-                        FBUtilities.updateChecksumInt(checksum, serializedSize);
-
-                    if (checksum.getValue() != claimedSizeChecksum)
-                        break; // entry wasn't synced correctly/fully. that's
-                               // ok.
-
-                    if (serializedSize > buffer.length)
-                        buffer = new byte[(int) (1.2 * serializedSize)];
-                    reader.readFully(buffer, 0, serializedSize);
-                    claimedCRC32 = reader.readLong();
-                }
-                catch (EOFException eof)
-                {
-                    break; // last CL entry didn't get completely written. that's ok.
+                    do { end = readHeader(segmentId, end, reader); }
+                    while (end < offset && end > prevEnd);
                 }
 
-                checksum.update(buffer, 0, serializedSize);
-                if (claimedCRC32 != checksum.getValue())
-                {
-                    // this entry must not have been fsynced. probably the rest is bad too,
-                    // but just in case there is no harm in trying them (since we still read on an entry boundary)
-                    continue;
-                }
+                if (end < prevEnd)
+                    break;
 
-                /* deserialize the commit log entry */
-                FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
-                final RowMutation rm;
-                try
-                {
-                    // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
-                    // the current version. so do make sure the CL is drained prior to upgrading a node.
-                    rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
-                    // doublecheck that what we read is [still] valid for the current schema
-                    for (ColumnFamily cf : rm.getColumnFamilies())
-                        for (Column cell : cf)
-                            cf.getComparator().validate(cell.name());
-                }
-                catch (UnknownColumnFamilyException ex)
-                {
-                    if (ex.cfId == null)
-                        continue;
-                    AtomicInteger i = invalidMutations.get(ex.cfId);
-                    if (i == null)
-                    {
-                        i = new AtomicInteger(1);
-                        invalidMutations.put(ex.cfId, i);
-                    }
-                    else
-                        i.incrementAndGet();
-                    continue;
-                }
-                catch (Throwable t)
+                if (logger.isDebugEnabled())
+                    logger.debug("Replaying {} between {} and {}", file, offset, prevEnd);
+
+                reader.seek(offset);
+
+                /* read the logs, populate RowMutation and apply */
+                while (reader.getPosition() < end && !reader.isEOF())
                 {
-                    File f = File.createTempFile("mutation", "dat");
-                    DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+                    if (logger.isDebugEnabled())
+                        logger.debug("Reading mutation at {}", reader.getFilePointer());
+
+                    long claimedCRC32;
+                    int serializedSize;
                     try
                     {
-                        out.write(buffer, 0, serializedSize);
+                        // any of the reads may hit EOF
+                        serializedSize = reader.readInt();
+                        if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+                        {
+                            logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+                            break main;
+                        }
+
+                        // RowMutation must be at LEAST 10 bytes:
+                        // 3 each for a non-empty Keyspace and Key (including the
+                        // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+                        // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+                        if (serializedSize < 10)
+                            break main;
+
+                        long claimedSizeChecksum = reader.readLong();
+                        checksum.reset();
+                        if (version < CommitLogDescriptor.VERSION_20)
+                            checksum.update(serializedSize);
+                        else
+                            FBUtilities.updateChecksumInt(checksum, serializedSize);
+
+                        if (checksum.getValue() != claimedSizeChecksum)
+                            break main; // entry wasn't synced correctly/fully. that's ok.
+
+                        if (serializedSize > buffer.length)
+                            buffer = new byte[(int) (1.2 * serializedSize)];
+                        reader.readFully(buffer, 0, serializedSize);
+                        claimedCRC32 = reader.readLong();
                     }
-                    finally
+                    catch (EOFException eof)
                     {
-                        out.close();
+                        break main; // last CL entry didn't get completely written. that's ok.
                     }
-                    String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
-                                              f.getAbsolutePath());
-                    logger.error(st, t);
-                    continue;
-                }
 
-                if (logger.isDebugEnabled())
-                    logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
+                    checksum.update(buffer, 0, serializedSize);
+                    if (claimedCRC32 != checksum.getValue())
+                    {
+                        // this entry must not have been fsynced. probably the rest is bad too,
+                        // but just in case there is no harm in trying them (since we still read on an entry boundary)
+                        continue;
+                    }
 
-                final long entryLocation = reader.getFilePointer();
-                Runnable runnable = new WrappedRunnable()
-                {
-                    public void runMayThrow() throws IOException
+                    /* deserialize the commit log entry */
+                    FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
+                    final RowMutation rm;
+                    try
                     {
-                        if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
-                            return;
-                        if (pointInTimeExceeded(rm))
-                            return;
-
-                        final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
-
-                        // Rebuild the row mutation, omitting column families that
-                        //    a) the user has requested that we ignore,
-                        //    b) have already been flushed,
-                        // or c) are part of a cf that was dropped.
-                        // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
-                        RowMutation newRm = null;
-                        for (ColumnFamily columnFamily : replayFilter.filter(rm))
+                        // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+                        // the current version. so do make sure the CL is drained prior to upgrading a node.
+                        rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
+                        // doublecheck that what we read is [still] valid for the current schema
+                        for (ColumnFamily cf : rm.getColumnFamilies())
+                            for (Column cell : cf)
+                                cf.getComparator().validate(cell.name());
+                    }
+                    catch (UnknownColumnFamilyException ex)
+                    {
+                        if (ex.cfId == null)
+                            continue;
+                        AtomicInteger i = invalidMutations.get(ex.cfId);
+                        if (i == null)
                         {
-                            if (Schema.instance.getCF(columnFamily.id()) == null)
-                                continue; // dropped
+                            i = new AtomicInteger(1);
+                            invalidMutations.put(ex.cfId, i);
+                        }
+                        else
+                            i.incrementAndGet();
+                        continue;
+                    }
+                    catch (Throwable t)
+                    {
+                        File f = File.createTempFile("mutation", "dat");
+                        DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+                        try
+                        {
+                            out.write(buffer, 0, serializedSize);
+                        }
+                        finally
+                        {
+                            out.close();
+                        }
+                        String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
+                                                  f.getAbsolutePath());
+                        logger.error(st, t);
+                        continue;
+                    }
 
-                            ReplayPosition rp = cfPositions.get(columnFamily.id());
+                    if (logger.isDebugEnabled())
+                        logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
 
-                            // replay if current segment is newer than last flushed one or,
-                            // if it is the last known segment, if we are after the replay position
-                            if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
+                    final long entryLocation = reader.getFilePointer();
+                    Runnable runnable = new WrappedRunnable()
+                    {
+                        public void runMayThrow() throws IOException
+                        {
+                            if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
+                                return;
+                            if (pointInTimeExceeded(rm))
+                                return;
+
+                            final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
+
+                            // Rebuild the row mutation, omitting column families that
+                            //    a) the user has requested that we ignore,
+                            //    b) have already been flushed,
+                            // or c) are part of a cf that was dropped.
+                            // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+                            RowMutation newRm = null;
+                            for (ColumnFamily columnFamily : replayFilter.filter(rm))
                             {
-                                if (newRm == null)
-                                    newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
-                                newRm.add(columnFamily);
-                                replayedCount.incrementAndGet();
+                                if (Schema.instance.getCF(columnFamily.id()) == null)
+                                    continue; // dropped
+
+                                ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+                                // replay if current segment is newer than last flushed one or,
+                                // if it is the last known segment, if we are after the replay position
+                                if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
+                                {
+                                    if (newRm == null)
+                                        newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
+                                    newRm.add(columnFamily);
+                                    replayedCount.incrementAndGet();
+                                }
+                            }
+                            if (newRm != null)
+                            {
+                                assert !newRm.isEmpty();
+                                Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
+                                keyspacesRecovered.add(keyspace);
                             }
                         }
-                        if (newRm != null)
-                        {
-                            assert !newRm.isEmpty();
-                            Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
-                            keyspacesRecovered.add(keyspace);
-                        }
+                    };
+                    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                    {
+                        FBUtilities.waitOnFutures(futures);
+                        futures.clear();
                     }
-                };
-                futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
-                if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
-                {
-                    FBUtilities.waitOnFutures(futures);
-                    futures.clear();
                 }
+
+                if (version < CommitLogDescriptor.VERSION_21)
+                    break;
+
+                offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
+                prevEnd = end;
             }
         }
         finally