You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/07/20 15:00:47 UTC

cassandra git commit: Simplify commit log code

Repository: cassandra
Updated Branches:
  refs/heads/trunk 058faff6f -> e8907c16a


Simplify commit log code

patch by Branimir Lambov; reviewed by Ariel Weisberg for CASSANDRA-10202


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e8907c16
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e8907c16
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e8907c16

Branch: refs/heads/trunk
Commit: e8907c16abcd84021a39cdaac79b609fcc64a43c
Parents: 058faff
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue May 3 17:36:25 2016 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Jul 20 15:59:38 2016 +0100

----------------------------------------------------------------------
 .../cassandra/config/DatabaseDescriptor.java    |   5 +
 .../AbstractCommitLogSegmentManager.java        | 405 +++++++++----------
 .../db/commitlog/AbstractCommitLogService.java  | 132 +++---
 .../db/commitlog/BatchCommitLogService.java     |   2 +-
 .../cassandra/db/commitlog/CommitLog.java       |  47 +--
 .../db/commitlog/CommitLogArchiver.java         |  25 +-
 .../db/commitlog/CommitLogSegment.java          |  25 +-
 .../commitlog/CommitLogSegmentManagerCDC.java   |  13 +-
 .../CommitLogSegmentManagerStandard.java        |  13 +-
 .../db/commitlog/CompressedSegment.java         |   4 +-
 .../db/commitlog/EncryptedSegment.java          |   4 +-
 .../db/commitlog/FileDirectSegment.java         |   6 +-
 .../db/commitlog/PeriodicCommitLogService.java  |  25 +-
 .../db/commitlog/CommitLogStressTest.java       |  27 +-
 test/unit/org/apache/cassandra/Util.java        |   2 +-
 .../CommitLogSegmentManagerCDCTest.java         |   9 +-
 .../commitlog/CommitLogSegmentManagerTest.java  |  10 +-
 .../cassandra/db/commitlog/CommitLogTest.java   |  10 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   2 +-
 19 files changed, 352 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 374c64d..4f188bb 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1442,6 +1442,11 @@ public class DatabaseDescriptor
         return conf.commitlog_max_compression_buffers_in_pool;
     }
 
+    public static void setCommitLogMaxCompressionBuffersPerPool(int buffers)
+    {
+        conf.commitlog_max_compression_buffers_in_pool = buffers; // NOTE(review): setter is named "...PerPool" but the field it writes is "..._in_pool"
+    }
+
     public static int getMaxMutationSize()
     {
         return conf.max_mutation_size_in_kb * 1024;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 7ea7439..275d5b3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -22,16 +22,18 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import net.nicoulaj.compilecommand.annotations.DontInline;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
@@ -45,19 +47,27 @@ public abstract class AbstractCommitLogSegmentManager
 {
     static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
 
-    // Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation.
-    private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>();
+    /**
+     * Segment that is ready to be used. The management thread fills this and blocks until consumed.
+     *
+     * A single management thread produces this, and consumers are already synchronizing to make sure other work is
+     * performed atomically with consuming this. Volatile to make sure writes by the management thread become
+     * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
+     * synchronize on 'this'.
+     */
+    private volatile CommitLogSegment availableSegment = null;
 
-    /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
-    private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
+    private final WaitQueue segmentPrepared = new WaitQueue();
 
-    /** Active segments, containing unflushed data */
+    /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
     private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
 
-    /** The segment we are currently allocating commit log records to */
-    protected volatile CommitLogSegment allocatingFrom = null;
-
-    private final WaitQueue hasAvailableSegments = new WaitQueue();
+    /**
+     * The segment we are currently allocating commit log records to.
+     *
+     * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
+     */
+    private volatile CommitLogSegment allocatingFrom = null;
 
     final String storageDirectory;
 
@@ -69,15 +79,9 @@ public abstract class AbstractCommitLogSegmentManager
      */
     private final AtomicLong size = new AtomicLong();
 
-    /**
-     * New segment creation is initially disabled because we'll typically get some "free" segments
-     * recycled after log replay.
-     */
-    volatile boolean createReserveSegments = false;
-
     private Thread managerThread;
-    protected volatile boolean run = true;
     protected final CommitLog commitLog;
+    private volatile boolean shutdown;
 
     private static final SimpleCachedBufferPool bufferPool =
         new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
@@ -95,54 +99,33 @@ public abstract class AbstractCommitLogSegmentManager
         {
             public void runMayThrow() throws Exception
             {
-                while (run)
+                while (!shutdown)
                 {
                     try
                     {
-                        Runnable task = segmentManagementTasks.poll();
-                        if (task == null)
+                        assert availableSegment == null;
+                        logger.debug("No segments in reserve; creating a fresh one");
+                        availableSegment = createSegment();
+                        if (shutdown)
                         {
-                            // if we have no more work to do, check if we should create a new segment
-                            if (!atSegmentLimit() &&
-                                availableSegments.isEmpty() &&
-                                (activeSegments.isEmpty() || createReserveSegments))
-                            {
-                                logger.trace("No segments in reserve; creating a fresh one");
-                                // TODO : some error handling in case we fail to create a new segment
-                                availableSegments.add(createSegment());
-                                hasAvailableSegments.signalAll();
-                            }
-
-                            // flush old Cfs if we're full
-                            long unused = unusedCapacity();
-                            if (unused < 0)
-                            {
-                                List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
-                                long spaceToReclaim = 0;
-                                for (CommitLogSegment segment : activeSegments)
-                                {
-                                    if (segment == allocatingFrom)
-                                        break;
-                                    segmentsToRecycle.add(segment);
-                                    spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
-                                    if (spaceToReclaim + unused >= 0)
-                                        break;
-                                }
-                                flushDataFrom(segmentsToRecycle, false);
-                            }
-
-                            // Since we're operating on a "null" allocation task, block here for the next task on the
-                            // queue rather than looping, grabbing another null, and repeating the above work.
-                            try
-                            {
-                                task = segmentManagementTasks.take();
-                            }
-                            catch (InterruptedException e)
-                            {
-                                throw new AssertionError();
-                            }
+                            // If shutdown() started and finished during segment creation, we are now left with a
+                            // segment that no one will consume. Discard it.
+                            discardAvailableSegment();
+                            return;
                         }
-                        task.run();
+
+                        segmentPrepared.signalAll();
+                        Thread.yield();
+
+                        if (availableSegment == null && !atSegmentBufferLimit())
+                            // Writing threads need another segment now.
+                            continue;
+
+                        // Writing threads are not waiting for new segments, we can spend time on other tasks.
+                        // flush old Cfs if we're full
+                        maybeFlushToReclaim();
+
+                        LockSupport.park();
                     }
                     catch (Throwable t)
                     {
@@ -151,27 +134,51 @@ public abstract class AbstractCommitLogSegmentManager
                             return;
                         // sleep some arbitrary period to avoid spamming CL
                         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+                        // If we offered a segment, wait for it to be taken before reentering the loop.
+                        // A freshly created segment may exist that was never offered, but only if discarding it
+                        // failed while shutting down -- nothing more can or needs to be done in that case.
                     }
-                }
-            }
 
-            private boolean atSegmentLimit()
-            {
-                return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+                    while (availableSegment != null || atSegmentBufferLimit() && !shutdown) // NOTE(review): '&&' binds tighter than '||', so '!shutdown' guards only the buffer-limit test -- confirm intended
+                        LockSupport.park(); // condition is re-tested after every wake-up; park() may return without a matching unpark
+                }
             }
         };
 
-        run = true;
-
+        shutdown = false;
         managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
         managerThread.start();
+
+        // for simplicity, ensure the first segment is allocated before continuing
+        advanceAllocatingFrom(null);
     }
 
+    private boolean atSegmentBufferLimit()
+    {
+        return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit(); // the limit is only consulted when this commit log's segments use the static bufferPool
+    }
+
+    private void maybeFlushToReclaim()
+    {
+        long unused = unusedCapacity(); // a negative value is treated below as the number of bytes to reclaim
+        if (unused < 0)
+        {
+            long flushingSize = 0; // on-disk bytes of the segments selected so far
+            List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+            for (CommitLogSegment segment : activeSegments) // iterates in queue order, head first
+            {
+                if (segment == allocatingFrom)
+                    break; // never select the segment currently being allocated from
+                flushingSize += segment.onDiskSize();
+                segmentsToRecycle.add(segment);
+                if (flushingSize + unused >= 0)
+                    break; // the selected segments already cover the deficit
+            }
+            flushDataFrom(segmentsToRecycle, false); // forces a flush of the CFs still dirty in the selected segments
+        }
+    }
 
-    /**
-     * Shut down the CLSM. Used both during testing and during regular shutdown, so needs to stop everything.
-     */
-    public abstract void shutdown();
 
     /**
      * Allocate a segment within this CLSM. Should either succeed or throw.
@@ -200,102 +207,69 @@ public abstract class AbstractCommitLogSegmentManager
      */
     abstract void discard(CommitLogSegment segment, boolean delete);
 
-
     /**
-     * Grab the current CommitLogSegment we're allocating from. Also serves as a utility method to block while the allocator
-     * is working on initial allocation of a CommitLogSegment.
-     */
-    CommitLogSegment allocatingFrom()
-    {
-        CommitLogSegment r = allocatingFrom;
-        if (r == null)
-        {
-            advanceAllocatingFrom(null);
-            r = allocatingFrom;
-        }
-        return r;
-    }
-
-    /**
-     * Fetches a new segment from the queue, signaling the management thread to create a new one if necessary, and "activates" it.
-     * Blocks until a new segment is allocated and the thread requesting an advanceAllocatingFrom is signalled.
+     * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
      *
      * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
      */
-    protected void advanceAllocatingFrom(CommitLogSegment old)
+    @DontInline
+    void advanceAllocatingFrom(CommitLogSegment old)
     {
-        while (true)
-        {
-            CommitLogSegment next;
+        while (true) {
             synchronized (this)
             {
-                // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments
-                // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432
+                // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
                 if (allocatingFrom != old)
                     return;
-                next = availableSegments.poll();
-                if (next != null)
-                {
-                    allocatingFrom = next;
-                    activeSegments.add(next);
-                }
-            }
 
-            if (next != null)
-            {
-                if (old != null)
+                // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
+                if (availableSegment != null)
                 {
-                    // Now we can run the user defined command just after switching to the new commit log.
-                    // (Do this here instead of in the recycle call so we can get a head start on the archive.)
-                    commitLog.archiver.maybeArchive(old);
-
-                    // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
-                    old.discardUnusedTail();
+                    // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
+                    // the critical section.
+                    activeSegments.add(allocatingFrom = availableSegment);
+                    availableSegment = null;
+                    break;
                 }
-
-                // request that the CL be synced out-of-band, as we've finished a segment
-                commitLog.requestExtraSync();
-                return;
             }
 
-            // no more segments, so register to receive a signal when not empty
-            WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+            awaitAvailableSegment(old);
+        }
 
-            // trigger the management thread; this must occur after registering
-            // the signal to ensure we are woken by any new segment creation
-            wakeManager();
+        // Signal the management thread to prepare a new segment.
+        wakeManager();
 
-            // check if the queue has already been added to before waiting on the signal, to catch modifications
-            // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change
-            if (!availableSegments.isEmpty() || allocatingFrom != old)
-            {
-                signal.cancel();
-                // if we've been beaten, just stop immediately
-                if (allocatingFrom != old)
-                    return;
-                // otherwise try again, as there should be an available segment
-                continue;
-            }
+        if (old != null)
+        {
+            // Now we can run the user defined command just after switching to the new commit log.
+            // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+            commitLog.archiver.maybeArchive(old);
 
-            // can only reach here if the queue hasn't been inserted into
-            // before we registered the signal, as we only remove items from the queue
-            // after updating allocatingFrom. Can safely block until we are signalled
-            // by the allocator that new segments have been published
-            signal.awaitUninterruptibly();
+            // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+            old.discardUnusedTail();
         }
+
+        // request that the CL be synced out-of-band, as we've finished a segment
+        commitLog.requestExtraSync();
     }
 
-    protected void wakeManager()
+    void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
     {
-        // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
-        segmentManagementTasks.add(Runnables.doNothing());
+        do
+        {
+            WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+            if (availableSegment == null && allocatingFrom == currentAllocatingFrom) // tested only after registering, so a signalAll() issued in between is not missed
+                prepared.awaitUninterruptibly(); // woken by the management thread's segmentPrepared.signalAll()
+            else
+                prepared.cancel(); // nothing to wait for: a segment is ready or allocatingFrom has already moved on
+        }
+        while (availableSegment == null && allocatingFrom == currentAllocatingFrom); // wait again if woken with neither condition satisfied
     }
 
     /**
      * Switch to a new segment, regardless of how much is left in the current one.
      *
-     * Flushes any dirty CFs for this segment and any older segments, and then recycles
-     * the segments
+     * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
      */
     void forceRecycleAll(Iterable<UUID> droppedCfs)
     {
@@ -307,7 +281,7 @@ public abstract class AbstractCommitLogSegmentManager
         last.waitForModifications();
 
         // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
-        // on the relevant keyspaces to complete
+        // to complete
         Keyspace.writeOrder.awaitNewBarrier();
 
         // flush and wait for all CFs that are dirty in segments up-to and including 'last'
@@ -326,7 +300,7 @@ public abstract class AbstractCommitLogSegmentManager
             for (CommitLogSegment segment : activeSegments)
             {
                 if (segment.isUnused())
-                    recycleSegment(segment);
+                    archiveAndDiscard(segment);
             }
 
             CommitLogSegment first;
@@ -341,33 +315,18 @@ public abstract class AbstractCommitLogSegmentManager
     }
 
     /**
-     * Indicates that a segment is no longer in use and that it should be recycled.
+     * Indicates that a segment is no longer in use and that it should be discarded.
      *
      * @param segment segment that is no longer in use
      */
-    void recycleSegment(final CommitLogSegment segment)
+    void archiveAndDiscard(final CommitLogSegment segment)
     {
         boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
-        if (activeSegments.remove(segment))
-        {
-            // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
-            discardSegment(segment, archiveSuccess);
-        }
-        else
-        {
-            logger.warn("segment {} not found in activeSegments queue", segment);
-        }
-    }
-
-    /**
-     * Indicates that a segment file should be deleted.
-     *
-     * @param segment segment to be discarded
-     */
-    private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
-    {
-        logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
-        segmentManagementTasks.add(() -> discard(segment, deleteFile));
+        if (!activeSegments.remove(segment))
+            return; // already discarded
+        // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+        logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
+        discard(segment, archiveSuccess);
     }
 
     /**
@@ -396,28 +355,6 @@ public abstract class AbstractCommitLogSegmentManager
     }
 
     /**
-     * @param name the filename to check
-     * @return true if file is managed by this manager.
-     */
-    public boolean manages(String name)
-    {
-        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
-            if (segment.getName().equals(name))
-                return true;
-        return false;
-    }
-
-    /**
-     * Throws a flag that enables the behavior of keeping at least one spare segment
-     * available at all times.
-     */
-    void enableReserveSegmentCreation()
-    {
-        createReserveSegments = true;
-        wakeManager();
-    }
-
-    /**
      * Force a flush on all CFs that are still dirty in @param segments.
      *
      * @return a Future that will finish when all the flushes are complete.
@@ -463,10 +400,7 @@ public abstract class AbstractCommitLogSegmentManager
      */
     public void stopUnsafe(boolean deleteSegments)
     {
-        logger.trace("CLSM closing and clearing existing commit log segments...");
-        createReserveSegments = false;
-
-        awaitManagementTasksCompletion();
+        logger.debug("CLSM closing and clearing existing commit log segments...");
 
         shutdown();
         try
@@ -478,35 +412,24 @@ public abstract class AbstractCommitLogSegmentManager
             throw new RuntimeException(e);
         }
 
-        synchronized (this)
-        {
-            for (CommitLogSegment segment : activeSegments)
-                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
-            activeSegments.clear();
-
-            for (CommitLogSegment segment : availableSegments)
-                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
-            availableSegments.clear();
-        }
-
-        allocatingFrom = null;
-
-        segmentManagementTasks.clear();
+        for (CommitLogSegment segment : activeSegments)
+            closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+        activeSegments.clear();
 
         size.set(0L);
 
         logger.trace("CLSM done with closing and clearing existing commit log segments.");
     }
 
-    // Used by tests only.
+    /**
+     * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
+     */
     void awaitManagementTasksCompletion()
     {
-        while (!segmentManagementTasks.isEmpty())
-            Thread.yield();
-        // The last management task is not yet complete. Wait a while for it.
-        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-        // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure
-        // waiting completes correctly.
+        if (availableSegment == null && !atSegmentBufferLimit())
+        {
+            awaitAvailableSegment(allocatingFrom);
+        }
     }
 
     /**
@@ -525,18 +448,41 @@ public abstract class AbstractCommitLogSegmentManager
     }
 
     /**
+     * Initiates the shutdown process for the management thread.
+     */
+    public void shutdown()
+    {
+        assert !shutdown; // calling shutdown() twice is a programming error (checked only when assertions are enabled)
+        shutdown = true; // volatile write; the management thread tests this flag in its loop conditions
+
+        // Release the management thread and delete the prepared segment, if any.
+        // Do not block as another thread may claim the segment (this can happen during unit test initialization).
+        discardAvailableSegment();
+        wakeManager();
+    }
+
+    private void discardAvailableSegment()
+    {
+        CommitLogSegment next = null;
+        synchronized (this) { // same monitor advanceAllocatingFrom holds, so only one of the two can claim the segment
+            next = availableSegment;
+            availableSegment = null;
+        }
+        if (next != null)
+            next.discard(true); // runs outside the monitor; presumably 'true' requests deletion of the file -- confirm against CommitLogSegment.discard
+    }
+
+    /**
      * Returns when the management thread terminates.
      */
     public void awaitTermination() throws InterruptedException
     {
         managerThread.join();
+        managerThread = null;
 
         for (CommitLogSegment segment : activeSegments)
             segment.close();
 
-        for (CommitLogSegment segment : availableSegments)
-            segment.close();
-
         bufferPool.shutdown();
     }
 
@@ -554,18 +500,19 @@ public abstract class AbstractCommitLogSegmentManager
      */
     CommitLogPosition getCurrentPosition()
     {
-        return allocatingFrom().getCurrentCommitLogPosition();
+        return allocatingFrom.getCurrentCommitLogPosition();
     }
 
     /**
      * Forces a disk flush on the commit log files that need it.  Blocking.
      */
-    public void sync(boolean syncAllSegments) throws IOException
+    public void sync() throws IOException
     {
-        CommitLogSegment current = allocatingFrom();
+        CommitLogSegment current = allocatingFrom;
         for (CommitLogSegment segment : getActiveSegments())
         {
-            if (!syncAllSegments && segment.id > current.id)
+            // Do not sync segments that became active after sync started.
+            if (segment.id > current.id)
                 return;
             segment.sync();
         }
@@ -578,5 +525,25 @@ public abstract class AbstractCommitLogSegmentManager
     {
         return bufferPool;
     }
+
+    void wakeManager()
+    {
+        LockSupport.unpark(managerThread); // releases the management thread from LockSupport.park(), or makes its next park() return at once; no effect if managerThread is null
+    }
+
+    /**
+     * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
+     * a buffer to become available.
+     */
+    void notifyBufferFreed()
+    {
+        wakeManager();
+    }
+
+    /** Read-only access to current segment for subclasses. */
+    CommitLogSegment allocatingFrom()
+    {
+        return allocatingFrom;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 0ba4f55..7b56da3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -17,15 +17,18 @@
  */
 package org.apache.cassandra.db.commitlog;
 
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-import org.slf4j.*;
-
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import com.codahale.metrics.Timer.Context;
+
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 public abstract class AbstractCommitLogService
 {
@@ -41,11 +44,10 @@ public abstract class AbstractCommitLogService
 
     // signal that writers can wait on to be notified of a completed sync
     protected final WaitQueue syncComplete = new WaitQueue();
-    protected final Semaphore haveWork = new Semaphore(1);
 
     final CommitLog commitLog;
     private final String name;
-    private final long pollIntervalMillis;
+    private final long pollIntervalNanos;
 
     private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
 
@@ -59,14 +61,15 @@ public abstract class AbstractCommitLogService
     {
         this.commitLog = commitLog;
         this.name = name;
-        this.pollIntervalMillis = pollIntervalMillis;
+        this.pollIntervalNanos = TimeUnit.NANOSECONDS.convert(pollIntervalMillis, TimeUnit.MILLISECONDS);
     }
 
     // Separated into individual method to ensure relevant objects are constructed before this is started.
     void start()
     {
-        if (pollIntervalMillis < 1)
-            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+        if (pollIntervalNanos < 1)
+            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
+                                                             pollIntervalNanos * 1e-6));
 
         Runnable runnable = new Runnable()
         {
@@ -78,26 +81,25 @@ public abstract class AbstractCommitLogService
                 int lagCount = 0;
                 int syncCount = 0;
 
-                boolean run = true;
-                while (run)
+                while (true)
                 {
+                    // always run once after shutdown signalled
+                    boolean shutdownRequested = shutdown;
+
                     try
                     {
-                        // always run once after shutdown signalled
-                        run = !shutdown;
-
                         // sync and signal
-                        long syncStarted = System.currentTimeMillis();
+                        long syncStarted = System.nanoTime();
                         // This is a target for Byteman in CommitLogSegmentManagerTest
-                        commitLog.sync(shutdown);
+                        commitLog.sync();
                         lastSyncedAt = syncStarted;
                         syncComplete.signalAll();
 
 
                         // sleep any time we have left before the next one is due
-                        long now = System.currentTimeMillis();
-                        long sleep = syncStarted + pollIntervalMillis - now;
-                        if (sleep < 0)
+                        long now = System.nanoTime();
+                        long wakeUpAt = syncStarted + pollIntervalNanos;
+                        if (wakeUpAt < now)
                         {
                             // if we have lagged noticeably, update our lag counter
                             if (firstLagAt == 0)
@@ -105,7 +107,7 @@ public abstract class AbstractCommitLogService
                                 firstLagAt = now;
                                 totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
                             }
-                            syncExceededIntervalBy -= sleep;
+                            syncExceededIntervalBy += now - wakeUpAt;
                             lagCount++;
                         }
                         syncCount++;
@@ -114,30 +116,25 @@ public abstract class AbstractCommitLogService
                         if (firstLagAt > 0)
                         {
                             //Only reset the lag tracking if it actually logged this time
-                            boolean logged = NoSpamLogger.log(
-                                    logger,
-                                    NoSpamLogger.Level.WARN,
-                                    5,
-                                    TimeUnit.MINUTES,
-                                    "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
-                                                      syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+                            boolean logged = NoSpamLogger.log(logger,
+                                                              NoSpamLogger.Level.WARN,
+                                                              5,
+                                                              TimeUnit.MINUTES,
+                                                              "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+                                                              syncCount,
+                                                              String.format("%.2f", (now - firstLagAt) * 1e-9d),
+                                                              String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
+                                                              lagCount,
+                                                              String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
                            if (logged)
                                firstLagAt = 0;
                         }
 
-                        // if we have lagged this round, we probably have work to do already so we don't sleep
-                        if (sleep < 0 || !run)
-                            continue;
+                        if (shutdownRequested)
+                            return;
 
-                        try
-                        {
-                            haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
-                            haveWork.drainPermits();
-                        }
-                        catch (InterruptedException e)
-                        {
-                            throw new AssertionError();
-                        }
+                        if (wakeUpAt > now)
+                            LockSupport.parkNanos(wakeUpAt - now);
                     }
                     catch (Throwable t)
                     {
@@ -145,19 +142,13 @@ public abstract class AbstractCommitLogService
                             break;
 
                         // sleep for full poll-interval after an error, so we don't spam the log file
-                        try
-                        {
-                            haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
-                        }
-                        catch (InterruptedException e)
-                        {
-                            throw new AssertionError();
-                        }
+                        LockSupport.parkNanos(pollIntervalNanos);
                     }
                 }
             }
         };
 
+        shutdown = false;
         thread = new Thread(runnable, name);
         thread.start();
     }
@@ -174,42 +165,43 @@ public abstract class AbstractCommitLogService
     protected abstract void maybeWaitForSync(Allocation alloc);
 
     /**
-     * Sync immediately, but don't block for the sync to cmplete
+     * Request an additional sync cycle without blocking.
      */
-    public WaitQueue.Signal requestExtraSync()
+    public void requestExtraSync()
     {
-        WaitQueue.Signal signal = syncComplete.register();
-        haveWork.release(1);
-        return signal;
+        LockSupport.unpark(thread);
     }
 
     public void shutdown()
     {
         shutdown = true;
-        haveWork.release(1);
+        requestExtraSync();
     }
 
     /**
-     * FOR TESTING ONLY
+     * Request sync and wait until the current state is synced.
+     *
+     * Note: If a sync is already in progress when this is called, the call returns only after both that sync
+     * and a further cycle, initiated immediately after it, have completed.
      */
-    public void restartUnsafe()
+    public void syncBlocking()
     {
-        while (haveWork.availablePermits() < 1)
-            haveWork.release();
+        long requestTime = System.nanoTime();
+        requestExtraSync();
+        awaitSyncAt(requestTime, null);
+    }
 
-        while (haveWork.availablePermits() > 1)
+    void awaitSyncAt(long syncTime, Context context)
+    {
+        do
         {
-            try
-            {
-                haveWork.acquire();
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
+            WaitQueue.Signal signal = context != null ? syncComplete.register(context) : syncComplete.register();
+            if (lastSyncedAt < syncTime)
+                signal.awaitUninterruptibly();
+            else
+                signal.cancel();
         }
-        shutdown = false;
-        start();
+        while (lastSyncedAt < syncTime);
     }
 
     public void awaitTermination() throws InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
index ceb5d64..4edfa34 100644
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
@@ -30,7 +30,7 @@ class BatchCommitLogService extends AbstractCommitLogService
     {
         // wait until record has been safely persisted to disk
         pending.incrementAndGet();
-        haveWork.release();
+        requestExtraSync();
         alloc.awaitDiskSync(commitLog.metrics.waitingOnCommit);
         pending.decrementAndGet();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index b66221c..d76b9cb 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -116,8 +116,8 @@ public class CommitLog implements CommitLogMBean
 
     CommitLog start()
     {
-        executor.start();
         segmentManager.start();
+        executor.start();
         return this;
     }
 
@@ -129,22 +129,11 @@ public class CommitLog implements CommitLogMBean
      */
     public int recoverSegmentsOnDisk() throws IOException
     {
-        // If createReserveSegments is already flipped, the CLSM is running and recovery has already taken place.
-        if (segmentManager.createReserveSegments)
-            return 0;
-
-        FilenameFilter unmanagedFilesFilter = new FilenameFilter()
-        {
-            public boolean accept(File dir, String name)
-            {
-                // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
-                // until after recover was finished.  this turns out to be fragile; it is less error-prone to go
-                // ahead and allow writes before recover, and just skip active segments when we do.
-                return CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);
-            }
-        };
+        FilenameFilter unmanagedFilesFilter = (dir, name) -> CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);
 
         // submit all files for this segment manager for archiving prior to recovery - CASSANDRA-6904
+        // The files may have already been archived by normal CommitLog operation. This may cause errors in this
+        // archiving pass, which we should not treat as serious. 
         for (File file : new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter))
         {
             archiver.maybeArchive(file.getPath(), file.getName());
@@ -154,6 +143,7 @@ public class CommitLog implements CommitLogMBean
         assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore";
         archiver.maybeRestoreArchive();
 
+        // List the files again as archiver may have added segments.
         File[] files = new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter);
         int replayed = 0;
         if (files.length == 0)
@@ -171,7 +161,6 @@ public class CommitLog implements CommitLogMBean
                 segmentManager.handleReplayedSegment(f);
         }
 
-        segmentManager.enableReserveSegmentCreation();
         return replayed;
     }
 
@@ -231,9 +220,9 @@ public class CommitLog implements CommitLogMBean
     /**
      * Forces a disk flush on the commit log files that need it.  Blocking.
      */
-    public void sync(boolean syncAllSegments) throws IOException
+    public void sync() throws IOException
     {
-        segmentManager.sync(syncAllSegments);
+        segmentManager.sync();
     }
 
     /**
@@ -315,8 +304,8 @@ public class CommitLog implements CommitLogMBean
 
             if (segment.isUnused())
             {
-                logger.trace("Commit log segment {} is unused", segment);
-                segmentManager.recycleSegment(segment);
+                logger.debug("Commit log segment {} is unused", segment);
+                segmentManager.archiveAndDiscard(segment);
             }
             else
             {
@@ -455,23 +444,7 @@ public class CommitLog implements CommitLogMBean
      */
     public int restartUnsafe() throws IOException
     {
-        segmentManager.start();
-        executor.restartUnsafe();
-        try
-        {
-            return recoverSegmentsOnDisk();
-        }
-        catch (FSWriteError e)
-        {
-            // Workaround for a class of races that keeps showing up on Windows tests.
-            // stop/start/reset path on Windows with segment deletion is very touchy/brittle
-            // and the timing keeps getting screwed up. Rather than chasing our tail further
-            // or rewriting the CLSM, just report that we didn't recover anything back up
-            // the chain. This will silence most intermittent test failures on Windows
-            // and appropriately fail tests that expected segments to be recovered that
-            // were not.
-            return 0;
-        }
+        return start().recoverSegmentsOnDisk();
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 044f2db..1abdd79 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
 
 public class CommitLogArchiver
 {
@@ -151,22 +152,32 @@ public class CommitLogArchiver
 
     /**
      * Differs from the above because it can be used on any file, rather than only
-     * managed commit log segments (and thus cannot call waitForFinalSync).
+     * managed commit log segments (and thus cannot call waitForFinalSync), and in
+     * the treatment of failures.
      *
-     * Used to archive files present in the commit log directory at startup (CASSANDRA-6904)
+     * Used to archive files present in the commit log directory at startup (CASSANDRA-6904).
+     * Since the files may already have been archived by normal operation, subsequent
+     * hard-linking or other operations could fail, so we should not throw errors on failure.
      */
     public void maybeArchive(final String path, final String name)
     {
         if (Strings.isNullOrEmpty(archiveCommand))
             return;
 
-        archivePending.put(name, executor.submit(new WrappedRunnable()
+        archivePending.put(name, executor.submit(new Runnable()
         {
-            protected void runMayThrow() throws IOException
+            public void run()
             {
-                String command = NAME.matcher(archiveCommand).replaceAll(Matcher.quoteReplacement(name));
-                command = PATH.matcher(command).replaceAll(Matcher.quoteReplacement(path));
-                exec(command);
+                try
+                {
+                    String command = NAME.matcher(archiveCommand).replaceAll(Matcher.quoteReplacement(name));
+                    command = PATH.matcher(command).replaceAll(Matcher.quoteReplacement(path));
+                    exec(command);
+                }
+                catch (IOException e)
+                {
+                    logger.warn("Archiving file {} failed, file may have already been archived.", name, e);
+                }
             }
         }));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index a1158be..eb9759e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -29,8 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CRC32;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.config.*;
@@ -38,6 +36,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -51,8 +50,6 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
  */
 public abstract class CommitLogSegment
 {
-    private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
-
     private final static long idBase;
 
     private CDCState cdcState = CDCState.PERMITTED;
@@ -117,14 +114,13 @@ public abstract class CommitLogSegment
     ByteBuffer buffer;
     private volatile boolean headerWritten;
 
-    final CommitLog commitLog;
     public final CommitLogDescriptor descriptor;
 
-    static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
+    static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
     {
         Configuration config = commitLog.configuration;
-        CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager, onClose)
-                                                          : config.useCompression() ? new CompressedSegment(commitLog, manager, onClose)
+        CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager)
+                                                          : config.useCompression() ? new CompressedSegment(commitLog, manager)
                                                                                     : new MemoryMappedSegment(commitLog, manager);
         segment.writeLogHeader();
         return segment;
@@ -152,7 +148,6 @@ public abstract class CommitLogSegment
      */
     CommitLogSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
     {
-        this.commitLog = commitLog;
         this.manager = manager;
 
         id = getNextId();
@@ -370,6 +365,18 @@ public abstract class CommitLogSegment
     }
 
     /**
+     * Discards a segment file when the log no longer requires it. The file may be left on disk if the archive script
+     * requires it. (Potentially blocking operation)
+     */
+    void discard(boolean deleteFile)
+    {
+        close();
+        if (deleteFile)
+            FileUtils.deleteWithConfirm(logFile);
+        manager.addSize(-onDiskSize());
+    }
+
+    /**
      * @return the current CommitLogPosition for this log segment
      */
     public CommitLogPosition getCurrentCommitLogPosition()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
index 306cec8..a91384f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
@@ -53,8 +53,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
     @Override
     void start()
     {
-        super.start();
         cdcSizeTracker.start();
+        super.start();
     }
 
     public void discard(CommitLogSegment segment, boolean delete)
@@ -78,9 +78,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
      */
     public void shutdown()
     {
-        run = false;
         cdcSizeTracker.shutdown();
-        wakeManager();
+        super.shutdown();
     }
 
     /**
@@ -103,7 +102,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         {
             // Failed to allocate, so move to a new segment with enough room if possible.
             advanceAllocatingFrom(segment);
-            segment = allocatingFrom;
+            segment = allocatingFrom();
 
             throwIfForbidden(mutation, segment);
         }
@@ -146,7 +145,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
      */
     public CommitLogSegment createSegment()
     {
-        CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this, () -> wakeManager());
+        CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this);
         cdcSizeTracker.processNewSegment(segment);
         return segment;
     }
@@ -179,6 +178,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
          */
         public void start()
         {
+            size = 0;
+            unflushedCDCSize = 0;
             cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy());
         }
 
@@ -245,7 +246,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         {
             rateLimiter.acquire();
             calculateSize();
-            CommitLogSegment allocatingFrom = segmentManager.allocatingFrom;
+            CommitLogSegment allocatingFrom = segmentManager.allocatingFrom();
             if (allocatingFrom.getCDCState() == CDCState.FORBIDDEN)
                 processNewSegment(allocatingFrom);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
index 333077c..86e886b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
@@ -39,15 +39,6 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan
     }
 
     /**
-     * Initiates the shutdown process for the management thread.
-     */
-    public void shutdown()
-    {
-        run = false;
-        wakeManager();
-    }
-
-    /**
      * Reserve space in the current segment for the provided mutation or, if there isn't space available,
      * create a new segment. allocate() is blocking until allocation succeeds as it waits on a signal in advanceAllocatingFrom
      *
@@ -64,7 +55,7 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan
         {
             // failed to allocate, so move to a new segment with enough room
             advanceAllocatingFrom(segment);
-            segment = allocatingFrom;
+            segment = allocatingFrom();
         }
 
         return alloc;
@@ -84,6 +75,6 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan
 
     public CommitLogSegment createSegment()
     {
-        return CommitLogSegment.createSegment(commitLog, this, () -> wakeManager());
+        return CommitLogSegment.createSegment(commitLog, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 2e46028..288b766 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -42,9 +42,9 @@ public class CompressedSegment extends FileDirectSegment
     /**
      * Constructs a new segment file.
      */
-    CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
+    CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
     {
-        super(commitLog, manager, onClose);
+        super(commitLog, manager);
         this.compressor = commitLog.configuration.getCompressor();
         manager.getBufferPool().setPreferredReusableBufferType(compressor.preferredBufferType());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 103351e..4ca1ede 100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -65,9 +65,9 @@ public class EncryptedSegment extends FileDirectSegment
     private final EncryptionContext encryptionContext;
     private final Cipher cipher;
 
-    public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
+    public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
     {
-        super(commitLog, manager, onClose);
+        super(commitLog, manager);
         this.encryptionContext = commitLog.configuration.getEncryptionContext();
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index d4160e4..55084be 100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -29,12 +29,10 @@ import org.apache.cassandra.io.FSWriteError;
 public abstract class FileDirectSegment extends CommitLogSegment
 {
     volatile long lastWrittenPos = 0;
-    private final Runnable onClose;
 
-    FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
+    FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
     {
         super(commitLog, manager);
-        this.onClose = onClose;
     }
 
     @Override
@@ -62,7 +60,7 @@ public abstract class FileDirectSegment extends CommitLogSegment
         }
         finally
         {
-            onClose.run();
+            manager.notifyBufferFreed();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 86a248b..bde832b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -18,11 +18,10 @@
 package org.apache.cassandra.db.commitlog;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 class PeriodicCommitLogService extends AbstractCommitLogService
 {
-    private static final int blockWhenSyncLagsMillis = (int) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5);
+    private static final long blockWhenSyncLagsNanos = (long) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5e6);
 
     public PeriodicCommitLogService(final CommitLog commitLog)
     {
@@ -31,28 +30,12 @@ class PeriodicCommitLogService extends AbstractCommitLogService
 
     protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
     {
-        if (waitForSyncToCatchUp(Long.MAX_VALUE))
+        long expectedSyncTime = System.nanoTime() - blockWhenSyncLagsNanos;
+        if (lastSyncedAt < expectedSyncTime)
         {
-            // wait until periodic sync() catches up with its schedule
-            long started = System.currentTimeMillis();
             pending.incrementAndGet();
-            while (waitForSyncToCatchUp(started))
-            {
-                WaitQueue.Signal signal = syncComplete.register(commitLog.metrics.waitingOnCommit.time());
-                if (waitForSyncToCatchUp(started))
-                    signal.awaitUninterruptibly();
-                else
-                    signal.cancel();
-            }
+            awaitSyncAt(expectedSyncTime, commitLog.metrics.waitingOnCommit.time());
             pending.decrementAndGet();
         }
     }
-
-    /**
-     * @return true if sync is currently lagging behind inserts
-     */
-    private boolean waitForSyncToCatchUp(long started)
-    {
-        return started > lastSyncedAt + blockWhenSyncLagsMillis;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 06c252f..b86a15b 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -29,9 +29,15 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.RateLimiter;
-import org.junit.*;
 
-import org.apache.cassandra.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.UpdateBuilder;
 import org.apache.cassandra.config.Config.CommitLogSync;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.Mutation;
@@ -97,6 +103,7 @@ public class CommitLogStressTest
             initialize();
 
             CommitLogStressTest tester = new CommitLogStressTest();
+            tester.cleanDir();
             tester.testFixedSize();
         }
         catch (Throwable e)
@@ -130,12 +137,13 @@ public class CommitLogStressTest
 
         SchemaLoader.loadSchema();
         SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
+
+        CommitLog.instance.stopUnsafe(true);
     }
 
     @Before
     public void cleanDir() throws IOException
     {
-        CommitLog.instance.stopUnsafe(true);
         File dir = new File(location);
         if (dir.isDirectory())
         {
@@ -209,8 +217,6 @@ public class CommitLogStressTest
             {
                 DatabaseDescriptor.setCommitLogSync(sync);
                 CommitLog commitLog = new CommitLog(CommitLogArchiver.disabled()).start();
-                // Need to enable reserve segment creation as close to test start as possible to minimize race
-                commitLog.segmentManager.enableReserveSegmentCreation();
                 testLog(commitLog);
                 assert !failed;
             }
@@ -307,17 +313,12 @@ public class CommitLogStressTest
     private void verifySizes(CommitLog commitLog)
     {
         // Complete anything that's still left to write.
-        commitLog.executor.requestExtraSync().awaitUninterruptibly();
-        // One await() does not suffice as we may be signalled when an ongoing sync finished. Request another
-        // (which shouldn't write anything) to make sure the first we triggered completes.
-        // FIXME: The executor should give us a chance to await completion of the sync we requested.
-        commitLog.executor.requestExtraSync().awaitUninterruptibly();
-
-        // Wait for any pending deletes or segment allocations to complete.
+        commitLog.executor.syncBlocking();
+        // Wait for any concurrent segment allocations to complete.
         commitLog.segmentManager.awaitManagementTasksCompletion();
 
         long combinedSize = 0;
-        for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
+        for (File f : new File(commitLog.segmentManager.storageDirectory).listFiles())
             combinedSize += f.length();
         Assert.assertEquals(combinedSize, commitLog.getActiveOnDiskSize());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index df83ff1..e5c1831 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -530,7 +530,7 @@ public class Util
     public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
     {
         long now = System.currentTimeMillis();
-        while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds))
+        while (System.currentTimeMillis() < now + (1000 * timeoutInSeconds))
         {
             if (s.get().equals(expected))
                 break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index e308a2f..68ce57d 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@ -51,8 +51,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
     @Before
     public void before() throws IOException
     {
-        // disable reserve segment to get more deterministic allocation/testing of CDC boundary states
-        CommitLog.instance.forceRecycleAllSegments();
+        CommitLog.instance.resetUnsafe(true);
         for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
             FileUtils.deleteWithConfirm(f);
     }
@@ -120,7 +119,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
         for (int i = 0; i < 8; i++)
         {
             new RowUpdateBuilder(currentTableMetadata(), 0, i)
-                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) // fit 3 in a segment
                 .build().apply();
         }
 
@@ -136,7 +135,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
         for (int i = 0; i < 8; i++)
         {
             new RowUpdateBuilder(currentTableMetadata(), 0, i)
-                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
                 .build().apply();
         }
         // 4 total again, 3 CONTAINS, 1 in waiting PERMITTED
@@ -215,6 +214,6 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
     private void expectCurrentCDCState(CDCState state)
     {
         Assert.assertEquals("Received unexpected CDCState on current allocatingFrom segment.",
-            state, CommitLog.instance.segmentManager.allocatingFrom.getCDCState());
+            state, CommitLog.instance.segmentManager.allocatingFrom().getCDCState());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
index b777389..397a8eb 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
@@ -21,6 +21,7 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Random;
 import java.util.concurrent.Semaphore;
 import javax.naming.ConfigurationException;
@@ -51,6 +52,7 @@ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 public class CommitLogSegmentManagerTest
 {
     //Block commit log service from syncing
+    @SuppressWarnings("unused")
     private static final Semaphore allowSync = new Semaphore(0);
 
     private static final String KEYSPACE1 = "CommitLogTest";
@@ -66,6 +68,7 @@ public class CommitLogSegmentManagerTest
         DatabaseDescriptor.setCommitLogSegmentSize(1);
         DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
         DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+        DatabaseDescriptor.setCommitLogMaxCompressionBuffersPerPool(3);
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
@@ -109,11 +112,14 @@ public class CommitLogSegmentManagerTest
         }
         Thread.sleep(1000);
 
-        // Should only be able to create 3 segments (not 7) because it blocks waiting for truncation that never comes.
+        // Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes
         Assert.assertEquals(3, clsm.getActiveSegments().size());
 
-        clsm.getActiveSegments().forEach(segment -> clsm.recycleSegment(segment));
+        // Discard the currently active segments so allocation can continue.
+        // Take snapshot of the list, otherwise this will also discard newly allocated segments.
+        new ArrayList<>(clsm.getActiveSegments()).forEach( clsm::archiveAndDiscard );
 
+        // The allocated count should reach the limit again.
         Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 79051e0..4bc5f6b 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -277,7 +277,7 @@ public class CommitLogTest
 
         // "Flush": this won't delete anything
         UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync();
         CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getCurrentPosition());
 
         assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
@@ -594,7 +594,7 @@ public class CommitLogTest
         cellCount += 1;
         CommitLog.instance.add(rm2);
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync();
 
         SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
         List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
@@ -631,7 +631,7 @@ public class CommitLogTest
             }
         }
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync();
 
         SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
         List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
@@ -661,6 +661,10 @@ public class CommitLogTest
         @Override
         public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
         {
+            // Filter out system writes that could flake the test.
+            if (!KEYSPACE1.equals(m.getKeyspaceName()))
+                return;
+
             if (entryLocation <= filterPosition.position)
             {
                 // Skip over this mutation.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 9a22b04..7f43378 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -40,7 +40,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
     public CommitLogTestReplayer(Predicate<Mutation> processor) throws IOException
     {
         super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync();
 
         this.processor = processor;
         commitLogReader = new CommitLogTestReader();