You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/12/05 13:12:02 UTC

[1/6] cassandra git commit: More frequent commitlog chained markers

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 eb05025c0 -> 05cb556f9
  refs/heads/cassandra-3.11 d577918ba -> c3a1a4fa8
  refs/heads/trunk d274c6ac0 -> 2402acd47


More frequent commitlog chained markers

patch by jasobrown; reviewed by Sam Tunnecliffe for CASSANDRA-13987


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05cb556f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05cb556f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05cb556f

Branch: refs/heads/cassandra-3.0
Commit: 05cb556f90dbd1929a180254809e05620265419b
Parents: eb05025
Author: Jason Brown <ja...@apple.com>
Authored: Tue Oct 17 15:37:13 2017 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:06:38 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  10 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../db/commitlog/AbstractCommitLogService.java  |  86 ++++++++++---
 .../cassandra/db/commitlog/CommitLog.java       |   4 +-
 .../db/commitlog/CommitLogSegment.java          |  85 ++++++++----
 .../db/commitlog/CompressedSegment.java         |  12 ++
 .../db/commitlog/MemoryMappedSegment.java       |   7 +-
 .../db/commitlog/PeriodicCommitLogService.java  |   2 +-
 .../commitlog/CommitLogChainedMarkersTest.java  | 128 +++++++++++++++++++
 .../CommitLogSegmentBackpressureTest.java       |   4 +-
 .../cassandra/db/commitlog/CommitLogTest.java   |   6 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   2 +-
 14 files changed, 304 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c50a3f..2683dc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * More frequent commitlog chained markers (CASSANDRA-13987)
  * Fix serialized size of DataLimits (CASSANDRA-14057)
  * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
  * Fix SSTableLoader logger message (CASSANDRA-14003)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b783090..71e0b2a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -300,10 +300,18 @@ counter_cache_save_period: 7200
 #
 # the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds. 
+# milliseconds.
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
 
+# Time interval in millis at which we should update the chained markers in the commitlog.
+# This allows more of the commitlog to be replayed from the mmapped file
+# if the cassandra process crashes; this does not help in durability for surviving a host failure.
+# This value only makes sense if it is significantly less than commitlog_sync_period_in_ms,
+# and only applies to periodic mode when not using commitlog compression or encryption.
+# commitlog_marker_period_in_ms: 100
+
+
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data
 # in it (potentially from each columnfamily in the system) has been

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..0796183 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,6 +192,7 @@ public class Config
     public CommitLogSync commitlog_sync;
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
+    public Integer commitlog_marker_period_in_ms = 0;
     public int commitlog_segment_size_in_mb = 32;
     public ParameterizedClass commitlog_compression;
     public int commitlog_max_compression_buffers_in_pool = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..169ed3d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1519,6 +1519,16 @@ public class DatabaseDescriptor
         conf.commitlog_sync_period_in_ms = periodMillis;
     }
 
+    public static void setCommitLogMarkerPeriod(int markerPeriod)
+    {
+        conf.commitlog_marker_period_in_ms = markerPeriod;
+    }
+
+    public static int getCommitLogMarkerPeriod()
+    {
+        return conf.commitlog_marker_period_in_ms;
+    }
+
     public static Config.CommitLogSync getCommitLogSync()
     {
         return conf.commitlog_sync;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index e5a5887..8a03b2f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.db.commitlog;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.slf4j.*;
@@ -26,8 +29,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
-
 public abstract class AbstractCommitLogService
 {
 
@@ -47,7 +48,24 @@ public abstract class AbstractCommitLogService
 
     final CommitLog commitLog;
     private final String name;
-    private final long pollIntervalMillis;
+
+    /**
+     * The duration between syncs to disk.
+     */
+    private final long syncIntervalMillis;
+
+    /**
+     * The duration between updating the chained markers in the commit log file. This value should be
+     * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
+     */
+    private final long markerIntervalMillis;
+
+    /**
+     * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+     * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+     * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+     */
+    private volatile boolean syncRequested;
 
     private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
 
@@ -57,18 +75,45 @@ public abstract class AbstractCommitLogService
      *
      * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
      */
-    AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis)
+    {
+        this(commitLog, name, syncIntervalMillis, syncIntervalMillis);
+    }
+
+    /**
+     * CommitLogService provides a fsync service for Allocations, fulfilling either the
+     * Batch or Periodic contract.
+     *
+     * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+     */
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis)
     {
         this.commitLog = commitLog;
         this.name = name;
-        this.pollIntervalMillis = pollIntervalMillis;
+        this.syncIntervalMillis = syncIntervalMillis;
+
+        // if we are not using periodic mode, or we are using compression, we shouldn't update the chained markers
+        // faster than the sync interval
+        if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
+            markerIntervalMillis = syncIntervalMillis;
+
+        // apply basic bounds checking on the marker interval
+        if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+        {
+            logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+                        markerIntervalMillis, syncIntervalMillis);
+            markerIntervalMillis = syncIntervalMillis;
+        }
+
+        this.markerIntervalMillis = markerIntervalMillis;
     }
 
     // Separated into individual method to ensure relevant objects are constructed before this is started.
     void start()
     {
-        if (pollIntervalMillis < 1)
-            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+        if (syncIntervalMillis < 1)
+            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
+                                                             syncIntervalMillis * 1e-6));
 
         Runnable runnable = new Runnable()
         {
@@ -89,16 +134,24 @@ public abstract class AbstractCommitLogService
                         run = !shutdown;
 
                         // sync and signal
-                        long syncStarted = System.currentTimeMillis();
-                        //This is a target for Byteman in CommitLogSegmentManagerTest
-                        commitLog.sync(shutdown);
-                        lastSyncedAt = syncStarted;
-                        syncComplete.signalAll();
-
+                        long pollStarted = System.currentTimeMillis();
+                        if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+                        {
+                            // in this branch, we want to flush the commit log to disk
+                            commitLog.sync(shutdown, true);
+                            syncRequested = false;
+                            lastSyncedAt = pollStarted;
+                            syncComplete.signalAll();
+                        }
+                        else
+                        {
+                            // in this branch, just update the commit log sync headers
+                            commitLog.sync(false, false);
+                        }
 
                         // sleep any time we have left before the next one is due
                         long now = System.currentTimeMillis();
-                        long sleep = syncStarted + pollIntervalMillis - now;
+                        long sleep = pollStarted + markerIntervalMillis - now;
                         if (sleep < 0)
                         {
                             // if we have lagged noticeably, update our lag counter
@@ -111,7 +164,7 @@ public abstract class AbstractCommitLogService
                             lagCount++;
                         }
                         syncCount++;
-                        totalSyncDuration += now - syncStarted;
+                        totalSyncDuration += now - pollStarted;
 
                         if (firstLagAt > 0)
                         {
@@ -149,7 +202,7 @@ public abstract class AbstractCommitLogService
                         // sleep for full poll-interval after an error, so we don't spam the log file
                         try
                         {
-                            haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+                            haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
                         }
                         catch (InterruptedException e)
                         {
@@ -180,6 +233,7 @@ public abstract class AbstractCommitLogService
      */
     public WaitQueue.Signal requestExtraSync()
     {
+        syncRequested = true;
         WaitQueue.Signal signal = syncComplete.register();
         haveWork.release(1);
         return signal;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 40040ed..ff1b712 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -220,14 +220,14 @@ public class CommitLog implements CommitLogMBean
     /**
      * Forces a disk flush on the commit log files that need it.  Blocking.
      */
-    public void sync(boolean syncAllSegments)
+    public void sync(boolean syncAllSegments, boolean flush)
     {
         CommitLogSegment current = allocator.allocatingFrom();
         for (CommitLogSegment segment : allocator.getActiveSegments())
         {
             if (!syncAllSegments && segment.id > current.id)
                 return;
-            segment.sync();
+            segment.sync(flush);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 236a1b1..8834c8c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -94,6 +94,12 @@ public abstract class CommitLogSegment
     // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
     private volatile int lastSyncedOffset;
 
+    /**
+     * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+     * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+     */
+    private volatile int lastMarkerOffset;
+
     // The end position of the buffer. Initially set to its capacity and updated to point to the last written position
     // as the segment is being closed.
     // No need to be volatile as writes are protected by appendOrder barrier.
@@ -167,7 +173,8 @@ public abstract class CommitLogSegment
         // write the header
         CommitLogDescriptor.writeHeader(buffer, descriptor);
         endOfBuffer = buffer.capacity();
-        lastSyncedOffset = buffer.position();
+
+        lastSyncedOffset = lastMarkerOffset = buffer.position();
         allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
     }
 
@@ -232,7 +239,7 @@ public abstract class CommitLogSegment
     // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
     void discardUnusedTail()
     {
-        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
+        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with ACLSM.advanceAllocatingFrom()
         // Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
         // This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
         // running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
@@ -271,13 +278,18 @@ public abstract class CommitLogSegment
     }
 
     /**
-     * Forces a disk flush for this segment file.
+     * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+     *
+     * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
      */
-    synchronized void sync()
+    synchronized void sync(boolean flush)
     {
-        boolean close = false;
+        assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+                                                                    lastMarkerOffset, lastSyncedOffset);
         // check we have more work to do
-        if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+        final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+        final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+        if (!(needToMarkData || hasDataToFlush))
             return;
         // Note: Even if the very first allocation of this sync section failed, we still want to enter this
         // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
@@ -285,31 +297,48 @@ public abstract class CommitLogSegment
         // succeeded in the previous sync.
         assert buffer != null;  // Only close once.
 
-        int startMarker = lastSyncedOffset;
-        // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
-        // the point at which we can safely consider records to have been completely written to.
-        int nextMarker = allocate(SYNC_MARKER_SIZE);
-        if (nextMarker < 0)
+        boolean close = false;
+        int startMarker = lastMarkerOffset;
+        int nextMarker, sectionEnd;
+        if (needToMarkData)
         {
-            // Ensure no more of this CLS is writeable, and mark ourselves for closing.
-            discardUnusedTail();
-            close = true;
-
-            // We use the buffer size as the synced position after a close instead of the end of the actual data
-            // to make sure we only close the buffer once.
-            // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
-            nextMarker = buffer.capacity();
+            // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+            // the point at which we can safely consider records to have been completely written to.
+            nextMarker = allocate(SYNC_MARKER_SIZE);
+            if (nextMarker < 0)
+            {
+                // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+                discardUnusedTail();
+                close = true;
+
+                // We use the buffer size as the synced position after a close instead of the end of the actual data
+                // to make sure we only close the buffer once.
+                // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+                nextMarker = buffer.capacity();
+            }
+            // Wait for mutations to complete as well as endOfBuffer to have been written.
+            waitForModifications();
+            sectionEnd = close ? endOfBuffer : nextMarker;
+
+            // Possibly perform compression or encryption and update the chained markers
+            write(startMarker, sectionEnd);
+            lastMarkerOffset = sectionEnd;
+        }
+        else
+        {
+            // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+            // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+            nextMarker = lastMarkerOffset;
+            sectionEnd = nextMarker;
         }
 
-        // Wait for mutations to complete as well as endOfBuffer to have been written.
-        waitForModifications();
-        int sectionEnd = close ? endOfBuffer : nextMarker;
 
-        // Perform compression, writing to file and flush.
-        write(startMarker, sectionEnd);
+        if (flush || close)
+        {
+            flush(startMarker, sectionEnd);
+            lastSyncedOffset = lastMarkerOffset = nextMarker;
+        }
 
-        // Signal the sync as complete.
-        lastSyncedOffset = nextMarker;
         if (close)
             internalClose();
         syncComplete.signalAll();
@@ -327,6 +356,8 @@ public abstract class CommitLogSegment
 
     abstract void write(int lastSyncedOffset, int nextMarker);
 
+    abstract void flush(int startMarker, int nextMarker);
+
     public boolean isStillAllocating()
     {
         return allocatePosition.get() < endOfBuffer;
@@ -404,7 +435,7 @@ public abstract class CommitLogSegment
     synchronized void close()
     {
         discardUnusedTail();
-        sync();
+        sync(true);
         assert buffer == null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index c00ce18..8e05112 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -139,6 +139,18 @@ public class CompressedSegment extends CommitLogSegment
             channel.write(compressedBuffer);
             assert channel.position() - lastWrittenPos == compressedBuffer.limit();
             lastWrittenPos = channel.position();
+        }
+        catch (Exception e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
             SyncUtil.force(channel, true);
         }
         catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a16d91..8259f04 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -78,8 +78,13 @@ public class MemoryMappedSegment extends CommitLogSegment
         // write previous sync marker to point to next sync marker
         // we don't chain the crcs here to ensure this method is idempotent if it fails
         writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+    }
 
-        try {
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
             SyncUtil.force((MappedByteBuffer) buffer);
         }
         catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 86a248b..76419b7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -26,7 +26,7 @@ class PeriodicCommitLogService extends AbstractCommitLogService
 
     public PeriodicCommitLogService(final CommitLog commitLog)
     {
-        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod());
+        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod());
     }
 
     protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
new file mode 100644
index 0000000..e2b9f72
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+/**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+@RunWith(BMUnitRunner.class)
+public class CommitLogChainedMarkersTest
+{
+    private static final String KEYSPACE1 = "CommitLogTest";
+    private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+    @Test
+    @BMRule(name = "force all calls to sync() to not flush to disk",
+    targetClass = "CommitLogSegment",
+    targetMethod = "sync(boolean)",
+    action = "$flush = false")
+    public void replayCommitLogWithoutFlushing() throws IOException
+    {
+        DatabaseDescriptor.setCommitLogSegmentSize(5);
+        DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+        DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+        CompactionManager.instance.disableAutoCompaction();
+
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+        byte[] entropy = new byte[1024];
+        new Random().nextBytes(entropy);
+        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                           .clustering("bytes")
+                           .add("val", ByteBuffer.wrap(entropy))
+                           .build();
+
+        int samples = 10000;
+        for (int i = 0; i < samples; i++)
+            CommitLog.instance.add(m);
+
+        CommitLog.instance.sync(false, true);
+
+        Replayer replayer = new Replayer(cfs1.metadata);
+        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
+        replayer.recover(commitLogDir.listFiles());
+        Assert.assertEquals(samples, replayer.count);
+    }
+
+    private static class Replayer extends CommitLogReplayer
+    {
+        private final CFMetaData cfm;
+        private int count;
+
+        Replayer(CFMetaData cfm)
+        {
+            super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
+            this.cfm = cfm;
+        }
+
+        @Override
+        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+        {
+            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+            try
+            {
+                Mutation mutation = Mutation.serializer.deserialize(bufIn,
+                                                           desc.getMessagingVersion(),
+                                                           SerializationHelper.Flag.LOCAL);
+
+                if (cfm == null || mutation.get(cfm) != null)
+                    count++;
+            }
+            catch (IOException e)
+            {
+                // Test fails.
+                throw new AssertionError(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index b651098..a1999ef 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@ -66,12 +66,12 @@ public class CommitLogSegmentBackpressureTest
     @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
                               targetClass = "AbstractCommitLogService$1",
                               targetMethod = "run",
-                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
                       @BMRule(name = "Release Semaphore after sync",
                               targetClass = "AbstractCommitLogService$1",
                               targetMethod = "run",
-                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
     public void testCompressedCommitLogBackpressure() throws Throwable
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9e9ee53..b8f68ed 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -339,7 +339,7 @@ public class CommitLogTest
 
         // "Flush": this won't delete anything
         UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
 
         assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
@@ -652,7 +652,7 @@ public class CommitLogTest
             DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
         }
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
         // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
         // If retries work subsequent flushes should clear up error and this should change to expect 0.
@@ -685,7 +685,7 @@ public class CommitLogTest
         for (SSTableReader reader : cfs.getLiveSSTables())
             reader.reloadSSTableMetadata();
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
         // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
         // persisted all data in the commit log. Because we know there was an error, there must be something left to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index e690785..36973f2 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -38,7 +38,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
 {
     public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
     {
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
 
         CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
         File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/6] cassandra git commit: More frequent commitlog chained markers

Posted by ja...@apache.org.
More frequent commitlog chained markers

patch by jasobrown; reviewed by Sam Tunnecliffe for CASSANDRA-13987


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05cb556f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05cb556f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05cb556f

Branch: refs/heads/cassandra-3.11
Commit: 05cb556f90dbd1929a180254809e05620265419b
Parents: eb05025
Author: Jason Brown <ja...@apple.com>
Authored: Tue Oct 17 15:37:13 2017 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:06:38 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  10 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../db/commitlog/AbstractCommitLogService.java  |  86 ++++++++++---
 .../cassandra/db/commitlog/CommitLog.java       |   4 +-
 .../db/commitlog/CommitLogSegment.java          |  85 ++++++++----
 .../db/commitlog/CompressedSegment.java         |  12 ++
 .../db/commitlog/MemoryMappedSegment.java       |   7 +-
 .../db/commitlog/PeriodicCommitLogService.java  |   2 +-
 .../commitlog/CommitLogChainedMarkersTest.java  | 128 +++++++++++++++++++
 .../CommitLogSegmentBackpressureTest.java       |   4 +-
 .../cassandra/db/commitlog/CommitLogTest.java   |   6 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   2 +-
 14 files changed, 304 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c50a3f..2683dc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * More frequent commitlog chained markers (CASSANDRA-13987)
  * Fix serialized size of DataLimits (CASSANDRA-14057)
  * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
  * Fix SSTableLoader logger message (CASSANDRA-14003)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b783090..71e0b2a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -300,10 +300,18 @@ counter_cache_save_period: 7200
 #
 # the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds. 
+# milliseconds.
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
 
+# Time interval in millis at which we should update the chained markers in the commitlog.
+# This allows more of the commitlog to be replayed from the mmapped file
+# if the cassandra process crashes; this does not improve durability in the event of a host failure.
+# This value only makes sense if it is significantly less than commitlog_sync_period_in_ms,
+# and only applies to periodic mode when not using commitlog compression or encryption.
+# commitlog_marker_period_in_ms: 100
+
+
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data
 # in it (potentially from each columnfamily in the system) has been

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..0796183 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,6 +192,7 @@ public class Config
     public CommitLogSync commitlog_sync;
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
+    public Integer commitlog_marker_period_in_ms = 0;
     public int commitlog_segment_size_in_mb = 32;
     public ParameterizedClass commitlog_compression;
     public int commitlog_max_compression_buffers_in_pool = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..169ed3d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1519,6 +1519,16 @@ public class DatabaseDescriptor
         conf.commitlog_sync_period_in_ms = periodMillis;
     }
 
+    public static void setCommitLogMarkerPeriod(int markerPeriod)
+    {
+        conf.commitlog_marker_period_in_ms = markerPeriod;
+    }
+
+    public static int getCommitLogMarkerPeriod()
+    {
+        return conf.commitlog_marker_period_in_ms;
+    }
+
     public static Config.CommitLogSync getCommitLogSync()
     {
         return conf.commitlog_sync;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index e5a5887..8a03b2f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.db.commitlog;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.slf4j.*;
@@ -26,8 +29,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
-
 public abstract class AbstractCommitLogService
 {
 
@@ -47,7 +48,24 @@ public abstract class AbstractCommitLogService
 
     final CommitLog commitLog;
     private final String name;
-    private final long pollIntervalMillis;
+
+    /**
+     * The duration between syncs to disk.
+     */
+    private final long syncIntervalMillis;
+
+    /**
+     * The duration between updating the chained markers in the commit log file. This value should be
+     * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
+     */
+    private final long markerIntervalMillis;
+
+    /**
+     * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+     * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+     * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+     */
+    private volatile boolean syncRequested;
 
     private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
 
@@ -57,18 +75,45 @@ public abstract class AbstractCommitLogService
      *
      * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
      */
-    AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis)
+    {
+        this(commitLog, name, syncIntervalMillis, syncIntervalMillis);
+    }
+
+    /**
+     * CommitLogService provides a fsync service for Allocations, fulfilling either the
+     * Batch or Periodic contract.
+     *
+     * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+     */
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis)
     {
         this.commitLog = commitLog;
         this.name = name;
-        this.pollIntervalMillis = pollIntervalMillis;
+        this.syncIntervalMillis = syncIntervalMillis;
+
+        // if we are not using periodic mode, or we are using compression, we shouldn't update the chained markers
+        // faster than the sync interval
+        if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
+            markerIntervalMillis = syncIntervalMillis;
+
+        // apply basic bounds checking on the marker interval
+        if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+        {
+            logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+                        markerIntervalMillis, syncIntervalMillis);
+            markerIntervalMillis = syncIntervalMillis;
+        }
+
+        this.markerIntervalMillis = markerIntervalMillis;
     }
 
     // Separated into individual method to ensure relevant objects are constructed before this is started.
     void start()
     {
-        if (pollIntervalMillis < 1)
-            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+        if (syncIntervalMillis < 1)
+            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
+                                                             syncIntervalMillis * 1e-6));
 
         Runnable runnable = new Runnable()
         {
@@ -89,16 +134,24 @@ public abstract class AbstractCommitLogService
                         run = !shutdown;
 
                         // sync and signal
-                        long syncStarted = System.currentTimeMillis();
-                        //This is a target for Byteman in CommitLogSegmentManagerTest
-                        commitLog.sync(shutdown);
-                        lastSyncedAt = syncStarted;
-                        syncComplete.signalAll();
-
+                        long pollStarted = System.currentTimeMillis();
+                        if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+                        {
+                            // in this branch, we want to flush the commit log to disk
+                            commitLog.sync(shutdown, true);
+                            syncRequested = false;
+                            lastSyncedAt = pollStarted;
+                            syncComplete.signalAll();
+                        }
+                        else
+                        {
+                            // in this branch, just update the commit log sync headers
+                            commitLog.sync(false, false);
+                        }
 
                         // sleep any time we have left before the next one is due
                         long now = System.currentTimeMillis();
-                        long sleep = syncStarted + pollIntervalMillis - now;
+                        long sleep = pollStarted + markerIntervalMillis - now;
                         if (sleep < 0)
                         {
                             // if we have lagged noticeably, update our lag counter
@@ -111,7 +164,7 @@ public abstract class AbstractCommitLogService
                             lagCount++;
                         }
                         syncCount++;
-                        totalSyncDuration += now - syncStarted;
+                        totalSyncDuration += now - pollStarted;
 
                         if (firstLagAt > 0)
                         {
@@ -149,7 +202,7 @@ public abstract class AbstractCommitLogService
                         // sleep for full poll-interval after an error, so we don't spam the log file
                         try
                         {
-                            haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+                            haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
                         }
                         catch (InterruptedException e)
                         {
@@ -180,6 +233,7 @@ public abstract class AbstractCommitLogService
      */
     public WaitQueue.Signal requestExtraSync()
     {
+        syncRequested = true;
         WaitQueue.Signal signal = syncComplete.register();
         haveWork.release(1);
         return signal;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 40040ed..ff1b712 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -220,14 +220,14 @@ public class CommitLog implements CommitLogMBean
     /**
      * Forces a disk flush on the commit log files that need it.  Blocking.
      */
-    public void sync(boolean syncAllSegments)
+    public void sync(boolean syncAllSegments, boolean flush)
     {
         CommitLogSegment current = allocator.allocatingFrom();
         for (CommitLogSegment segment : allocator.getActiveSegments())
         {
             if (!syncAllSegments && segment.id > current.id)
                 return;
-            segment.sync();
+            segment.sync(flush);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 236a1b1..8834c8c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -94,6 +94,12 @@ public abstract class CommitLogSegment
     // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
     private volatile int lastSyncedOffset;
 
+    /**
+     * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+     * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+     */
+    private volatile int lastMarkerOffset;
+
     // The end position of the buffer. Initially set to its capacity and updated to point to the last written position
     // as the segment is being closed.
     // No need to be volatile as writes are protected by appendOrder barrier.
@@ -167,7 +173,8 @@ public abstract class CommitLogSegment
         // write the header
         CommitLogDescriptor.writeHeader(buffer, descriptor);
         endOfBuffer = buffer.capacity();
-        lastSyncedOffset = buffer.position();
+
+        lastSyncedOffset = lastMarkerOffset = buffer.position();
         allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
     }
 
@@ -232,7 +239,7 @@ public abstract class CommitLogSegment
     // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
     void discardUnusedTail()
     {
-        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
+        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with ACLSM.advanceAllocatingFrom()
         // Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
         // This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
         // running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
@@ -271,13 +278,18 @@ public abstract class CommitLogSegment
     }
 
     /**
-     * Forces a disk flush for this segment file.
+     * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+     *
+     * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
      */
-    synchronized void sync()
+    synchronized void sync(boolean flush)
     {
-        boolean close = false;
+        assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+                                                                    lastMarkerOffset, lastSyncedOffset);
         // check we have more work to do
-        if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+        final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+        final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+        if (!(needToMarkData || hasDataToFlush))
             return;
         // Note: Even if the very first allocation of this sync section failed, we still want to enter this
         // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
@@ -285,31 +297,48 @@ public abstract class CommitLogSegment
         // succeeded in the previous sync.
         assert buffer != null;  // Only close once.
 
-        int startMarker = lastSyncedOffset;
-        // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
-        // the point at which we can safely consider records to have been completely written to.
-        int nextMarker = allocate(SYNC_MARKER_SIZE);
-        if (nextMarker < 0)
+        boolean close = false;
+        int startMarker = lastMarkerOffset;
+        int nextMarker, sectionEnd;
+        if (needToMarkData)
         {
-            // Ensure no more of this CLS is writeable, and mark ourselves for closing.
-            discardUnusedTail();
-            close = true;
-
-            // We use the buffer size as the synced position after a close instead of the end of the actual data
-            // to make sure we only close the buffer once.
-            // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
-            nextMarker = buffer.capacity();
+            // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+            // the point at which we can safely consider records to have been completely written to.
+            nextMarker = allocate(SYNC_MARKER_SIZE);
+            if (nextMarker < 0)
+            {
+                // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+                discardUnusedTail();
+                close = true;
+
+                // We use the buffer size as the synced position after a close instead of the end of the actual data
+                // to make sure we only close the buffer once.
+                // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+                nextMarker = buffer.capacity();
+            }
+            // Wait for mutations to complete as well as endOfBuffer to have been written.
+            waitForModifications();
+            sectionEnd = close ? endOfBuffer : nextMarker;
+
+            // Possibly perform compression or encryption and update the chained markers
+            write(startMarker, sectionEnd);
+            lastMarkerOffset = sectionEnd;
+        }
+        else
+        {
+            // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+            // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+            nextMarker = lastMarkerOffset;
+            sectionEnd = nextMarker;
         }
 
-        // Wait for mutations to complete as well as endOfBuffer to have been written.
-        waitForModifications();
-        int sectionEnd = close ? endOfBuffer : nextMarker;
 
-        // Perform compression, writing to file and flush.
-        write(startMarker, sectionEnd);
+        if (flush || close)
+        {
+            flush(startMarker, sectionEnd);
+            lastSyncedOffset = lastMarkerOffset = nextMarker;
+        }
 
-        // Signal the sync as complete.
-        lastSyncedOffset = nextMarker;
         if (close)
             internalClose();
         syncComplete.signalAll();
@@ -327,6 +356,8 @@ public abstract class CommitLogSegment
 
     abstract void write(int lastSyncedOffset, int nextMarker);
 
+    abstract void flush(int startMarker, int nextMarker);
+
     public boolean isStillAllocating()
     {
         return allocatePosition.get() < endOfBuffer;
@@ -404,7 +435,7 @@ public abstract class CommitLogSegment
     synchronized void close()
     {
         discardUnusedTail();
-        sync();
+        sync(true);
         assert buffer == null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index c00ce18..8e05112 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -139,6 +139,18 @@ public class CompressedSegment extends CommitLogSegment
             channel.write(compressedBuffer);
             assert channel.position() - lastWrittenPos == compressedBuffer.limit();
             lastWrittenPos = channel.position();
+        }
+        catch (Exception e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
             SyncUtil.force(channel, true);
         }
         catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a16d91..8259f04 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -78,8 +78,13 @@ public class MemoryMappedSegment extends CommitLogSegment
         // write previous sync marker to point to next sync marker
         // we don't chain the crcs here to ensure this method is idempotent if it fails
         writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+    }
 
-        try {
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
             SyncUtil.force((MappedByteBuffer) buffer);
         }
         catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 86a248b..76419b7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -26,7 +26,7 @@ class PeriodicCommitLogService extends AbstractCommitLogService
 
     public PeriodicCommitLogService(final CommitLog commitLog)
     {
-        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod());
+        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod());
     }
 
     protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
new file mode 100644
index 0000000..e2b9f72
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+/**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+@RunWith(BMUnitRunner.class)
+public class CommitLogChainedMarkersTest
+{
+    private static final String KEYSPACE1 = "CommitLogTest";
+    private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+    @Test
+    @BMRule(name = "force all calls to sync() to not flush to disk",
+    targetClass = "CommitLogSegment",
+    targetMethod = "sync(boolean)",
+    action = "$flush = false")
+    public void replayCommitLogWithoutFlushing() throws IOException
+    {
+        DatabaseDescriptor.setCommitLogSegmentSize(5);
+        DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+        DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+        CompactionManager.instance.disableAutoCompaction();
+
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+        byte[] entropy = new byte[1024];
+        new Random().nextBytes(entropy);
+        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                           .clustering("bytes")
+                           .add("val", ByteBuffer.wrap(entropy))
+                           .build();
+
+        int samples = 10000;
+        for (int i = 0; i < samples; i++)
+            CommitLog.instance.add(m);
+
+        CommitLog.instance.sync(false, true);
+
+        Replayer replayer = new Replayer(cfs1.metadata);
+        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
+        replayer.recover(commitLogDir.listFiles());
+        Assert.assertEquals(samples, replayer.count);
+    }
+
+    private static class Replayer extends CommitLogReplayer
+    {
+        private final CFMetaData cfm;
+        private int count;
+
+        Replayer(CFMetaData cfm)
+        {
+            super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
+            this.cfm = cfm;
+        }
+
+        @Override
+        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+        {
+            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+            try
+            {
+                Mutation mutation = Mutation.serializer.deserialize(bufIn,
+                                                           desc.getMessagingVersion(),
+                                                           SerializationHelper.Flag.LOCAL);
+
+                if (cfm == null || mutation.get(cfm) != null)
+                    count++;
+            }
+            catch (IOException e)
+            {
+                // Test fails.
+                throw new AssertionError(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index b651098..a1999ef 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@ -66,12 +66,12 @@ public class CommitLogSegmentBackpressureTest
     @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
                               targetClass = "AbstractCommitLogService$1",
                               targetMethod = "run",
-                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
                       @BMRule(name = "Release Semaphore after sync",
                               targetClass = "AbstractCommitLogService$1",
                               targetMethod = "run",
-                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
     public void testCompressedCommitLogBackpressure() throws Throwable
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9e9ee53..b8f68ed 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -339,7 +339,7 @@ public class CommitLogTest
 
         // "Flush": this won't delete anything
         UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
 
         assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
@@ -652,7 +652,7 @@ public class CommitLogTest
             DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
         }
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
         // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
         // If retries work subsequent flushes should clear up error and this should change to expect 0.
@@ -685,7 +685,7 @@ public class CommitLogTest
         for (SSTableReader reader : cfs.getLiveSSTables())
             reader.reloadSSTableMetadata();
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
         // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
         // persisted all data in the commit log. Because we know there was an error, there must be something left to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index e690785..36973f2 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -38,7 +38,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
 {
     public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
     {
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
 
         CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
         File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3a1a4fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3a1a4fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3a1a4fa

Branch: refs/heads/cassandra-3.11
Commit: c3a1a4fa8083b2c4c6a5454551979954a0a71339
Parents: d577918 05cb556
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Dec 5 05:07:18 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:08:22 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 10 +-
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    | 10 ++
 .../AbstractCommitLogSegmentManager.java        |  8 +-
 .../db/commitlog/AbstractCommitLogService.java  | 84 ++++++++++++++---
 .../cassandra/db/commitlog/CommitLog.java       |  4 +-
 .../db/commitlog/CommitLogSegment.java          | 85 +++++++++++------
 .../db/commitlog/CompressedSegment.java         |  4 +-
 .../db/commitlog/EncryptedSegment.java          |  3 -
 .../db/commitlog/FileDirectSegment.java         | 14 +++
 .../db/commitlog/MemoryMappedSegment.java       |  4 +
 .../db/commitlog/PeriodicCommitLogService.java  |  2 +-
 .../commitlog/CommitLogChainedMarkersTest.java  | 98 ++++++++++++++++++++
 .../CommitLogSegmentBackpressureTest.java       |  4 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 10 +-
 .../db/commitlog/CommitLogTestReplayer.java     |  2 +-
 17 files changed, 281 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e26aeb8,2683dc2..7c0af91
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,5 +1,13 @@@
 -3.0.16
 +3.11.2
 + * Remove OpenJDK log warning (CASSANDRA-13916)
 + * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
 + * Cache disk boundaries (CASSANDRA-13215)
 + * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
 + * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
 + * Update jackson JSON jars (CASSANDRA-13949)
 + * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
 +Merged from 3.0:
+  * More frequent commitlog chained markers (CASSANDRA-13987)
   * Fix serialized size of DataLimits (CASSANDRA-14057)
   * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
   * Fix SSTableLoader logger message (CASSANDRA-14003)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index a01203c,0796183..5fe752e
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -198,8 -190,9 +198,9 @@@ public class Confi
      public String commitlog_directory;
      public Integer commitlog_total_space_in_mb;
      public CommitLogSync commitlog_sync;
 -    public Double commitlog_sync_batch_window_in_ms;
 -    public Integer commitlog_sync_period_in_ms;
 -    public Integer commitlog_marker_period_in_ms = 0;
 +    public double commitlog_sync_batch_window_in_ms = Double.NaN;
 +    public int commitlog_sync_period_in_ms;
++    public int commitlog_marker_period_in_ms = 0;
      public int commitlog_segment_size_in_mb = 32;
      public ParameterizedClass commitlog_compression;
      public int commitlog_max_compression_buffers_in_pool = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 7f3c9f8,0000000..2c324aa
mode 100755,000000..100755
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,550 -1,0 +1,552 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.BooleanSupplier;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.util.concurrent.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import net.nicoulaj.compilecommand.annotations.DontInline;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.WaitQueue;
 +
 +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 +
 +/**
 + * Performs eager-creation of commit log segments in a background thread. All the
 + * public methods are thread safe.
 + */
 +public abstract class AbstractCommitLogSegmentManager
 +{
 +    static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
 +
 +    /**
 +     * Segment that is ready to be used. The management thread fills this and blocks until consumed.
 +     *
 +     * A single management thread produces this, and consumers are already synchronizing to make sure other work is
 +     * performed atomically with consuming this. Volatile to make sure writes by the management thread become
 +     * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
 +     * synchronize on 'this'.
 +     */
 +    private volatile CommitLogSegment availableSegment = null;
 +
 +    private final WaitQueue segmentPrepared = new WaitQueue();
 +
 +    /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
 +    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
 +
 +    /**
 +     * The segment we are currently allocating commit log records to.
 +     *
 +     * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
 +     */
 +    private volatile CommitLogSegment allocatingFrom = null;
 +
 +    final String storageDirectory;
 +
 +    /**
 +     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
 +     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
 +     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
 +     * on the manager thread, which will take a ms or two).
 +     */
 +    private final AtomicLong size = new AtomicLong();
 +
 +    private Thread managerThread;
 +    protected final CommitLog commitLog;
 +    private volatile boolean shutdown;
 +    private final BooleanSupplier managerThreadWaitCondition = () -> (availableSegment == null && !atSegmentBufferLimit()) || shutdown;
 +    private final WaitQueue managerThreadWaitQueue = new WaitQueue();
 +
 +    private static final SimpleCachedBufferPool bufferPool =
 +        new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
 +
 +    AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
 +    {
 +        this.commitLog = commitLog;
 +        this.storageDirectory = storageDirectory;
 +    }
 +
 +    void start()
 +    {
 +        // The run loop for the manager thread
 +        Runnable runnable = new WrappedRunnable()
 +        {
 +            public void runMayThrow() throws Exception
 +            {
 +                while (!shutdown)
 +                {
 +                    try
 +                    {
 +                        assert availableSegment == null;
 +                        logger.debug("No segments in reserve; creating a fresh one");
 +                        availableSegment = createSegment();
 +                        if (shutdown)
 +                        {
 +                            // If shutdown() started and finished during segment creation, we are now left with a
 +                            // segment that no one will consume. Discard it.
 +                            discardAvailableSegment();
 +                            return;
 +                        }
 +
 +                        segmentPrepared.signalAll();
 +                        Thread.yield();
 +
 +                        if (availableSegment == null && !atSegmentBufferLimit())
 +                            // Writing threads need another segment now.
 +                            continue;
 +
 +                        // Writing threads are not waiting for new segments, we can spend time on other tasks.
 +                        // flush old Cfs if we're full
 +                        maybeFlushToReclaim();
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        JVMStabilityInspector.inspectThrowable(t);
 +                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
 +                            return;
 +                        // sleep some arbitrary period to avoid spamming CL
 +                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
 +
 +                        // If we offered a segment, wait for it to be taken before reentering the loop.
 +                        // There could be a new segment in next not offered, but only on failure to discard it while
 +                        // shutting down-- nothing more can or needs to be done in that case.
 +                    }
 +
 +                    WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue);
 +                }
 +            }
 +        };
 +
 +        shutdown = false;
 +        managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR");
 +        managerThread.start();
 +
 +        // for simplicity, ensure the first segment is allocated before continuing
 +        advanceAllocatingFrom(null);
 +    }
 +
 +    private boolean atSegmentBufferLimit()
 +    {
 +        return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
 +    }
 +
 +    private void maybeFlushToReclaim()
 +    {
 +        long unused = unusedCapacity();
 +        if (unused < 0)
 +        {
 +            long flushingSize = 0;
 +            List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
 +            for (CommitLogSegment segment : activeSegments)
 +            {
 +                if (segment == allocatingFrom)
 +                    break;
 +                flushingSize += segment.onDiskSize();
 +                segmentsToRecycle.add(segment);
 +                if (flushingSize + unused >= 0)
 +                    break;
 +            }
 +            flushDataFrom(segmentsToRecycle, false);
 +        }
 +    }
 +
 +
 +    /**
 +     * Allocate a segment within this CLSM. Should either succeed or throw.
 +     */
 +    public abstract Allocation allocate(Mutation mutation, int size);
 +
 +    /**
 +     * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
 +     * decide what to do with those segments on disk after they've been replayed.
 +     */
 +    abstract void handleReplayedSegment(final File file);
 +
 +    /**
 +     * Hook to allow segment managers to track state surrounding creation of new segments. Only perform as a task
 +     * submitted to the segment manager so it's performed on the segment management thread.
 +     */
 +    abstract CommitLogSegment createSegment();
 +
 +    /**
 +     * Indicates that a segment file has been flushed and is no longer needed. Only perform as a task submitted to the
 +     * segment manager so it's performed on the segment management thread, or perform while the segment management
 +     * thread is shut down during testing resets.
 +     *
 +     * @param segment segment to be discarded
 +     * @param delete  whether or not the segment is safe to be deleted.
 +     */
 +    abstract void discard(CommitLogSegment segment, boolean delete);
 +
 +    /**
 +     * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
 +     *
 +     * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
 +     */
 +    @DontInline
 +    void advanceAllocatingFrom(CommitLogSegment old)
 +    {
 +        while (true)
 +        {
 +            synchronized (this)
 +            {
 +                // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
 +                if (allocatingFrom != old)
 +                    return;
 +
 +                // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
 +                if (availableSegment != null)
 +                {
 +                    // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
 +                    // the critical section.
 +                    activeSegments.add(allocatingFrom = availableSegment);
 +                    availableSegment = null;
 +                    break;
 +                }
 +            }
 +
 +            awaitAvailableSegment(old);
 +        }
 +
 +        // Signal the management thread to prepare a new segment.
 +        wakeManager();
 +
 +        if (old != null)
 +        {
 +            // Now we can run the user defined command just after switching to the new commit log.
 +            // (Do this here instead of in the recycle call so we can get a head start on the archive.)
 +            commitLog.archiver.maybeArchive(old);
 +
 +            // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
 +            old.discardUnusedTail();
 +        }
 +
 +        // request that the CL be synced out-of-band, as we've finished a segment
 +        commitLog.requestExtraSync();
 +    }
 +
 +    void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
 +    {
 +        do
 +        {
 +            WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
 +            if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
 +                prepared.awaitUninterruptibly();
 +            else
 +                prepared.cancel();
 +        }
 +        while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
 +    }
 +
 +    /**
 +     * Switch to a new segment, regardless of how much is left in the current one.
 +     *
 +     * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
 +     */
 +    void forceRecycleAll(Iterable<UUID> droppedCfs)
 +    {
 +        List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
 +        CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
 +        advanceAllocatingFrom(last);
 +
 +        // wait for the commit log modifications
 +        last.waitForModifications();
 +
 +        // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
 +        // to complete
 +        Keyspace.writeOrder.awaitNewBarrier();
 +
 +        // flush and wait for all CFs that are dirty in segments up-to and including 'last'
 +        Future<?> future = flushDataFrom(segmentsToRecycle, true);
 +        try
 +        {
 +            future.get();
 +
 +            for (CommitLogSegment segment : activeSegments)
 +                for (UUID cfId : droppedCfs)
 +                    segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +
 +            // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
 +            // if the previous active segment was the only one to recycle (since an active segment isn't
 +            // necessarily dirty, and we only call dCS after a flush).
 +            for (CommitLogSegment segment : activeSegments)
 +            {
 +                if (segment.isUnused())
 +                    archiveAndDiscard(segment);
 +            }
 +
 +            CommitLogSegment first;
 +            if ((first = activeSegments.peek()) != null && first.id <= last.id)
 +                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
 +        }
 +        catch (Throwable t)
 +        {
 +            // for now just log the error
 +            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
 +        }
 +    }
 +
 +    /**
 +     * Indicates that a segment is no longer in use and that it should be discarded.
 +     *
 +     * @param segment segment that is no longer in use
 +     */
 +    void archiveAndDiscard(final CommitLogSegment segment)
 +    {
 +        boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
 +        if (!activeSegments.remove(segment))
 +            return; // already discarded
 +        // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
 +        logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
 +        discard(segment, archiveSuccess);
 +    }
 +
 +    /**
 +     * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
 +     * @param addedSize the amount to add to the tracked size
 +     */
 +    void addSize(long addedSize)
 +    {
 +        size.addAndGet(addedSize);
 +    }
 +
 +    /**
 +     * @return the space (in bytes) used by all segment files.
 +     */
 +    public long onDiskSize()
 +    {
 +        return size.get();
 +    }
 +
 +    private long unusedCapacity()
 +    {
 +        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
 +        long currentSize = size.get();
 +        logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
 +        return total - currentSize;
 +    }
 +
 +    /**
 +     * Force a flush on all CFs that are still dirty in the given segments.
 +     *
 +     * @return a Future that will finish when all the flushes are complete.
 +     */
 +    private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
 +    {
 +        if (segments.isEmpty())
 +            return Futures.immediateFuture(null);
 +        final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
 +
 +        // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
 +        final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
 +
 +        for (CommitLogSegment segment : segments)
 +        {
 +            for (UUID dirtyCFId : segment.getDirtyCFIDs())
 +            {
 +                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
 +                if (pair == null)
 +                {
 +                    // even though we remove the schema entry before a final flush when dropping a CF,
 +                    // it's still possible for a writer to race and finish his append after the flush.
 +                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
 +                    segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +                }
 +                else if (!flushes.containsKey(dirtyCFId))
 +                {
 +                    String keyspace = pair.left;
 +                    final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
 +                    // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
 +                    // no deadlock possibility since switchLock removal
 +                    flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
 +                }
 +            }
 +        }
 +
 +        return Futures.allAsList(flushes.values());
 +    }
 +
 +    /**
 +     * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
 +     * Only call this after the AbstractCommitLogService is shut down.
 +     */
 +    public void stopUnsafe(boolean deleteSegments)
 +    {
 +        logger.debug("CLSM closing and clearing existing commit log segments...");
 +
 +        shutdown();
 +        try
 +        {
 +            awaitTermination();
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +
 +        for (CommitLogSegment segment : activeSegments)
 +            closeAndDeleteSegmentUnsafe(segment, deleteSegments);
 +        activeSegments.clear();
 +
 +        size.set(0L);
 +
 +        logger.trace("CLSM done with closing and clearing existing commit log segments.");
 +    }
 +
 +    /**
 +     * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
 +     */
 +    void awaitManagementTasksCompletion()
 +    {
 +        if (availableSegment == null && !atSegmentBufferLimit())
 +        {
 +            awaitAvailableSegment(allocatingFrom);
 +        }
 +    }
 +
 +    /**
 +     * Explicitly for use only during resets in unit testing.
 +     */
 +    private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
 +    {
 +        try
 +        {
 +            discard(segment, delete);
 +        }
 +        catch (AssertionError ignored)
 +        {
 +            // segment file does not exist
 +        }
 +    }
 +
 +    /**
 +     * Initiates the shutdown process for the management thread.
 +     */
 +    public void shutdown()
 +    {
 +        assert !shutdown;
 +        shutdown = true;
 +
 +        // Release the management thread and delete prepared segment.
 +        // Do not block as another thread may claim the segment (this can happen during unit test initialization).
 +        discardAvailableSegment();
 +        wakeManager();
 +    }
 +
 +    private void discardAvailableSegment()
 +    {
 +        CommitLogSegment next = null;
 +        synchronized (this)
 +        {
 +            next = availableSegment;
 +            availableSegment = null;
 +        }
 +        if (next != null)
 +            next.discard(true);
 +    }
 +
 +    /**
 +     * Returns when the management thread terminates.
 +     */
 +    public void awaitTermination() throws InterruptedException
 +    {
 +        managerThread.join();
 +        managerThread = null;
 +
 +        for (CommitLogSegment segment : activeSegments)
 +            segment.close();
 +
 +        bufferPool.shutdown();
 +    }
 +
 +    /**
 +     * @return a read-only collection of the active commit log segments
 +     */
 +    @VisibleForTesting
 +    public Collection<CommitLogSegment> getActiveSegments()
 +    {
 +        return Collections.unmodifiableCollection(activeSegments);
 +    }
 +
 +    /**
 +     * @return the current CommitLogPosition of the active segment we're allocating from
 +     */
 +    CommitLogPosition getCurrentPosition()
 +    {
 +        return allocatingFrom.getCurrentCommitLogPosition();
 +    }
 +
 +    /**
-      * Forces a disk flush on the commit log files that need it.  Blocking.
++     * Requests that commit log files sync themselves, if needed. This may or may not involve flushing to disk.
++     *
++     * @param flush Request that the sync operation flush the file to disk.
 +     */
-     public void sync() throws IOException
++    public void sync(boolean flush) throws IOException
 +    {
 +        CommitLogSegment current = allocatingFrom;
 +        for (CommitLogSegment segment : getActiveSegments())
 +        {
 +            // Do not sync segments that became active after sync started.
 +            if (segment.id > current.id)
 +                return;
-             segment.sync();
++            segment.sync(flush);
 +        }
 +    }
 +
 +    /**
 +     * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
 +     */
 +    SimpleCachedBufferPool getBufferPool()
 +    {
 +        return bufferPool;
 +    }
 +
 +    void wakeManager()
 +    {
 +        managerThreadWaitQueue.signalAll();
 +    }
 +
 +    /**
 +     * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
 +     * a buffer to become available.
 +     */
 +    void notifyBufferFreed()
 +    {
 +        wakeManager();
 +    }
 +
 +    /** Read-only access to current segment for subclasses. */
 +    CommitLogSegment allocatingFrom()
 +    {
 +        return allocatingFrom;
 +    }
 +}
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 71100a3,8a03b2f..0410650
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,16 -17,9 +17,18 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.LockSupport;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.codahale.metrics.Timer.Context;
 +
  import org.apache.cassandra.concurrent.NamedThreadFactory;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
  import org.apache.cassandra.utils.NoSpamLogger;
  import org.apache.cassandra.utils.concurrent.WaitQueue;
@@@ -48,7 -47,25 +50,24 @@@ public abstract class AbstractCommitLog
  
      final CommitLog commitLog;
      private final String name;
-     private final long pollIntervalNanos;
+ 
+     /**
+      * The duration between syncs to disk.
+      */
 -    private final long syncIntervalMillis;
++    private final long syncIntervalNanos;
+ 
+     /**
+      * The duration between updating the chained markers in the commit log file. This value should be
 -     * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
++     * 0 < {@link #markerIntervalNanos} <= {@link #syncIntervalNanos}.
+      */
 -    private final long markerIntervalMillis;
++    private final long markerIntervalNanos;
+ 
+     /**
+      * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+      * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+      * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+      */
+     private volatile boolean syncRequested;
  
      private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
  
@@@ -62,15 -90,30 +92,30 @@@
      {
          this.commitLog = commitLog;
          this.name = name;
-         this.pollIntervalNanos = TimeUnit.NANOSECONDS.convert(pollIntervalMillis, TimeUnit.MILLISECONDS);
 -        this.syncIntervalMillis = syncIntervalMillis;
++        this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS);
+ 
 -        // if we are not using periodic mode, or we using compression, we shouldn't update the chained markers
++        // if we are not using periodic mode, or we are using compression/encryption, we shouldn't update the chained markers
+         // faster than the sync interval
 -        if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
++        if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression() || commitLog.configuration.useEncryption())
+             markerIntervalMillis = syncIntervalMillis;
+ 
+         // apply basic bounds checking on the marker interval
+         if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+         {
+             logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+                         markerIntervalMillis, syncIntervalMillis);
+             markerIntervalMillis = syncIntervalMillis;
+         }
+ 
 -        this.markerIntervalMillis = markerIntervalMillis;
++        this.markerIntervalNanos = TimeUnit.NANOSECONDS.convert(markerIntervalMillis, TimeUnit.MILLISECONDS);
      }
  
      // Separated into individual method to ensure relevant objects are constructed before this is started.
      void start()
      {
-         if (pollIntervalNanos < 1)
 -        if (syncIntervalMillis < 1)
++        if (syncIntervalNanos < 1)
              throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
-                                                              pollIntervalNanos * 1e-6));
 -                                                             syncIntervalMillis * 1e-6));
++                                                             syncIntervalNanos * 1e-6));
  
          Runnable runnable = new Runnable()
          {
@@@ -82,25 -125,34 +127,33 @@@
                  int lagCount = 0;
                  int syncCount = 0;
  
 -                boolean run = true;
 -                while (run)
 +                while (true)
                  {
 +                    // always run once after shutdown signalled
 +                    boolean shutdownRequested = shutdown;
 +
                      try
                      {
 -                        // always run once after shutdown signalled
 -                        run = !shutdown;
 -
                          // sync and signal
-                         long syncStarted = System.nanoTime();
-                         // This is a target for Byteman in CommitLogSegmentManagerTest
-                         commitLog.sync();
-                         lastSyncedAt = syncStarted;
-                         syncComplete.signalAll();
- 
 -                        long pollStarted = System.currentTimeMillis();
 -                        if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
++                        long pollStarted = System.nanoTime();
++                        if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested)
+                         {
+                             // in this branch, we want to flush the commit log to disk
 -                            commitLog.sync(shutdown, true);
++                            commitLog.sync(true);
+                             syncRequested = false;
+                             lastSyncedAt = pollStarted;
+                             syncComplete.signalAll();
+                         }
+                         else
+                         {
+                             // in this branch, just update the commit log sync headers
 -                            commitLog.sync(false, false);
++                            commitLog.sync(false);
+                         }
  
                          // sleep any time we have left before the next one is due
 -                        long now = System.currentTimeMillis();
 -                        long sleep = pollStarted + markerIntervalMillis - now;
 -                        if (sleep < 0)
 +                        long now = System.nanoTime();
-                         long wakeUpAt = syncStarted + pollIntervalNanos;
++                        long wakeUpAt = pollStarted + markerIntervalNanos;
 +                        if (wakeUpAt < now)
                          {
                              // if we have lagged noticeably, update our lag counter
                              if (firstLagAt == 0)
@@@ -143,7 -200,14 +196,7 @@@
                              break;
  
                          // sleep for full poll-interval after an error, so we don't spam the log file
-                         LockSupport.parkNanos(pollIntervalNanos);
 -                        try
 -                        {
 -                            haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
 -                        }
 -                        catch (InterruptedException e)
 -                        {
 -                            throw new AssertionError();
 -                        }
++                        LockSupport.parkNanos(markerIntervalNanos);
                      }
                  }
              }
@@@ -166,11 -229,14 +219,12 @@@
      protected abstract void maybeWaitForSync(Allocation alloc);
  
      /**
 -     * Sync immediately, but don't block for the sync to cmplete
 +     * Request an additional sync cycle without blocking.
       */
-     public void requestExtraSync()
 -    public WaitQueue.Signal requestExtraSync()
++    void requestExtraSync()
      {
+         syncRequested = true;
 -        WaitQueue.Signal signal = syncComplete.register();
 -        haveWork.release(1);
 -        return signal;
 +        LockSupport.unpark(thread);
      }
  
      public void shutdown()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 750fabc,ff1b712..da29258
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -221,9 -220,15 +221,9 @@@ public class CommitLog implements Commi
      /**
       * Forces a disk flush on the commit log files that need it.  Blocking.
       */
-     public void sync() throws IOException
 -    public void sync(boolean syncAllSegments, boolean flush)
++    public void sync(boolean flush) throws IOException
      {
-         segmentManager.sync();
 -        CommitLogSegment current = allocator.allocatingFrom();
 -        for (CommitLogSegment segment : allocator.getActiveSegments())
 -        {
 -            if (!syncAllSegments && segment.id > current.id)
 -                return;
 -            segment.sync(flush);
 -        }
++        segmentManager.sync(flush);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index a618d0b,8834c8c..7c02892
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -169,26 -170,12 +175,27 @@@ public abstract class CommitLogSegmen
          }
  
          buffer = createBuffer(commitLog);
 -        // write the header
 -        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +    }
 +
 +    /**
 +     * Deferred writing of the commit log header until subclasses have had a chance to initialize
 +     */
 +    void writeLogHeader()
 +    {
 +        CommitLogDescriptor.writeHeader(buffer, descriptor, additionalHeaderParameters());
          endOfBuffer = buffer.capacity();
-         lastSyncedOffset = buffer.position();
+ 
+         lastSyncedOffset = lastMarkerOffset = buffer.position();
          allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
 +        headerWritten = true;
 +    }
 +
 +    /**
 +     * Provide any additional header data that should be stored in the {@link CommitLogDescriptor}.
 +     */
 +    protected Map<String, String> additionalHeaderParameters()
 +    {
 +        return Collections.<String, String>emptyMap();
      }
  
      abstract ByteBuffer createBuffer(CommitLog commitLog);
@@@ -291,15 -278,18 +298,20 @@@
      }
  
      /**
-      * Forces a disk flush for this segment file.
+      * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+      *
+      * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
       */
-     synchronized void sync()
+     synchronized void sync(boolean flush)
      {
 +        if (!headerWritten)
 +            throw new IllegalStateException("commit log header has not been written");
-         boolean close = false;
+         assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+                                                                     lastMarkerOffset, lastSyncedOffset);
          // check we have more work to do
-         if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+         final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+         final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+         if (!(needToMarkData || hasDataToFlush))
              return;
          // Note: Even if the very first allocation of this sync section failed, we still want to enter this
          // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 967db15,8e05112..d5e6113
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -17,27 -17,51 +17,26 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 -import java.io.IOException;
  import java.nio.ByteBuffer;
 -import java.util.Queue;
 -import java.util.concurrent.ConcurrentLinkedQueue;
 -import java.util.concurrent.atomic.AtomicInteger;
  
 -import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.io.FSWriteError;
 -import org.apache.cassandra.io.compress.BufferType;
  import org.apache.cassandra.io.compress.ICompressor;
 -import org.apache.cassandra.io.util.FileUtils;
--import org.apache.cassandra.utils.SyncUtil;
  
 -/*
 +/**
   * Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written
   * section of the buffer and writes it to the destination channel.
 + *
 + * The format of the compressed commit log is as follows:
 + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
 + * - a series of 'sync segments' that are written every time the commit log is sync()'ed
-  * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
++ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
 + * -- total plain text length for this section
 + * -- a block of compressed data
   */
 -public class CompressedSegment extends CommitLogSegment
 +public class CompressedSegment extends FileDirectSegment
  {
 -    private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
 -        protected ByteBuffer initialValue()
 -        {
 -            return ByteBuffer.allocate(0);
 -        }
 -    };
 -    static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
 -
 -    /**
 -     * The number of buffers in use
 -     */
 -    private static AtomicInteger usedBuffers = new AtomicInteger(0);
 -
 -
 -    /**
 -     * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
 -     * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
 -     * more, depending on how soon the sync policy stops all writing threads.
 -     */
 -    static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
 -
      static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
      final ICompressor compressor;
 -    final Runnable onClose;
 -
 -    volatile long lastWrittenPos = 0;
  
      /**
       * Constructs a new segment file.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 87825ab,0000000..21b7c11
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@@ -1,159 -1,0 +1,156 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Map;
 +import javax.crypto.Cipher;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.compress.ICompressor;
 +import org.apache.cassandra.security.EncryptionUtils;
 +import org.apache.cassandra.security.EncryptionContext;
 +import org.apache.cassandra.utils.Hex;
- import org.apache.cassandra.utils.SyncUtil;
 +
 +import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
 +
 +/**
 + * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data fed into
 + * the encryption algorithms.
 + *
 + * The format of the encrypted commit log is as follows:
 + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
 + * - a series of 'sync segments' that are written every time the commit log is sync()'ed
 + * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
 + * -- total plain text length for this section
 + * -- a series of encrypted data blocks, each of which contains:
 + * --- the length of the encrypted block (cipher text)
 + * --- the length of the unencrypted data (compressed text)
 + * --- the encrypted block, which contains:
 + * ---- the length of the plain text (raw) data
 + * ---- block of compressed data
 + *
 + * Notes:
 + * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding
 + * to the output buffer, and we need to ignore that padding when processing.
 + */
 +public class EncryptedSegment extends FileDirectSegment
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
 +
 +    private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
 +
 +    private final EncryptionContext encryptionContext;
 +    private final Cipher cipher;
 +
 +    public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
 +    {
 +        super(commitLog, manager);
 +        this.encryptionContext = commitLog.configuration.getEncryptionContext();
 +
 +        try
 +        {
 +            cipher = encryptionContext.getEncryptor();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, logFile);
 +        }
 +        logger.debug("created a new encrypted commit log segment: {}", logFile);
 +        // Keep reusable buffers on-heap regardless of compression preference so we avoid copy off/on repeatedly during decryption
 +        manager.getBufferPool().setPreferredReusableBufferType(BufferType.ON_HEAP);
 +    }
 +
 +    protected Map<String, String> additionalHeaderParameters()
 +    {
 +        Map<String, String> map = encryptionContext.toHeaderParameters();
 +        map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
 +        return map;
 +    }
 +
 +    ByteBuffer createBuffer(CommitLog commitLog)
 +    {
 +        // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
 +        // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
 +        return manager.getBufferPool().createBuffer(BufferType.ON_HEAP);
 +    }
 +
 +    void write(int startMarker, int nextMarker)
 +    {
 +        int contentStart = startMarker + SYNC_MARKER_SIZE;
 +        final int length = nextMarker - contentStart;
 +        // The length may be 0 when the segment is being closed.
 +        assert length > 0 || length == 0 && !isStillAllocating();
 +
 +        final ICompressor compressor = encryptionContext.getCompressor();
 +        final int blockSize = encryptionContext.getChunkLength();
 +        try
 +        {
 +            ByteBuffer inputBuffer = buffer.duplicate();
 +            inputBuffer.limit(contentStart + length).position(contentStart);
 +            ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(DatabaseDescriptor.getCommitLogSegmentSize());
 +
 +            // save space for the sync marker at the beginning of this section
 +            final long syncMarkerPosition = lastWrittenPos;
 +            channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
 +
 +            // loop over the segment data in encryption buffer sized chunks
 +            while (contentStart < nextMarker)
 +            {
 +                int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
 +                ByteBuffer slice = inputBuffer.duplicate();
 +                slice.limit(contentStart + nextBlockSize).position(contentStart);
 +
 +                buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
 +
 +                // reuse the same buffer for the input and output of the encryption operation
 +                buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
 +
 +                contentStart += nextBlockSize;
 +                manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
 +            }
 +
 +            lastWrittenPos = channel.position();
 +
 +            // rewind to the beginning of the section and write out the sync marker
 +            buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
 +            writeSyncMarker(id, buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
 +            buffer.putInt(SYNC_MARKER_SIZE, length);
 +            buffer.rewind();
 +            manager.addSize(buffer.limit());
 +
 +            channel.position(syncMarkerPosition);
 +            channel.write(buffer);
- 
-             SyncUtil.force(channel, true);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new FSWriteError(e, getPath());
 +        }
 +    }
 +
 +    public long onDiskSize()
 +    {
 +        return lastWrittenPos;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index 55084be,0000000..d5431f8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@@ -1,66 -1,0 +1,80 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +
 +import org.apache.cassandra.io.FSWriteError;
++import org.apache.cassandra.utils.SyncUtil;
 +
 +/**
 + * Writes to the backing commit log file only on sync, allowing transformations of the mutations,
 + * such as compression or encryption, before writing out to disk.
 + */
 +public abstract class FileDirectSegment extends CommitLogSegment
 +{
 +    volatile long lastWrittenPos = 0;
 +
 +    FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
 +    {
 +        super(commitLog, manager);
 +    }
 +
 +    @Override
 +    void writeLogHeader()
 +    {
 +        super.writeLogHeader();
 +        try
 +        {
 +            channel.write((ByteBuffer) buffer.duplicate().flip());
 +            manager.addSize(lastWrittenPos = buffer.position());
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, getPath());
 +        }
 +    }
 +
 +    @Override
 +    protected void internalClose()
 +    {
 +        try
 +        {
 +            manager.getBufferPool().releaseBuffer(buffer);
 +            super.internalClose();
 +        }
 +        finally
 +        {
 +            manager.notifyBufferFreed();
 +        }
 +    }
++
++    @Override
++    protected void flush(int startMarker, int nextMarker)
++    {
++        try
++        {
++            SyncUtil.force(channel, true);
++        }
++        catch (Exception e)
++        {
++            throw new FSWriteError(e, getPath());
++        }
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index 0000000,e2b9f72..663e7af
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@@ -1,0 -1,128 +1,98 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
++import java.util.ArrayList;
+ import java.util.Random;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
 -import org.apache.cassandra.db.rows.SerializationHelper;
 -import org.apache.cassandra.io.util.DataInputBuffer;
 -import org.apache.cassandra.io.util.RebufferingInputStream;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ /**
+  * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+  * in the commit log segment but do not flush the file to disk.
+  */
+ @RunWith(BMUnitRunner.class)
+ public class CommitLogChainedMarkersTest
+ {
+     private static final String KEYSPACE1 = "CommitLogTest";
+     private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+ 
+     @Test
+     @BMRule(name = "force all calls to sync() to not flush to disk",
+     targetClass = "CommitLogSegment",
+     targetMethod = "sync(boolean)",
+     action = "$flush = false")
+     public void replayCommitLogWithoutFlushing() throws IOException
+     {
++        // this method is a blend of CommitLogSegmentBackpressureTest & CommitLogReaderTest methods
++        DatabaseDescriptor.daemonInitialization();
+         DatabaseDescriptor.setCommitLogSegmentSize(5);
+         DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+         DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+         DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE1,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+ 
+         CompactionManager.instance.disableAutoCompaction();
+ 
+         ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
+         byte[] entropy = new byte[1024];
+         new Random().nextBytes(entropy);
+         final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                            .clustering("bytes")
+                            .add("val", ByteBuffer.wrap(entropy))
+                            .build();
+ 
+         int samples = 10000;
+         for (int i = 0; i < samples; i++)
+             CommitLog.instance.add(m);
+ 
 -        CommitLog.instance.sync(false, true);
++        CommitLog.instance.sync(false);
+ 
 -        Replayer replayer = new Replayer(cfs1.metadata);
 -        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
 -        replayer.recover(commitLogDir.listFiles());
 -        Assert.assertEquals(samples, replayer.count);
 -    }
 -
 -    private static class Replayer extends CommitLogReplayer
 -    {
 -        private final CFMetaData cfm;
 -        private int count;
 -
 -        Replayer(CFMetaData cfm)
 -        {
 -            super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
 -            this.cfm = cfm;
 -        }
 -
 -        @Override
 -        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
 -        {
 -            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
 -            try
 -            {
 -                Mutation mutation = Mutation.serializer.deserialize(bufIn,
 -                                                           desc.getMessagingVersion(),
 -                                                           SerializationHelper.Flag.LOCAL);
++        ArrayList<File> toCheck = CommitLogReaderTest.getCommitLogs();
++        CommitLogReader reader = new CommitLogReader();
++        CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata);
++        for (File f : toCheck)
++            reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false);
+ 
 -                if (cfm == null || mutation.get(cfm) != null)
 -                    count++;
 -            }
 -            catch (IOException e)
 -            {
 -                // Test fails.
 -                throw new AssertionError(e);
 -            }
 -        }
++        Assert.assertEquals(samples, testHandler.seenMutationCount());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index 3956de5,a1999ef..46a3fb0
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@@ -67,12 -66,12 +67,12 @@@ public class CommitLogSegmentBackpressu
      @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
                                targetClass = "AbstractCommitLogService$1",
                                targetMethod = "run",
-                               targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
 -                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
++                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
                                action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
                        @BMRule(name = "Release Semaphore after sync",
                                targetClass = "AbstractCommitLogService$1",
                                targetMethod = "run",
-                               targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
 -                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
++                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
                                action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
      public void testCompressedCommitLogBackpressure() throws Throwable
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 267813e,b8f68ed..215ad6c
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -360,10 -339,10 +360,10 @@@ public class CommitLogTes
  
          // "Flush": this won't delete anything
          UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
-         CommitLog.instance.sync();
 -        CommitLog.instance.sync(true, true);
 -        CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
++        CommitLog.instance.sync(true);
 +        CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
  
 -        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
 +        assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
  
          // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
          Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
@@@ -678,114 -612,6 +678,114 @@@
      }
  
      @Test
 +    public void replaySimple() throws IOException
 +    {
 +        int cellCount = 0;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
 +        CommitLog.instance.add(rm1);
 +
 +        final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
 +        CommitLog.instance.add(rm2);
 +
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
 +        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
 +        File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.replayFiles(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    @Test
 +    public void replayWithDiscard() throws IOException
 +    {
 +        int cellCount = 0;
 +        int max = 1024;
 +        int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
 +        CommitLogPosition commitLogPosition = null;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +
 +        for (int i = 0; i < max; i++)
 +        {
 +            final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
 +                                 .clustering("bytes")
 +                                 .add("val", bytes("this is a string"))
 +                                 .build();
 +            CommitLogPosition position = CommitLog.instance.add(rm1);
 +
 +            if (i == discardPosition)
 +                commitLogPosition = position;
 +            if (i > discardPosition)
 +            {
 +                cellCount += 1;
 +            }
 +        }
 +
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
 +        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
 +        File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.replayFiles(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    class SimpleCountingReplayer extends CommitLogReplayer
 +    {
 +        private final CommitLogPosition filterPosition;
 +        private final CFMetaData metadata;
 +        int cells;
 +        int skipped;
 +
 +        SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, CFMetaData cfm)
 +        {
 +            super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
 +            this.filterPosition = filterPosition;
 +            this.metadata = cfm;
 +        }
 +
 +        @SuppressWarnings("resource")
 +        @Override
 +        public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
 +        {
 +            // Filter out system writes that could flake the test.
 +            if (!KEYSPACE1.equals(m.getKeyspaceName()))
 +                return;
 +
 +            if (entryLocation <= filterPosition.position)
 +            {
 +                // Skip over this mutation.
 +                skipped++;
 +                return;
 +            }
 +            for (PartitionUpdate partitionUpdate : m.getPartitionUpdates())
 +            {
 +                // Only process mutations for the CF's we're testing against, since we can't deterministically predict
 +                // whether or not system keyspaces will be mutated during a test.
 +                if (partitionUpdate.metadata().cfName.equals(metadata.cfName))
 +                {
 +                    for (Row row : partitionUpdate)
 +                        cells += Iterables.size(row.cells());
 +                }
 +            }
 +        }
 +    }
 +
      public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException
      {
          CommitLog.instance.resetUnsafe(true);
@@@ -826,7 -652,7 +826,7 @@@
              DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
          }
  
-         CommitLog.instance.sync();
 -        CommitLog.instance.sync(true, true);
++        CommitLog.instance.sync(true);
          System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
          // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
          // If retries work subsequent flushes should clear up error and this should change to expect 0.
@@@ -859,7 -685,7 +859,7 @@@
          for (SSTableReader reader : cfs.getLiveSSTables())
              reader.reloadSSTableMetadata();
  
-         CommitLog.instance.sync();
 -        CommitLog.instance.sync(true, true);
++        CommitLog.instance.sync(true);
          System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
          // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
          // persisted all data in the commit log. Because we know there was an error, there must be something left to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 7f43378,36973f2..9a22b04
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@@ -35,44 -36,44 +35,44 @@@ import org.apache.cassandra.io.util.Reb
   */
  public class CommitLogTestReplayer extends CommitLogReplayer
  {
 -    public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
 -    {
 -        CommitLog.instance.sync(true, true);
 -
 -        CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
 -        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
 -        replayer.recover(commitLogDir.listFiles());
 -    }
 -
 -    final private Predicate<Mutation> processor;
 +    private final Predicate<Mutation> processor;
  
 -    public CommitLogTestReplayer(CommitLog log, Predicate<Mutation> processor)
 +    public CommitLogTestReplayer(Predicate<Mutation> processor) throws IOException
      {
 -        this(log, ReplayPosition.NONE, processor);
 +        super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +
 +        this.processor = processor;
 +        commitLogReader = new CommitLogTestReader();
      }
  
 -    public CommitLogTestReplayer(CommitLog log, ReplayPosition discardedPos, Predicate<Mutation> processor)
 +    public void examineCommitLog() throws IOException
      {
 -        super(log, discardedPos, null, ReplayFilter.create());
 -        this.processor = processor;
 +        replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
      }
  
 -    @Override
 -    void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
 +    private class CommitLogTestReader extends CommitLogReader
      {
 -        RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
 -        Mutation mutation;
 -        try
 -        {
 -            mutation = Mutation.serializer.deserialize(bufIn,
 -                                                           desc.getMessagingVersion(),
 -                                                           SerializationHelper.Flag.LOCAL);
 -            Assert.assertTrue(processor.apply(mutation));
 -        }
 -        catch (IOException e)
 +        @Override
 +        protected void readMutation(CommitLogReadHandler handler,
 +                                    byte[] inputBuffer,
 +                                    int size,
 +                                    CommitLogPosition minPosition,
 +                                    final int entryLocation,
 +                                    final CommitLogDescriptor desc) throws IOException
          {
 -            // Test fails.
 -            throw new AssertionError(e);
 +            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
 +            Mutation mutation;
 +            try
 +            {
 +                mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
 +                Assert.assertTrue(processor.apply(mutation));
 +            }
 +            catch (IOException e)
 +            {
 +                // Test fails.
 +                throw new AssertionError(e);
 +            }
          }
      }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3a1a4fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3a1a4fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3a1a4fa

Branch: refs/heads/trunk
Commit: c3a1a4fa8083b2c4c6a5454551979954a0a71339
Parents: d577918 05cb556
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Dec 5 05:07:18 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:08:22 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 10 +-
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    | 10 ++
 .../AbstractCommitLogSegmentManager.java        |  8 +-
 .../db/commitlog/AbstractCommitLogService.java  | 84 ++++++++++++++---
 .../cassandra/db/commitlog/CommitLog.java       |  4 +-
 .../db/commitlog/CommitLogSegment.java          | 85 +++++++++++------
 .../db/commitlog/CompressedSegment.java         |  4 +-
 .../db/commitlog/EncryptedSegment.java          |  3 -
 .../db/commitlog/FileDirectSegment.java         | 14 +++
 .../db/commitlog/MemoryMappedSegment.java       |  4 +
 .../db/commitlog/PeriodicCommitLogService.java  |  2 +-
 .../commitlog/CommitLogChainedMarkersTest.java  | 98 ++++++++++++++++++++
 .../CommitLogSegmentBackpressureTest.java       |  4 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 10 +-
 .../db/commitlog/CommitLogTestReplayer.java     |  2 +-
 17 files changed, 281 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e26aeb8,2683dc2..7c0af91
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,5 +1,13 @@@
 -3.0.16
 +3.11.2
 + * Remove OpenJDK log warning (CASSANDRA-13916)
 + * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
 + * Cache disk boundaries (CASSANDRA-13215)
 + * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
 + * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
 + * Update jackson JSON jars (CASSANDRA-13949)
 + * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
 +Merged from 3.0:
+  * More frequent commitlog chained markers (CASSANDRA-13987)
   * Fix serialized size of DataLimits (CASSANDRA-14057)
   * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
   * Fix SSTableLoader logger message (CASSANDRA-14003)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index a01203c,0796183..5fe752e
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -198,8 -190,9 +198,9 @@@ public class Confi
      public String commitlog_directory;
      public Integer commitlog_total_space_in_mb;
      public CommitLogSync commitlog_sync;
 -    public Double commitlog_sync_batch_window_in_ms;
 -    public Integer commitlog_sync_period_in_ms;
 -    public Integer commitlog_marker_period_in_ms = 0;
 +    public double commitlog_sync_batch_window_in_ms = Double.NaN;
 +    public int commitlog_sync_period_in_ms;
++    public int commitlog_marker_period_in_ms = 0;
      public int commitlog_segment_size_in_mb = 32;
      public ParameterizedClass commitlog_compression;
      public int commitlog_max_compression_buffers_in_pool = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 7f3c9f8,0000000..2c324aa
mode 100755,000000..100755
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,550 -1,0 +1,552 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.BooleanSupplier;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.util.concurrent.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import net.nicoulaj.compilecommand.annotations.DontInline;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.WaitQueue;
 +
 +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 +
 +/**
 + * Performs eager-creation of commit log segments in a background thread. All the
 + * public methods are thread safe.
 + */
 +public abstract class AbstractCommitLogSegmentManager
 +{
 +    static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
 +
 +    /**
 +     * Segment that is ready to be used. The management thread fills this and blocks until consumed.
 +     *
 +     * A single management thread produces this, and consumers are already synchronizing to make sure other work is
 +     * performed atomically with consuming this. Volatile to make sure writes by the management thread become
 +     * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
 +     * synchronize on 'this'.
 +     */
 +    private volatile CommitLogSegment availableSegment = null;
 +
 +    private final WaitQueue segmentPrepared = new WaitQueue();
 +
 +    /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
 +    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
 +
 +    /**
 +     * The segment we are currently allocating commit log records to.
 +     *
 +     * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
 +     */
 +    private volatile CommitLogSegment allocatingFrom = null;
 +
 +    final String storageDirectory;
 +
 +    /**
 +     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
 +     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
 +     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
 +     * on the manager thread, which will take a ms or two).
 +     */
 +    private final AtomicLong size = new AtomicLong();
 +
 +    private Thread managerThread;
 +    protected final CommitLog commitLog;
 +    private volatile boolean shutdown;
 +    private final BooleanSupplier managerThreadWaitCondition = () -> (availableSegment == null && !atSegmentBufferLimit()) || shutdown;
 +    private final WaitQueue managerThreadWaitQueue = new WaitQueue();
 +
 +    private static final SimpleCachedBufferPool bufferPool =
 +        new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
 +
 +    AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
 +    {
 +        this.commitLog = commitLog;
 +        this.storageDirectory = storageDirectory;
 +    }
 +
 +    void start()
 +    {
 +        // The run loop for the manager thread
 +        Runnable runnable = new WrappedRunnable()
 +        {
 +            public void runMayThrow() throws Exception
 +            {
 +                while (!shutdown)
 +                {
 +                    try
 +                    {
 +                        assert availableSegment == null;
 +                        logger.debug("No segments in reserve; creating a fresh one");
 +                        availableSegment = createSegment();
 +                        if (shutdown)
 +                        {
 +                            // If shutdown() started and finished during segment creation, we are now left with a
 +                            // segment that no one will consume. Discard it.
 +                            discardAvailableSegment();
 +                            return;
 +                        }
 +
 +                        segmentPrepared.signalAll();
 +                        Thread.yield();
 +
 +                        if (availableSegment == null && !atSegmentBufferLimit())
 +                            // Writing threads need another segment now.
 +                            continue;
 +
 +                        // Writing threads are not waiting for new segments, we can spend time on other tasks.
 +                        // flush old Cfs if we're full
 +                        maybeFlushToReclaim();
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        JVMStabilityInspector.inspectThrowable(t);
 +                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
 +                            return;
 +                        // sleep some arbitrary period to avoid spamming CL
 +                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
 +
 +                        // If we offered a segment, wait for it to be taken before reentering the loop.
 +                        // There could be a new segment in next not offered, but only on failure to discard it while
 +                        // shutting down-- nothing more can or needs to be done in that case.
 +                    }
 +
 +                    WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue);
 +                }
 +            }
 +        };
 +
 +        shutdown = false;
 +        managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR");
 +        managerThread.start();
 +
 +        // for simplicity, ensure the first segment is allocated before continuing
 +        advanceAllocatingFrom(null);
 +    }
 +
 +    private boolean atSegmentBufferLimit()
 +    {
 +        return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
 +    }
 +
 +    private void maybeFlushToReclaim()
 +    {
 +        long unused = unusedCapacity();
 +        if (unused < 0)
 +        {
 +            long flushingSize = 0;
 +            List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
 +            for (CommitLogSegment segment : activeSegments)
 +            {
 +                if (segment == allocatingFrom)
 +                    break;
 +                flushingSize += segment.onDiskSize();
 +                segmentsToRecycle.add(segment);
 +                if (flushingSize + unused >= 0)
 +                    break;
 +            }
 +            flushDataFrom(segmentsToRecycle, false);
 +        }
 +    }
 +
 +
 +    /**
 +     * Allocate a segment within this CLSM. Should either succeed or throw.
 +     */
 +    public abstract Allocation allocate(Mutation mutation, int size);
 +
 +    /**
 +     * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
 +     * decide what to do with those segments on disk after they've been replayed.
 +     */
 +    abstract void handleReplayedSegment(final File file);
 +
 +    /**
 +     * Hook to allow segment managers to track state surrounding creation of new segments. Only perform as a task
 +     * submitted to the segment manager so it's performed on the segment management thread.
 +     */
 +    abstract CommitLogSegment createSegment();
 +
 +    /**
 +     * Indicates that a segment file has been flushed and is no longer needed. Only perform as a task submitted to the
 +     * segment manager so it's performed on the segment management thread, or while the segment management thread is
 +     * shut down during testing resets.
 +     *
 +     * @param segment segment to be discarded
 +     * @param delete  whether or not the segment is safe to be deleted.
 +     */
 +    abstract void discard(CommitLogSegment segment, boolean delete);
 +
 +    /**
 +     * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
 +     *
 +     * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
 +     */
 +    @DontInline
 +    void advanceAllocatingFrom(CommitLogSegment old)
 +    {
 +        while (true)
 +        {
 +            synchronized (this)
 +            {
 +                // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
 +                if (allocatingFrom != old)
 +                    return;
 +
 +                // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
 +                if (availableSegment != null)
 +                {
 +                    // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
 +                    // the critical section.
 +                    activeSegments.add(allocatingFrom = availableSegment);
 +                    availableSegment = null;
 +                    break;
 +                }
 +            }
 +
 +            awaitAvailableSegment(old);
 +        }
 +
 +        // Signal the management thread to prepare a new segment.
 +        wakeManager();
 +
 +        if (old != null)
 +        {
 +            // Now we can run the user defined command just after switching to the new commit log.
 +            // (Do this here instead of in the recycle call so we can get a head start on the archive.)
 +            commitLog.archiver.maybeArchive(old);
 +
 +            // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
 +            old.discardUnusedTail();
 +        }
 +
 +        // request that the CL be synced out-of-band, as we've finished a segment
 +        commitLog.requestExtraSync();
 +    }
 +
 +    void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
 +    {
 +        do
 +        {
 +            WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
 +            if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
 +                prepared.awaitUninterruptibly();
 +            else
 +                prepared.cancel();
 +        }
 +        while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
 +    }
 +
 +    /**
 +     * Switch to a new segment, regardless of how much is left in the current one.
 +     *
 +     * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
 +     */
 +    void forceRecycleAll(Iterable<UUID> droppedCfs)
 +    {
 +        List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
 +        CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
 +        advanceAllocatingFrom(last);
 +
 +        // wait for the commit log modifications
 +        last.waitForModifications();
 +
 +        // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
 +        // to complete
 +        Keyspace.writeOrder.awaitNewBarrier();
 +
 +        // flush and wait for all CFs that are dirty in segments up-to and including 'last'
 +        Future<?> future = flushDataFrom(segmentsToRecycle, true);
 +        try
 +        {
 +            future.get();
 +
 +            for (CommitLogSegment segment : activeSegments)
 +                for (UUID cfId : droppedCfs)
 +                    segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +
 +            // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
 +            // if the previous active segment was the only one to recycle (since an active segment isn't
 +            // necessarily dirty, and we only call dCS after a flush).
 +            for (CommitLogSegment segment : activeSegments)
 +            {
 +                if (segment.isUnused())
 +                    archiveAndDiscard(segment);
 +            }
 +
 +            CommitLogSegment first;
 +            if ((first = activeSegments.peek()) != null && first.id <= last.id)
 +                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
 +        }
 +        catch (Throwable t)
 +        {
 +            // for now just log the error
 +            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
 +        }
 +    }
 +
 +    /**
 +     * Indicates that a segment is no longer in use and that it should be discarded.
 +     *
 +     * @param segment segment that is no longer in use
 +     */
 +    void archiveAndDiscard(final CommitLogSegment segment)
 +    {
 +        boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
 +        if (!activeSegments.remove(segment))
 +            return; // already discarded
 +        // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
 +        logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
 +        discard(segment, archiveSuccess);
 +    }
 +
 +    /**
 +     * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
 +     * @param addedSize the change, in bytes, to apply to the tracked size
 +     */
 +    void addSize(long addedSize)
 +    {
 +        size.addAndGet(addedSize);
 +    }
 +
 +    /**
 +     * @return the space (in bytes) used by all segment files.
 +     */
 +    public long onDiskSize()
 +    {
 +        return size.get();
 +    }
 +
 +    private long unusedCapacity()
 +    {
 +        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
 +        long currentSize = size.get();
 +        logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
 +        return total - currentSize;
 +    }
 +
 +    /**
 +     * Force a flush on all CFs that are still dirty in the given segments.
 +     *
 +     * @return a Future that will finish when all the flushes are complete.
 +     */
 +    private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
 +    {
 +        if (segments.isEmpty())
 +            return Futures.immediateFuture(null);
 +        final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
 +
 +        // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
 +        final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
 +
 +        for (CommitLogSegment segment : segments)
 +        {
 +            for (UUID dirtyCFId : segment.getDirtyCFIDs())
 +            {
 +                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
 +                if (pair == null)
 +                {
 +                    // even though we remove the schema entry before a final flush when dropping a CF,
 +                    // it's still possible for a writer to race and finish his append after the flush.
 +                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
 +                    segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +                }
 +                else if (!flushes.containsKey(dirtyCFId))
 +                {
 +                    String keyspace = pair.left;
 +                    final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
 +                    // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
 +                    // no deadlock possibility since switchLock removal
 +                    flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
 +                }
 +            }
 +        }
 +
 +        return Futures.allAsList(flushes.values());
 +    }
 +
 +    /**
 +     * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
 +     * Only call this after the AbstractCommitLogService is shut down.
 +     */
 +    public void stopUnsafe(boolean deleteSegments)
 +    {
 +        logger.debug("CLSM closing and clearing existing commit log segments...");
 +
 +        shutdown();
 +        try
 +        {
 +            awaitTermination();
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +
 +        for (CommitLogSegment segment : activeSegments)
 +            closeAndDeleteSegmentUnsafe(segment, deleteSegments);
 +        activeSegments.clear();
 +
 +        size.set(0L);
 +
 +        logger.trace("CLSM done with closing and clearing existing commit log segments.");
 +    }
 +
 +    /**
 +     * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
 +     */
 +    void awaitManagementTasksCompletion()
 +    {
 +        if (availableSegment == null && !atSegmentBufferLimit())
 +        {
 +            awaitAvailableSegment(allocatingFrom);
 +        }
 +    }
 +
 +    /**
 +     * Explicitly for use only during resets in unit testing.
 +     */
 +    private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
 +    {
 +        try
 +        {
 +            discard(segment, delete);
 +        }
 +        catch (AssertionError ignored)
 +        {
 +            // segment file does not exist
 +        }
 +    }
 +
 +    /**
 +     * Initiates the shutdown process for the management thread.
 +     */
 +    public void shutdown()
 +    {
 +        assert !shutdown;
 +        shutdown = true;
 +
 +        // Release the management thread and delete prepared segment.
 +        // Do not block as another thread may claim the segment (this can happen during unit test initialization).
 +        discardAvailableSegment();
 +        wakeManager();
 +    }
 +
 +    private void discardAvailableSegment()
 +    {
 +        CommitLogSegment next = null;
 +        synchronized (this)
 +        {
 +            next = availableSegment;
 +            availableSegment = null;
 +        }
 +        if (next != null)
 +            next.discard(true);
 +    }
 +
 +    /**
 +     * Returns when the management thread terminates.
 +     */
 +    public void awaitTermination() throws InterruptedException
 +    {
 +        managerThread.join();
 +        managerThread = null;
 +
 +        for (CommitLogSegment segment : activeSegments)
 +            segment.close();
 +
 +        bufferPool.shutdown();
 +    }
 +
 +    /**
 +     * @return a read-only collection of the active commit log segments
 +     */
 +    @VisibleForTesting
 +    public Collection<CommitLogSegment> getActiveSegments()
 +    {
 +        return Collections.unmodifiableCollection(activeSegments);
 +    }
 +
 +    /**
 +     * @return the current CommitLogPosition of the active segment we're allocating from
 +     */
 +    CommitLogPosition getCurrentPosition()
 +    {
 +        return allocatingFrom.getCurrentCommitLogPosition();
 +    }
 +
 +    /**
-      * Forces a disk flush on the commit log files that need it.  Blocking.
++     * Requests commit log files sync themselves, if needed. This may or may not involve flushing to disk.
++     *
++     * @param flush Request that the sync operation flush the file to disk.
 +     */
-     public void sync() throws IOException
++    public void sync(boolean flush) throws IOException
 +    {
 +        CommitLogSegment current = allocatingFrom;
 +        for (CommitLogSegment segment : getActiveSegments())
 +        {
 +            // Do not sync segments that became active after sync started.
 +            if (segment.id > current.id)
 +                return;
-             segment.sync();
++            segment.sync(flush);
 +        }
 +    }
 +
 +    /**
 +     * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
 +     */
 +    SimpleCachedBufferPool getBufferPool()
 +    {
 +        return bufferPool;
 +    }
 +
 +    void wakeManager()
 +    {
 +        managerThreadWaitQueue.signalAll();
 +    }
 +
 +    /**
 +     * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
 +     * a buffer to become available.
 +     */
 +    void notifyBufferFreed()
 +    {
 +        wakeManager();
 +    }
 +
 +    /** Read-only access to current segment for subclasses. */
 +    CommitLogSegment allocatingFrom()
 +    {
 +        return allocatingFrom;
 +    }
 +}
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 71100a3,8a03b2f..0410650
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,16 -17,9 +17,18 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.LockSupport;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.codahale.metrics.Timer.Context;
 +
  import org.apache.cassandra.concurrent.NamedThreadFactory;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
  import org.apache.cassandra.utils.NoSpamLogger;
  import org.apache.cassandra.utils.concurrent.WaitQueue;
@@@ -48,7 -47,25 +50,24 @@@ public abstract class AbstractCommitLog
  
      final CommitLog commitLog;
      private final String name;
-     private final long pollIntervalNanos;
+ 
+     /**
+      * The duration between syncs to disk.
+      */
 -    private final long syncIntervalMillis;
++    private final long syncIntervalNanos;
+ 
+     /**
+      * The duration between updating the chained markers in the commit log file. This value should be
 -     * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
++     * 0 < {@link #markerIntervalNanos} <= {@link #syncIntervalNanos}.
+      */
 -    private final long markerIntervalMillis;
++    private final long markerIntervalNanos;
+ 
+     /**
+      * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+      * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+      * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+      */
+     private volatile boolean syncRequested;
  
      private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
  
@@@ -62,15 -90,30 +92,30 @@@
      {
          this.commitLog = commitLog;
          this.name = name;
-         this.pollIntervalNanos = TimeUnit.NANOSECONDS.convert(pollIntervalMillis, TimeUnit.MILLISECONDS);
 -        this.syncIntervalMillis = syncIntervalMillis;
++        this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS);
+ 
 -        // if we are not using periodic mode, or we using compression, we shouldn't update the chained markers
++        // if we are not using periodic mode, or we are using compression/encryption, we shouldn't update the chained markers
+         // faster than the sync interval
 -        if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
++        if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression() || commitLog.configuration.useEncryption())
+             markerIntervalMillis = syncIntervalMillis;
+ 
+         // apply basic bounds checking on the marker interval
+         if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+         {
+             logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+                         markerIntervalMillis, syncIntervalMillis);
+             markerIntervalMillis = syncIntervalMillis;
+         }
+ 
 -        this.markerIntervalMillis = markerIntervalMillis;
++        this.markerIntervalNanos = TimeUnit.NANOSECONDS.convert(markerIntervalMillis, TimeUnit.MILLISECONDS);
      }
  
      // Separated into individual method to ensure relevant objects are constructed before this is started.
      void start()
      {
-         if (pollIntervalNanos < 1)
 -        if (syncIntervalMillis < 1)
++        if (syncIntervalNanos < 1)
              throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
-                                                              pollIntervalNanos * 1e-6));
 -                                                             syncIntervalMillis * 1e-6));
++                                                             syncIntervalNanos * 1e-6));
  
          Runnable runnable = new Runnable()
          {
@@@ -82,25 -125,34 +127,33 @@@
                  int lagCount = 0;
                  int syncCount = 0;
  
 -                boolean run = true;
 -                while (run)
 +                while (true)
                  {
 +                    // always run once after shutdown signalled
 +                    boolean shutdownRequested = shutdown;
 +
                      try
                      {
 -                        // always run once after shutdown signalled
 -                        run = !shutdown;
 -
                          // sync and signal
-                         long syncStarted = System.nanoTime();
-                         // This is a target for Byteman in CommitLogSegmentManagerTest
-                         commitLog.sync();
-                         lastSyncedAt = syncStarted;
-                         syncComplete.signalAll();
- 
 -                        long pollStarted = System.currentTimeMillis();
 -                        if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
++                        long pollStarted = System.nanoTime();
++                        if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested)
+                         {
+                             // in this branch, we want to flush the commit log to disk
 -                            commitLog.sync(shutdown, true);
++                            commitLog.sync(true);
+                             syncRequested = false;
+                             lastSyncedAt = pollStarted;
+                             syncComplete.signalAll();
+                         }
+                         else
+                         {
+                             // in this branch, just update the commit log sync headers
 -                            commitLog.sync(false, false);
++                            commitLog.sync(false);
+                         }
  
                          // sleep any time we have left before the next one is due
 -                        long now = System.currentTimeMillis();
 -                        long sleep = pollStarted + markerIntervalMillis - now;
 -                        if (sleep < 0)
 +                        long now = System.nanoTime();
-                         long wakeUpAt = syncStarted + pollIntervalNanos;
++                        long wakeUpAt = pollStarted + markerIntervalNanos;
 +                        if (wakeUpAt < now)
                          {
                              // if we have lagged noticeably, update our lag counter
                              if (firstLagAt == 0)
@@@ -143,7 -200,14 +196,7 @@@
                              break;
  
                          // sleep for full poll-interval after an error, so we don't spam the log file
-                         LockSupport.parkNanos(pollIntervalNanos);
 -                        try
 -                        {
 -                            haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
 -                        }
 -                        catch (InterruptedException e)
 -                        {
 -                            throw new AssertionError();
 -                        }
++                        LockSupport.parkNanos(markerIntervalNanos);
                      }
                  }
              }
@@@ -166,11 -229,14 +219,12 @@@
      protected abstract void maybeWaitForSync(Allocation alloc);
  
      /**
 -     * Sync immediately, but don't block for the sync to cmplete
 +     * Request an additional sync cycle without blocking.
       */
-     public void requestExtraSync()
 -    public WaitQueue.Signal requestExtraSync()
++    void requestExtraSync()
      {
+         syncRequested = true;
 -        WaitQueue.Signal signal = syncComplete.register();
 -        haveWork.release(1);
 -        return signal;
 +        LockSupport.unpark(thread);
      }
  
      public void shutdown()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 750fabc,ff1b712..da29258
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -221,9 -220,15 +221,9 @@@ public class CommitLog implements Commi
      /**
       * Forces a disk flush on the commit log files that need it.  Blocking.
       */
-     public void sync() throws IOException
 -    public void sync(boolean syncAllSegments, boolean flush)
++    public void sync(boolean flush) throws IOException
      {
-         segmentManager.sync();
 -        CommitLogSegment current = allocator.allocatingFrom();
 -        for (CommitLogSegment segment : allocator.getActiveSegments())
 -        {
 -            if (!syncAllSegments && segment.id > current.id)
 -                return;
 -            segment.sync(flush);
 -        }
++        segmentManager.sync(flush);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index a618d0b,8834c8c..7c02892
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -169,26 -170,12 +175,27 @@@ public abstract class CommitLogSegmen
          }
  
          buffer = createBuffer(commitLog);
 -        // write the header
 -        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +    }
 +
 +    /**
 +     * Deferred writing of the commit log header until subclasses have had a chance to initialize
 +     */
 +    void writeLogHeader()
 +    {
 +        CommitLogDescriptor.writeHeader(buffer, descriptor, additionalHeaderParameters());
          endOfBuffer = buffer.capacity();
-         lastSyncedOffset = buffer.position();
+ 
+         lastSyncedOffset = lastMarkerOffset = buffer.position();
          allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
 +        headerWritten = true;
 +    }
 +
 +    /**
 +     * Provide any additional header data that should be stored in the {@link CommitLogDescriptor}.
 +     */
 +    protected Map<String, String> additionalHeaderParameters()
 +    {
 +        return Collections.<String, String>emptyMap();
      }
  
      abstract ByteBuffer createBuffer(CommitLog commitLog);
@@@ -291,15 -278,18 +298,20 @@@
      }
  
      /**
-      * Forces a disk flush for this segment file.
+      * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+      *
+      * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
       */
-     synchronized void sync()
+     synchronized void sync(boolean flush)
      {
 +        if (!headerWritten)
 +            throw new IllegalStateException("commit log header has not been written");
-         boolean close = false;
+         assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+                                                                     lastMarkerOffset, lastSyncedOffset);
          // check we have more work to do
-         if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+         final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+         final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+         if (!(needToMarkData || hasDataToFlush))
              return;
          // Note: Even if the very first allocation of this sync section failed, we still want to enter this
          // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 967db15,8e05112..d5e6113
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -17,27 -17,51 +17,26 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 -import java.io.IOException;
  import java.nio.ByteBuffer;
 -import java.util.Queue;
 -import java.util.concurrent.ConcurrentLinkedQueue;
 -import java.util.concurrent.atomic.AtomicInteger;
  
 -import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.io.FSWriteError;
 -import org.apache.cassandra.io.compress.BufferType;
  import org.apache.cassandra.io.compress.ICompressor;
 -import org.apache.cassandra.io.util.FileUtils;
--import org.apache.cassandra.utils.SyncUtil;
  
 -/*
 +/**
   * Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written
   * section of the buffer and writes it to the destination channel.
 + *
 + * The format of the compressed commit log is as follows:
 + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
 + * - a series of 'sync segments' that are written every time the commit log is sync()'ed
-  * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
++ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
 + * -- total plain text length for this section
 + * -- a block of compressed data
   */
 -public class CompressedSegment extends CommitLogSegment
 +public class CompressedSegment extends FileDirectSegment
  {
 -    private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
 -        protected ByteBuffer initialValue()
 -        {
 -            return ByteBuffer.allocate(0);
 -        }
 -    };
 -    static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
 -
 -    /**
 -     * The number of buffers in use
 -     */
 -    private static AtomicInteger usedBuffers = new AtomicInteger(0);
 -
 -
 -    /**
 -     * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
 -     * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
 -     * more, depending on how soon the sync policy stops all writing threads.
 -     */
 -    static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
 -
      static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
      final ICompressor compressor;
 -    final Runnable onClose;
 -
 -    volatile long lastWrittenPos = 0;
  
      /**
       * Constructs a new segment file.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 87825ab,0000000..21b7c11
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@@ -1,159 -1,0 +1,156 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Map;
 +import javax.crypto.Cipher;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.compress.ICompressor;
 +import org.apache.cassandra.security.EncryptionUtils;
 +import org.apache.cassandra.security.EncryptionContext;
 +import org.apache.cassandra.utils.Hex;
- import org.apache.cassandra.utils.SyncUtil;
 +
 +import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
 +
 +/**
 + * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data into
 + * the encryption algorithms.
 + *
 + * The format of the encrypted commit log is as follows:
 + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
 + * - a series of 'sync segments' that are written every time the commit log is sync()'ed
 + * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
 + * -- total plain text length for this section
 + * -- a series of encrypted data blocks, each of which contains:
 + * --- the length of the encrypted block (cipher text)
 + * --- the length of the unencrypted data (compressed text)
 + * --- the encrypted block, which contains:
 + * ---- the length of the plain text (raw) data
 + * ---- block of compressed data
 + *
 + * Notes:
 + * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding
 + * to the output buffer, and we need to ignore that padding when processing.
 + */
 +public class EncryptedSegment extends FileDirectSegment
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
 +
 +    private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
 +
 +    private final EncryptionContext encryptionContext;
 +    private final Cipher cipher;
 +
 +    public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
 +    {
 +        super(commitLog, manager);
 +        this.encryptionContext = commitLog.configuration.getEncryptionContext();
 +
 +        try
 +        {
 +            cipher = encryptionContext.getEncryptor();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, logFile);
 +        }
 +        logger.debug("created a new encrypted commit log segment: {}", logFile);
 +        // Keep reusable buffers on-heap regardless of compression preference so we avoid copy off/on repeatedly during decryption
 +        manager.getBufferPool().setPreferredReusableBufferType(BufferType.ON_HEAP);
 +    }
 +
 +    protected Map<String, String> additionalHeaderParameters()
 +    {
 +        Map<String, String> map = encryptionContext.toHeaderParameters();
 +        map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
 +        return map;
 +    }
 +
 +    ByteBuffer createBuffer(CommitLog commitLog)
 +    {
 +        // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
 +        // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
 +        return manager.getBufferPool().createBuffer(BufferType.ON_HEAP);
 +    }
 +
 +    void write(int startMarker, int nextMarker)
 +    {
 +        int contentStart = startMarker + SYNC_MARKER_SIZE;
 +        final int length = nextMarker - contentStart;
 +        // The length may be 0 when the segment is being closed.
 +        assert length > 0 || length == 0 && !isStillAllocating();
 +
 +        final ICompressor compressor = encryptionContext.getCompressor();
 +        final int blockSize = encryptionContext.getChunkLength();
 +        try
 +        {
 +            ByteBuffer inputBuffer = buffer.duplicate();
 +            inputBuffer.limit(contentStart + length).position(contentStart);
 +            ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(DatabaseDescriptor.getCommitLogSegmentSize());
 +
 +            // save space for the sync marker at the beginning of this section
 +            final long syncMarkerPosition = lastWrittenPos;
 +            channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
 +
 +            // loop over the segment data in encryption buffer sized chunks
 +            while (contentStart < nextMarker)
 +            {
 +                int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
 +                ByteBuffer slice = inputBuffer.duplicate();
 +                slice.limit(contentStart + nextBlockSize).position(contentStart);
 +
 +                buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
 +
 +                // reuse the same buffer for the input and output of the encryption operation
 +                buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
 +
 +                contentStart += nextBlockSize;
 +                manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
 +            }
 +
 +            lastWrittenPos = channel.position();
 +
 +            // rewind to the beginning of the section and write out the sync marker
 +            buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
 +            writeSyncMarker(id, buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
 +            buffer.putInt(SYNC_MARKER_SIZE, length);
 +            buffer.rewind();
 +            manager.addSize(buffer.limit());
 +
 +            channel.position(syncMarkerPosition);
 +            channel.write(buffer);
- 
-             SyncUtil.force(channel, true);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new FSWriteError(e, getPath());
 +        }
 +    }
 +
 +    public long onDiskSize()
 +    {
 +        return lastWrittenPos;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index 55084be,0000000..d5431f8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@@ -1,66 -1,0 +1,80 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +
 +import org.apache.cassandra.io.FSWriteError;
++import org.apache.cassandra.utils.SyncUtil;
 +
 +/**
 + * Writes to the backing commit log file only on sync, allowing transformations of the mutations,
 + * such as compression or encryption, before writing out to disk.
 + */
 +public abstract class FileDirectSegment extends CommitLogSegment
 +{
 +    volatile long lastWrittenPos = 0;
 +
 +    FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
 +    {
 +        super(commitLog, manager);
 +    }
 +
 +    @Override
 +    void writeLogHeader()
 +    {
 +        super.writeLogHeader();
 +        try
 +        {
 +            channel.write((ByteBuffer) buffer.duplicate().flip());
 +            manager.addSize(lastWrittenPos = buffer.position());
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, getPath());
 +        }
 +    }
 +
 +    @Override
 +    protected void internalClose()
 +    {
 +        try
 +        {
 +            manager.getBufferPool().releaseBuffer(buffer);
 +            super.internalClose();
 +        }
 +        finally
 +        {
 +            manager.notifyBufferFreed();
 +        }
 +    }
++
++    @Override
++    protected void flush(int startMarker, int nextMarker)
++    {
++        try
++        {
++            SyncUtil.force(channel, true);
++        }
++        catch (Exception e)
++        {
++            throw new FSWriteError(e, getPath());
++        }
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index 0000000,e2b9f72..663e7af
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@@ -1,0 -1,128 +1,98 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
++import java.util.ArrayList;
+ import java.util.Random;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
 -import org.apache.cassandra.db.rows.SerializationHelper;
 -import org.apache.cassandra.io.util.DataInputBuffer;
 -import org.apache.cassandra.io.util.RebufferingInputStream;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ /**
+  * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+  * in the commit log segment but do not flush the file to disk.
+  */
+ @RunWith(BMUnitRunner.class)
+ public class CommitLogChainedMarkersTest
+ {
+     private static final String KEYSPACE1 = "CommitLogTest";
+     private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+ 
+     @Test
+     @BMRule(name = "force all calls to sync() to not flush to disk",
+     targetClass = "CommitLogSegment",
+     targetMethod = "sync(boolean)",
+     action = "$flush = false")
+     public void replayCommitLogWithoutFlushing() throws IOException
+     {
++        // this method is a blend of CommitLogSegmentBackpressureTest & CommitLogReaderTest methods
++        DatabaseDescriptor.daemonInitialization();
+         DatabaseDescriptor.setCommitLogSegmentSize(5);
+         DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+         DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+         DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE1,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+ 
+         CompactionManager.instance.disableAutoCompaction();
+ 
+         ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
+         byte[] entropy = new byte[1024];
+         new Random().nextBytes(entropy);
+         final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                            .clustering("bytes")
+                            .add("val", ByteBuffer.wrap(entropy))
+                            .build();
+ 
+         int samples = 10000;
+         for (int i = 0; i < samples; i++)
+             CommitLog.instance.add(m);
+ 
 -        CommitLog.instance.sync(false, true);
++        CommitLog.instance.sync(false);
+ 
 -        Replayer replayer = new Replayer(cfs1.metadata);
 -        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
 -        replayer.recover(commitLogDir.listFiles());
 -        Assert.assertEquals(samples, replayer.count);
 -    }
 -
 -    private static class Replayer extends CommitLogReplayer
 -    {
 -        private final CFMetaData cfm;
 -        private int count;
 -
 -        Replayer(CFMetaData cfm)
 -        {
 -            super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
 -            this.cfm = cfm;
 -        }
 -
 -        @Override
 -        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
 -        {
 -            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
 -            try
 -            {
 -                Mutation mutation = Mutation.serializer.deserialize(bufIn,
 -                                                           desc.getMessagingVersion(),
 -                                                           SerializationHelper.Flag.LOCAL);
++        ArrayList<File> toCheck = CommitLogReaderTest.getCommitLogs();
++        CommitLogReader reader = new CommitLogReader();
++        CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata);
++        for (File f : toCheck)
++            reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false);
+ 
 -                if (cfm == null || mutation.get(cfm) != null)
 -                    count++;
 -            }
 -            catch (IOException e)
 -            {
 -                // Test fails.
 -                throw new AssertionError(e);
 -            }
 -        }
++        Assert.assertEquals(samples, testHandler.seenMutationCount());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index 3956de5,a1999ef..46a3fb0
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@@ -67,12 -66,12 +67,12 @@@ public class CommitLogSegmentBackpressu
      @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
                                targetClass = "AbstractCommitLogService$1",
                                targetMethod = "run",
-                               targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
 -                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
++                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
                                action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
                        @BMRule(name = "Release Semaphore after sync",
                                targetClass = "AbstractCommitLogService$1",
                                targetMethod = "run",
-                               targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
 -                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
++                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
                                action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
      public void testCompressedCommitLogBackpressure() throws Throwable
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 267813e,b8f68ed..215ad6c
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -360,10 -339,10 +360,10 @@@ public class CommitLogTes
  
          // "Flush": this won't delete anything
          UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
-         CommitLog.instance.sync();
 -        CommitLog.instance.sync(true, true);
 -        CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
++        CommitLog.instance.sync(true);
 +        CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
  
 -        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
 +        assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
  
          // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
          Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
@@@ -678,114 -612,6 +678,114 @@@
      }
  
      @Test
 +    public void replaySimple() throws IOException
 +    {
 +        int cellCount = 0;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
 +        CommitLog.instance.add(rm1);
 +
 +        final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
 +        CommitLog.instance.add(rm2);
 +
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
 +        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
 +        File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.replayFiles(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    @Test
 +    public void replayWithDiscard() throws IOException
 +    {
 +        int cellCount = 0;
 +        int max = 1024;
 +        int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
 +        CommitLogPosition commitLogPosition = null;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +
 +        for (int i = 0; i < max; i++)
 +        {
 +            final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
 +                                 .clustering("bytes")
 +                                 .add("val", bytes("this is a string"))
 +                                 .build();
 +            CommitLogPosition position = CommitLog.instance.add(rm1);
 +
 +            if (i == discardPosition)
 +                commitLogPosition = position;
 +            if (i > discardPosition)
 +            {
 +                cellCount += 1;
 +            }
 +        }
 +
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
 +        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
 +        File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.replayFiles(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    class SimpleCountingReplayer extends CommitLogReplayer
 +    {
 +        private final CommitLogPosition filterPosition;
 +        private final CFMetaData metadata;
 +        int cells;
 +        int skipped;
 +
 +        SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, CFMetaData cfm)
 +        {
 +            super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
 +            this.filterPosition = filterPosition;
 +            this.metadata = cfm;
 +        }
 +
 +        @SuppressWarnings("resource")
 +        @Override
 +        public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
 +        {
 +            // Filter out system writes that could flake the test.
 +            if (!KEYSPACE1.equals(m.getKeyspaceName()))
 +                return;
 +
 +            if (entryLocation <= filterPosition.position)
 +            {
 +                // Skip over this mutation.
 +                skipped++;
 +                return;
 +            }
 +            for (PartitionUpdate partitionUpdate : m.getPartitionUpdates())
 +            {
 +                // Only process mutations for the CF's we're testing against, since we can't deterministically predict
 +                // whether or not system keyspaces will be mutated during a test.
 +                if (partitionUpdate.metadata().cfName.equals(metadata.cfName))
 +                {
 +                    for (Row row : partitionUpdate)
 +                        cells += Iterables.size(row.cells());
 +                }
 +            }
 +        }
 +    }
 +
      public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException
      {
          CommitLog.instance.resetUnsafe(true);
@@@ -826,7 -652,7 +826,7 @@@
              DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
          }
  
-         CommitLog.instance.sync();
 -        CommitLog.instance.sync(true, true);
++        CommitLog.instance.sync(true);
          System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
          // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
          // If retries work subsequent flushes should clear up error and this should change to expect 0.
@@@ -859,7 -685,7 +859,7 @@@
          for (SSTableReader reader : cfs.getLiveSSTables())
              reader.reloadSSTableMetadata();
  
-         CommitLog.instance.sync();
 -        CommitLog.instance.sync(true, true);
++        CommitLog.instance.sync(true);
          System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
          // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
          // persisted all data in the commit log. Because we know there was an error, there must be something left to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 7f43378,36973f2..9a22b04
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@@ -35,44 -36,44 +35,44 @@@ import org.apache.cassandra.io.util.Reb
   */
  public class CommitLogTestReplayer extends CommitLogReplayer
  {
 -    public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
 -    {
 -        CommitLog.instance.sync(true, true);
 -
 -        CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
 -        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
 -        replayer.recover(commitLogDir.listFiles());
 -    }
 -
 -    final private Predicate<Mutation> processor;
 +    private final Predicate<Mutation> processor;
  
 -    public CommitLogTestReplayer(CommitLog log, Predicate<Mutation> processor)
 +    public CommitLogTestReplayer(Predicate<Mutation> processor) throws IOException
      {
 -        this(log, ReplayPosition.NONE, processor);
 +        super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +
 +        this.processor = processor;
 +        commitLogReader = new CommitLogTestReader();
      }
  
 -    public CommitLogTestReplayer(CommitLog log, ReplayPosition discardedPos, Predicate<Mutation> processor)
 +    public void examineCommitLog() throws IOException
      {
 -        super(log, discardedPos, null, ReplayFilter.create());
 -        this.processor = processor;
 +        replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
      }
  
 -    @Override
 -    void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
 +    private class CommitLogTestReader extends CommitLogReader
      {
 -        RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
 -        Mutation mutation;
 -        try
 -        {
 -            mutation = Mutation.serializer.deserialize(bufIn,
 -                                                           desc.getMessagingVersion(),
 -                                                           SerializationHelper.Flag.LOCAL);
 -            Assert.assertTrue(processor.apply(mutation));
 -        }
 -        catch (IOException e)
 +        @Override
 +        protected void readMutation(CommitLogReadHandler handler,
 +                                    byte[] inputBuffer,
 +                                    int size,
 +                                    CommitLogPosition minPosition,
 +                                    final int entryLocation,
 +                                    final CommitLogDescriptor desc) throws IOException
          {
 -            // Test fails.
 -            throw new AssertionError(e);
 +            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
 +            Mutation mutation;
 +            try
 +            {
 +                mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
 +                Assert.assertTrue(processor.apply(mutation));
 +            }
 +            catch (IOException e)
 +            {
 +                // Test fails.
 +                throw new AssertionError(e);
 +            }
          }
      }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/6] cassandra git commit: More frequent commitlog chained markers

Posted by ja...@apache.org.
More frequent commitlog chained markers

patch by jasobrown; reviewed by Sam Tunnecliffe for CASSANDRA-13987


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05cb556f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05cb556f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05cb556f

Branch: refs/heads/trunk
Commit: 05cb556f90dbd1929a180254809e05620265419b
Parents: eb05025
Author: Jason Brown <ja...@apple.com>
Authored: Tue Oct 17 15:37:13 2017 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:06:38 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  10 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../db/commitlog/AbstractCommitLogService.java  |  86 ++++++++++---
 .../cassandra/db/commitlog/CommitLog.java       |   4 +-
 .../db/commitlog/CommitLogSegment.java          |  85 ++++++++----
 .../db/commitlog/CompressedSegment.java         |  12 ++
 .../db/commitlog/MemoryMappedSegment.java       |   7 +-
 .../db/commitlog/PeriodicCommitLogService.java  |   2 +-
 .../commitlog/CommitLogChainedMarkersTest.java  | 128 +++++++++++++++++++
 .../CommitLogSegmentBackpressureTest.java       |   4 +-
 .../cassandra/db/commitlog/CommitLogTest.java   |   6 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   2 +-
 14 files changed, 304 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c50a3f..2683dc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * More frequent commitlog chained markers (CASSANDRA-13987)
  * Fix serialized size of DataLimits (CASSANDRA-14057)
  * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
  * Fix SSTableLoader logger message (CASSANDRA-14003)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b783090..71e0b2a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -300,10 +300,18 @@ counter_cache_save_period: 7200
 #
 # the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds. 
+# milliseconds.
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
 
+# Time interval in millis at which we should update the chained markers in the commitlog.
+# This allows more of the commitlog to be replayed from the mmapped file
+# if the cassandra process crashes; this does not help in durability for surviving a host failure.
+# This value only makes sense if it is significantly less than commitlog_sync_period_in_ms,
+# and only applies to periodic mode when not using commitlog compression or encryption.
+# commitlog_marker_period_in_ms: 100
+
+
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data
 # in it (potentially from each columnfamily in the system) has been

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..0796183 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,6 +192,7 @@ public class Config
     public CommitLogSync commitlog_sync;
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
+    public Integer commitlog_marker_period_in_ms = 0;
     public int commitlog_segment_size_in_mb = 32;
     public ParameterizedClass commitlog_compression;
     public int commitlog_max_compression_buffers_in_pool = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..169ed3d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1519,6 +1519,16 @@ public class DatabaseDescriptor
         conf.commitlog_sync_period_in_ms = periodMillis;
     }
 
+    public static void setCommitLogMarkerPeriod(int markerPeriod)
+    {
+        conf.commitlog_marker_period_in_ms = markerPeriod;
+    }
+
+    public static int getCommitLogMarkerPeriod()
+    {
+        return conf.commitlog_marker_period_in_ms;
+    }
+
     public static Config.CommitLogSync getCommitLogSync()
     {
         return conf.commitlog_sync;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index e5a5887..8a03b2f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.db.commitlog;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.slf4j.*;
@@ -26,8 +29,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
-
 public abstract class AbstractCommitLogService
 {
 
@@ -47,7 +48,24 @@ public abstract class AbstractCommitLogService
 
     final CommitLog commitLog;
     private final String name;
-    private final long pollIntervalMillis;
+
+    /**
+     * The duration between syncs to disk.
+     */
+    private final long syncIntervalMillis;
+
+    /**
+     * The duration between updating the chained markers in the commit log file. This value should be
+     * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
+     */
+    private final long markerIntervalMillis;
+
+    /**
+     * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+     * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+     * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+     */
+    private volatile boolean syncRequested;
 
     private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
 
@@ -57,18 +75,45 @@ public abstract class AbstractCommitLogService
      *
      * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
      */
-    AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis)
+    {
+        this(commitLog, name, syncIntervalMillis, syncIntervalMillis);
+    }
+
+    /**
+     * CommitLogService provides a fsync service for Allocations, fulfilling either the
+     * Batch or Periodic contract.
+     *
+     * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+     */
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis)
     {
         this.commitLog = commitLog;
         this.name = name;
-        this.pollIntervalMillis = pollIntervalMillis;
+        this.syncIntervalMillis = syncIntervalMillis;
+
+        // if we are not using periodic mode, or we are using compression, we shouldn't update the chained markers
+        // faster than the sync interval
+        if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
+            markerIntervalMillis = syncIntervalMillis;
+
+        // apply basic bounds checking on the marker interval
+        if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+        {
+            logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+                        markerIntervalMillis, syncIntervalMillis);
+            markerIntervalMillis = syncIntervalMillis;
+        }
+
+        this.markerIntervalMillis = markerIntervalMillis;
     }
 
     // Separated into individual method to ensure relevant objects are constructed before this is started.
     void start()
     {
-        if (pollIntervalMillis < 1)
-            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+        if (syncIntervalMillis < 1)
+            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
+                                                             syncIntervalMillis * 1e-6));
 
         Runnable runnable = new Runnable()
         {
@@ -89,16 +134,24 @@ public abstract class AbstractCommitLogService
                         run = !shutdown;
 
                         // sync and signal
-                        long syncStarted = System.currentTimeMillis();
-                        //This is a target for Byteman in CommitLogSegmentManagerTest
-                        commitLog.sync(shutdown);
-                        lastSyncedAt = syncStarted;
-                        syncComplete.signalAll();
-
+                        long pollStarted = System.currentTimeMillis();
+                        if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+                        {
+                            // in this branch, we want to flush the commit log to disk
+                            commitLog.sync(shutdown, true);
+                            syncRequested = false;
+                            lastSyncedAt = pollStarted;
+                            syncComplete.signalAll();
+                        }
+                        else
+                        {
+                            // in this branch, just update the commit log sync headers
+                            commitLog.sync(false, false);
+                        }
 
                         // sleep any time we have left before the next one is due
                         long now = System.currentTimeMillis();
-                        long sleep = syncStarted + pollIntervalMillis - now;
+                        long sleep = pollStarted + markerIntervalMillis - now;
                         if (sleep < 0)
                         {
                             // if we have lagged noticeably, update our lag counter
@@ -111,7 +164,7 @@ public abstract class AbstractCommitLogService
                             lagCount++;
                         }
                         syncCount++;
-                        totalSyncDuration += now - syncStarted;
+                        totalSyncDuration += now - pollStarted;
 
                         if (firstLagAt > 0)
                         {
@@ -149,7 +202,7 @@ public abstract class AbstractCommitLogService
                         // sleep for full poll-interval after an error, so we don't spam the log file
                         try
                         {
-                            haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+                            haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
                         }
                         catch (InterruptedException e)
                         {
@@ -180,6 +233,7 @@ public abstract class AbstractCommitLogService
      */
     public WaitQueue.Signal requestExtraSync()
     {
+        syncRequested = true;
         WaitQueue.Signal signal = syncComplete.register();
         haveWork.release(1);
         return signal;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 40040ed..ff1b712 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -220,14 +220,14 @@ public class CommitLog implements CommitLogMBean
     /**
      * Forces a disk flush on the commit log files that need it.  Blocking.
      */
-    public void sync(boolean syncAllSegments)
+    public void sync(boolean syncAllSegments, boolean flush)
     {
         CommitLogSegment current = allocator.allocatingFrom();
         for (CommitLogSegment segment : allocator.getActiveSegments())
         {
             if (!syncAllSegments && segment.id > current.id)
                 return;
-            segment.sync();
+            segment.sync(flush);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 236a1b1..8834c8c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -94,6 +94,12 @@ public abstract class CommitLogSegment
     // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
     private volatile int lastSyncedOffset;
 
+    /**
+     * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+     * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+     */
+    private volatile int lastMarkerOffset;
+
     // The end position of the buffer. Initially set to its capacity and updated to point to the last written position
     // as the segment is being closed.
     // No need to be volatile as writes are protected by appendOrder barrier.
@@ -167,7 +173,8 @@ public abstract class CommitLogSegment
         // write the header
         CommitLogDescriptor.writeHeader(buffer, descriptor);
         endOfBuffer = buffer.capacity();
-        lastSyncedOffset = buffer.position();
+
+        lastSyncedOffset = lastMarkerOffset = buffer.position();
         allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
     }
 
@@ -232,7 +239,7 @@ public abstract class CommitLogSegment
     // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
     void discardUnusedTail()
     {
-        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
+        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with ACLSM.advanceAllocatingFrom()
         // Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
         // This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
         // running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
@@ -271,13 +278,18 @@ public abstract class CommitLogSegment
     }
 
     /**
-     * Forces a disk flush for this segment file.
+     * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+     *
+     * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
      */
-    synchronized void sync()
+    synchronized void sync(boolean flush)
     {
-        boolean close = false;
+        assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+                                                                    lastMarkerOffset, lastSyncedOffset);
         // check we have more work to do
-        if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+        final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+        final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+        if (!(needToMarkData || hasDataToFlush))
             return;
         // Note: Even if the very first allocation of this sync section failed, we still want to enter this
         // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
@@ -285,31 +297,48 @@ public abstract class CommitLogSegment
         // succeeded in the previous sync.
         assert buffer != null;  // Only close once.
 
-        int startMarker = lastSyncedOffset;
-        // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
-        // the point at which we can safely consider records to have been completely written to.
-        int nextMarker = allocate(SYNC_MARKER_SIZE);
-        if (nextMarker < 0)
+        boolean close = false;
+        int startMarker = lastMarkerOffset;
+        int nextMarker, sectionEnd;
+        if (needToMarkData)
         {
-            // Ensure no more of this CLS is writeable, and mark ourselves for closing.
-            discardUnusedTail();
-            close = true;
-
-            // We use the buffer size as the synced position after a close instead of the end of the actual data
-            // to make sure we only close the buffer once.
-            // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
-            nextMarker = buffer.capacity();
+            // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+            // the point at which we can safely consider records to have been completely written to.
+            nextMarker = allocate(SYNC_MARKER_SIZE);
+            if (nextMarker < 0)
+            {
+                // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+                discardUnusedTail();
+                close = true;
+
+                // We use the buffer size as the synced position after a close instead of the end of the actual data
+                // to make sure we only close the buffer once.
+                // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+                nextMarker = buffer.capacity();
+            }
+            // Wait for mutations to complete as well as endOfBuffer to have been written.
+            waitForModifications();
+            sectionEnd = close ? endOfBuffer : nextMarker;
+
+            // Possibly perform compression or encryption and update the chained markers
+            write(startMarker, sectionEnd);
+            lastMarkerOffset = sectionEnd;
+        }
+        else
+        {
+            // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+            // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+            nextMarker = lastMarkerOffset;
+            sectionEnd = nextMarker;
         }
 
-        // Wait for mutations to complete as well as endOfBuffer to have been written.
-        waitForModifications();
-        int sectionEnd = close ? endOfBuffer : nextMarker;
 
-        // Perform compression, writing to file and flush.
-        write(startMarker, sectionEnd);
+        if (flush || close)
+        {
+            flush(startMarker, sectionEnd);
+            lastSyncedOffset = lastMarkerOffset = nextMarker;
+        }
 
-        // Signal the sync as complete.
-        lastSyncedOffset = nextMarker;
         if (close)
             internalClose();
         syncComplete.signalAll();
@@ -327,6 +356,8 @@ public abstract class CommitLogSegment
 
     abstract void write(int lastSyncedOffset, int nextMarker);
 
+    abstract void flush(int startMarker, int nextMarker);
+
     public boolean isStillAllocating()
     {
         return allocatePosition.get() < endOfBuffer;
@@ -404,7 +435,7 @@ public abstract class CommitLogSegment
     synchronized void close()
     {
         discardUnusedTail();
-        sync();
+        sync(true);
         assert buffer == null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index c00ce18..8e05112 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -139,6 +139,18 @@ public class CompressedSegment extends CommitLogSegment
             channel.write(compressedBuffer);
             assert channel.position() - lastWrittenPos == compressedBuffer.limit();
             lastWrittenPos = channel.position();
+        }
+        catch (Exception e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
             SyncUtil.force(channel, true);
         }
         catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a16d91..8259f04 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -78,8 +78,13 @@ public class MemoryMappedSegment extends CommitLogSegment
         // write previous sync marker to point to next sync marker
         // we don't chain the crcs here to ensure this method is idempotent if it fails
         writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+    }
 
-        try {
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
             SyncUtil.force((MappedByteBuffer) buffer);
         }
         catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 86a248b..76419b7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -26,7 +26,7 @@ class PeriodicCommitLogService extends AbstractCommitLogService
 
     public PeriodicCommitLogService(final CommitLog commitLog)
     {
-        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod());
+        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod());
     }
 
     protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
new file mode 100644
index 0000000..e2b9f72
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+/**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+@RunWith(BMUnitRunner.class)
+public class CommitLogChainedMarkersTest
+{
+    private static final String KEYSPACE1 = "CommitLogTest";
+    private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+    @Test
+    @BMRule(name = "force all calls to sync() to not flush to disk",
+    targetClass = "CommitLogSegment",
+    targetMethod = "sync(boolean)",
+    action = "$flush = false")
+    public void replayCommitLogWithoutFlushing() throws IOException
+    {
+        DatabaseDescriptor.setCommitLogSegmentSize(5);
+        DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+        DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+        CompactionManager.instance.disableAutoCompaction();
+
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+        byte[] entropy = new byte[1024];
+        new Random().nextBytes(entropy);
+        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                           .clustering("bytes")
+                           .add("val", ByteBuffer.wrap(entropy))
+                           .build();
+
+        int samples = 10000;
+        for (int i = 0; i < samples; i++)
+            CommitLog.instance.add(m);
+
+        CommitLog.instance.sync(false, true);
+
+        Replayer replayer = new Replayer(cfs1.metadata);
+        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
+        replayer.recover(commitLogDir.listFiles());
+        Assert.assertEquals(samples, replayer.count);
+    }
+
+    private static class Replayer extends CommitLogReplayer
+    {
+        private final CFMetaData cfm;
+        private int count;
+
+        Replayer(CFMetaData cfm)
+        {
+            super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
+            this.cfm = cfm;
+        }
+
+        @Override
+        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+        {
+            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+            try
+            {
+                Mutation mutation = Mutation.serializer.deserialize(bufIn,
+                                                           desc.getMessagingVersion(),
+                                                           SerializationHelper.Flag.LOCAL);
+
+                if (cfm == null || mutation.get(cfm) != null)
+                    count++;
+            }
+            catch (IOException e)
+            {
+                // Test fails.
+                throw new AssertionError(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index b651098..a1999ef 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@ -66,12 +66,12 @@ public class CommitLogSegmentBackpressureTest
     @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
                               targetClass = "AbstractCommitLogService$1",
                               targetMethod = "run",
-                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
                       @BMRule(name = "Release Semaphore after sync",
                               targetClass = "AbstractCommitLogService$1",
                               targetMethod = "run",
-                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
     public void testCompressedCommitLogBackpressure() throws Throwable
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9e9ee53..b8f68ed 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -339,7 +339,7 @@ public class CommitLogTest
 
         // "Flush": this won't delete anything
         UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
 
         assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
@@ -652,7 +652,7 @@ public class CommitLogTest
             DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
         }
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
         // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
         // If retries work subsequent flushes should clear up error and this should change to expect 0.
@@ -685,7 +685,7 @@ public class CommitLogTest
         for (SSTableReader reader : cfs.getLiveSSTables())
             reader.reloadSSTableMetadata();
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
         // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
         // persisted all data in the commit log. Because we know there was an error, there must be something left to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index e690785..36973f2 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -38,7 +38,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
 {
     public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
     {
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
 
         CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
         File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2402acd4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2402acd4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2402acd4

Branch: refs/heads/trunk
Commit: 2402acd47e3bb514981cde742b7330666c564869
Parents: d274c6a c3a1a4f
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Dec 5 05:10:08 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:11:21 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 10 +-
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    | 10 ++
 .../AbstractCommitLogSegmentManager.java        |  8 +-
 .../db/commitlog/AbstractCommitLogService.java  | 84 ++++++++++++++---
 .../cassandra/db/commitlog/CommitLog.java       |  4 +-
 .../db/commitlog/CommitLogSegment.java          | 90 ++++++++++++------
 .../db/commitlog/CompressedSegment.java         |  4 +-
 .../db/commitlog/EncryptedSegment.java          |  3 -
 .../db/commitlog/FileDirectSegment.java         | 14 +++
 .../db/commitlog/MemoryMappedSegment.java       |  4 +
 .../db/commitlog/PeriodicCommitLogService.java  |  2 +-
 .../cassandra/db/commitlog/CDCTestReplayer.java |  2 +-
 .../commitlog/CommitLogChainedMarkersTest.java  | 98 ++++++++++++++++++++
 .../CommitLogSegmentBackpressureTest.java       |  4 +-
 .../CommitLogSegmentManagerCDCTest.java         |  8 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 10 +-
 .../db/commitlog/CommitLogTestReplayer.java     |  2 +-
 19 files changed, 288 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index ba478e7,3569d36..02cec12
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -358,24 -367,21 +358,24 @@@ counter_cache_save_period: 720
  # If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
  # saved_caches_directory: /var/lib/cassandra/saved_caches
  
 -# commitlog_sync may be either "periodic" or "batch." 
 +# commitlog_sync may be either "periodic", "group", or "batch." 
  # 
  # When in batch mode, Cassandra won't ack writes until the commit log
 -# has been fsynced to disk.  It will wait
 -# commitlog_sync_batch_window_in_ms milliseconds between fsyncs.
 -# This window should be kept short because the writer threads will
 -# be unable to do extra work while waiting.  (You may need to increase
 -# concurrent_writes for the same reason.)
 +# has been flushed to disk.  Each incoming write will trigger the flush task.
 +# commitlog_sync_batch_window_in_ms is a deprecated value. Previously it had
 +# almost no value, and is being removed.
  #
 -# commitlog_sync: batch
  # commitlog_sync_batch_window_in_ms: 2
  #
 -# the other option is "periodic" where writes may be acked immediately
 +# group mode is similar to batch mode, where Cassandra will not ack writes
 +# until the commit log has been flushed to disk. The difference is group
 +# mode will wait up to commitlog_sync_group_window_in_ms between flushes.
 +#
 +# commitlog_sync_group_window_in_ms: 1000
 +#
 +# the default option is "periodic" where writes may be acked immediately
  # and the CommitLog is simply synced every commitlog_sync_period_in_ms
- # milliseconds. 
+ # milliseconds.
  commitlog_sync: periodic
  commitlog_sync_period_in_ms: 10000
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index 1db8217,5fe752e..4fa3bed
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -185,13 -198,9 +185,14 @@@ public class Confi
      public String commitlog_directory;
      public Integer commitlog_total_space_in_mb;
      public CommitLogSync commitlog_sync;
 +
 +    /**
 +     * @deprecated since 4.0 This value was near useless, and we're not using it anymore
 +     */
      public double commitlog_sync_batch_window_in_ms = Double.NaN;
 +    public double commitlog_sync_group_window_in_ms = Double.NaN;
      public int commitlog_sync_period_in_ms;
+     public int commitlog_marker_period_in_ms = 0;
      public int commitlog_segment_size_in_mb = 32;
      public ParameterizedClass commitlog_compression;
      public int commitlog_max_compression_buffers_in_pool = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 3a57139,7c02892..016dbc1
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -94,9 -89,14 +94,15 @@@ public abstract class CommitLogSegmen
      // Everything before this offset has been synced and written.  The SYNC_MARKER_SIZE bytes after
      // each sync are reserved, and point forwards to the next such offset.  The final
      // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
 -    private volatile int lastSyncedOffset;
 +    @VisibleForTesting
 +    volatile int lastSyncedOffset;
  
+     /**
+      * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+      * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+      */
+     private volatile int lastMarkerOffset;
+ 
      // The end position of the buffer. Initially set to its capacity and updated to point to the last written position
      // as the segment is being closed.
      // No need to be volatile as writes are protected by appendOrder barrier.
@@@ -316,34 -319,48 +328,50 @@@
          // succeeded in the previous sync.
          assert buffer != null;  // Only close once.
  
-         int startMarker = lastSyncedOffset;
-         // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
-         // the point at which we can safely consider records to have been completely written to.
-         int nextMarker = allocate(SYNC_MARKER_SIZE);
-         if (nextMarker < 0)
+         boolean close = false;
+         int startMarker = lastMarkerOffset;
+         int nextMarker, sectionEnd;
+         if (needToMarkData)
          {
-             // Ensure no more of this CLS is writeable, and mark ourselves for closing.
-             discardUnusedTail();
-             close = true;
- 
-             // We use the buffer size as the synced position after a close instead of the end of the actual data
-             // to make sure we only close the buffer once.
-             // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
-             nextMarker = buffer.capacity();
-         }
+             // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+             // the point at which we can safely consider records to have been completely written to.
+             nextMarker = allocate(SYNC_MARKER_SIZE);
+             if (nextMarker < 0)
+             {
+                 // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+                 discardUnusedTail();
+                 close = true;
+ 
+                 // We use the buffer size as the synced position after a close instead of the end of the actual data
+                 // to make sure we only close the buffer once.
+                 // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+                 nextMarker = buffer.capacity();
+             }
+             // Wait for mutations to complete as well as endOfBuffer to have been written.
+             waitForModifications();
+             sectionEnd = close ? endOfBuffer : nextMarker;
  
-         // Wait for mutations to complete as well as endOfBuffer to have been written.
-         waitForModifications();
-         int sectionEnd = close ? endOfBuffer : nextMarker;
+             // Possibly perform compression or encryption and update the chained markers
+             write(startMarker, sectionEnd);
+             lastMarkerOffset = sectionEnd;
+         }
+         else
+         {
+             // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+             // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+             nextMarker = lastMarkerOffset;
+             sectionEnd = nextMarker;
+         }
  
-         // Possibly perform compression or encryption, writing to file and flush.
-         write(startMarker, sectionEnd);
  
-         if (cdcState == CDCState.CONTAINS)
-             writeCDCIndexFile(descriptor, sectionEnd, close);
+         if (flush || close)
+         {
+             flush(startMarker, sectionEnd);
++            if (cdcState == CDCState.CONTAINS)
++                writeCDCIndexFile(descriptor, sectionEnd, close);
+             lastSyncedOffset = lastMarkerOffset = nextMarker;
+         }
  
-         // Signal the sync as complete.
-         lastSyncedOffset = nextMarker;
          if (close)
              internalClose();
          syncComplete.signalAll();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
index 3695da8,0000000..18bc6e0
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
@@@ -1,76 -1,0 +1,76 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.File;
 +import java.io.IOException;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.RebufferingInputStream;
 +
 +/**
 + * Utility class that flags the replayer as having seen a CDC mutation and calculates offset but doesn't apply mutations
 + */
 +public class CDCTestReplayer extends CommitLogReplayer
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CDCTestReplayer.class);
 +
 +    public CDCTestReplayer() throws IOException
 +    {
 +        super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        commitLogReader = new CommitLogTestReader();
 +    }
 +
 +    public void examineCommitLog() throws IOException
 +    {
 +        replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
 +    }
 +
 +    private class CommitLogTestReader extends CommitLogReader
 +    {
 +        @Override
 +        protected void readMutation(CommitLogReadHandler handler,
 +                                    byte[] inputBuffer,
 +                                    int size,
 +                                    CommitLogPosition minPosition,
 +                                    final int entryLocation,
 +                                    final CommitLogDescriptor desc) throws IOException
 +        {
 +            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
 +            Mutation mutation;
 +            try
 +            {
 +                mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
 +                if (mutation.trackedByCDC())
 +                    sawCDCMutation = true;
 +            }
 +            catch (IOException e)
 +            {
 +                // Test fails.
 +                throw new AssertionError(e);
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index 0000000,663e7af..be44ec3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@@ -1,0 -1,98 +1,98 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Random;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ /**
+  * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+  * in the commit log segment but do not flush the file to disk.
+  */
+ @RunWith(BMUnitRunner.class)
+ public class CommitLogChainedMarkersTest
+ {
+     private static final String KEYSPACE1 = "CommitLogTest";
+     private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+ 
+     @Test
+     @BMRule(name = "force all calls to sync() to not flush to disk",
+     targetClass = "CommitLogSegment",
+     targetMethod = "sync(boolean)",
+     action = "$flush = false")
+     public void replayCommitLogWithoutFlushing() throws IOException
+     {
+         // this method is a blend of CommitLogSegmentBackpressureTest & CommitLogReaderTest methods
+         DatabaseDescriptor.daemonInitialization();
+         DatabaseDescriptor.setCommitLogSegmentSize(5);
+         DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+         DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+         DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE1,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+ 
+         CompactionManager.instance.disableAutoCompaction();
+ 
+         ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
+         byte[] entropy = new byte[1024];
+         new Random().nextBytes(entropy);
 -        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
++        final Mutation m = new RowUpdateBuilder(cfs1.metadata.get(), 0, "k")
+                            .clustering("bytes")
+                            .add("val", ByteBuffer.wrap(entropy))
+                            .build();
+ 
+         int samples = 10000;
+         for (int i = 0; i < samples; i++)
+             CommitLog.instance.add(m);
+ 
+         CommitLog.instance.sync(false);
+ 
+         ArrayList<File> toCheck = CommitLogReaderTest.getCommitLogs();
+         CommitLogReader reader = new CommitLogReader();
 -        CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata);
++        CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata.get());
+         for (File f : toCheck)
+             reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false);
+ 
+         Assert.assertEquals(samples, testHandler.seenMutationCount());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index 80dfd01,68ce57d..7417cd3
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@@ -159,259 -199,6 +159,259 @@@ public class CommitLogSegmentManagerCDC
          }
      }
  
 +    @Test
 +    public void testCDCIndexFileWriteOnSync() throws IOException
 +    {
 +        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
 +        new RowUpdateBuilder(currentTableMetadata(), 0, 1)
 +            .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +            .build().apply();
 +
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
 +        int syncOffset = currentSegment.lastSyncedOffset;
 +
 +        // Confirm index file is written
 +        File cdcIndexFile = currentSegment.getCDCIndexFile();
 +        Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Read index value and confirm it's == end from last sync
 +        BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
 +        String input = in.readLine();
 +        Integer offset = Integer.parseInt(input);
 +        Assert.assertEquals(syncOffset, (long)offset);
 +        in.close();
 +    }
 +
 +    @Test
 +    public void testCompletedFlag() throws IOException
 +    {
 +        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
 +        CommitLogSegment initialSegment = CommitLog.instance.segmentManager.allocatingFrom();
 +        DatabaseDescriptor.setCDCSpaceInMB(8);
 +        try
 +        {
 +            for (int i = 0; i < 1000; i++)
 +            {
 +                new RowUpdateBuilder(currentTableMetadata(), 0, 1)
 +                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +                .build().apply();
 +            }
 +        }
 +        catch (CDCWriteException ce)
 +        {
 +            // pass. Expected since we'll have a file or two linked on restart of CommitLog due to replay
 +        }
 +
 +        CommitLog.instance.forceRecycleAllSegments();
 +
 +        // Confirm index file is written
 +        File cdcIndexFile = initialSegment.getCDCIndexFile();
 +        Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Read index file and confirm second line is COMPLETED
 +        BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
 +        String input = in.readLine();
 +        input = in.readLine();
 +        Assert.assertTrue("Expected COMPLETED in index file, got: " + input, input.equals("COMPLETED"));
 +        in.close();
 +    }
 +
 +    @Test
 +    public void testDeleteLinkOnDiscardNoCDC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;");
 +        new RowUpdateBuilder(currentTableMetadata(), 0, 1)
 +            .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +            .build().apply();
 +        CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
 +
 +        // Confirm that, with no CDC data present, we've hard-linked but have no index file
 +        Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
 +        File cdcIndexFile = currentSegment.getCDCIndexFile();
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +        Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Sync and confirm no index written as index is written on flush
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +        Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Force a full recycle and confirm hard-link is deleted
 +        CommitLog.instance.forceRecycleAllSegments();
 +        CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
 +        Assert.assertFalse("Expected hard link to CLS to be deleted on non-cdc segment: " + linked, Files.exists(linked));
 +    }
 +
 +    @Test
 +    public void testRetainLinkOnDiscardCDC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
 +        CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
 +        File cdcIndexFile = currentSegment.getCDCIndexFile();
 +        Assert.assertFalse("Expected no index file before flush but found: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        new RowUpdateBuilder(currentTableMetadata(), 0, 1)
 +            .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +            .build().apply();
 +
 +        Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
 +        // Confirm that, with CDC data present but not yet flushed, we've hard-linked but have no index file
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +
 +        // Sync and confirm index written as index is written on flush
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +        Assert.assertTrue("Expected cdc index file after flush but found none: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Force a full recycle and confirm all files remain
 +        CommitLog.instance.forceRecycleAllSegments();
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +        Assert.assertTrue("Expected cdc index file after recycle but found none: " + cdcIndexFile, cdcIndexFile.exists());
 +    }
 +
 +    @Test
 +    public void testReplayLogic() throws IOException
 +    {
 +        // Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
 +        String table_name = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
 +
 +        DatabaseDescriptor.setCDCSpaceInMB(8);
 +        TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(table_name).metadata();
 +        try
 +        {
 +            for (int i = 0; i < 1000; i++)
 +            {
 +                new RowUpdateBuilder(ccfm, 0, i)
 +                    .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +                    .build().apply();
 +            }
 +            Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
 +        }
 +        catch (CDCWriteException e)
 +        {
 +            // pass
 +        }
 +
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        CommitLog.instance.stopUnsafe(false);
 +
 +        // Build up a list of expected index files after replay and then clear out cdc_raw
 +        List<CDCIndexData> oldData = parseCDCIndexData();
 +        for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
 +            FileUtils.deleteWithConfirm(f.getAbsolutePath());
 +
 +        try
 +        {
 +            Assert.assertEquals("Expected 0 files in CDC folder after deletion. ",
 +                                0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
 +        }
 +        finally
 +        {
 +            // If we don't have a started commitlog, assertions will cause the test to hang. I assume the
 +            // hang occurs in the CQLTester shutdown, which tries to clean up / drop keyspaces / tables and
 +            // then blocks while applying mutations.
 +            CommitLog.instance.start();
 +            CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
 +        }
 +        CDCTestReplayer replayer = new CDCTestReplayer();
 +        replayer.examineCommitLog();
 +
 +        // Rough sanity check -> should be files there now.
 +        Assert.assertTrue("Expected non-zero number of files in CDC folder after restart.",
 +                          new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length > 0);
 +
 +        // Confirm all the indexes in the old set are present and >= the original offset, as we flag the entire segment
 +        // as cdc written on a replay.
 +        List<CDCIndexData> newData = parseCDCIndexData();
 +        for (CDCIndexData cid : oldData)
 +        {
 +            boolean found = false;
 +            for (CDCIndexData ncid : newData)
 +            {
 +                if (cid.fileName.equals(ncid.fileName))
 +                {
 +                    Assert.assertTrue("New CDC index file expected to have >= offset in old.", ncid.offset >= cid.offset);
 +                    found = true;
 +                }
 +            }
 +            if (!found)
 +            {
 +                StringBuilder errorMessage = new StringBuilder();
 +                errorMessage.append(String.format("Missing old CDCIndexData in new set after replay: %s\n", cid));
 +                errorMessage.append("List of CDCIndexData in new set of indexes after replay:\n");
 +                for (CDCIndexData ncid : newData)
 +                    errorMessage.append(String.format("   %s\n", ncid));
 +                Assert.fail(errorMessage.toString());
 +            }
 +        }
 +
 +        // And make sure we don't have new CDC Indexes we don't expect
 +        for (CDCIndexData ncid : newData)
 +        {
 +            boolean found = false;
 +            for (CDCIndexData cid : oldData)
 +            {
 +                if (cid.fileName.equals(ncid.fileName))
 +                    found = true;
 +            }
 +            if (!found)
 +                Assert.fail(String.format("Unexpected new CDCIndexData found after replay: %s\n", ncid));
 +        }
 +    }
 +
 +    private List<CDCIndexData> parseCDCIndexData()
 +    {
 +        List<CDCIndexData> results = new ArrayList<>();
 +        try
 +        {
 +            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
 +            {
 +                if (f.getName().contains("_cdc.idx"))
 +                    results.add(new CDCIndexData(f));
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            Assert.fail(String.format("Failed to parse CDCIndexData: %s", e.getMessage()));
 +        }
 +        return results;
 +    }
 +
 +    private static class CDCIndexData
 +    {
 +        private final String fileName;
 +        private final int offset;
 +
 +        CDCIndexData(File f) throws IOException
 +        {
 +            String line = "";
 +            try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(f))))
 +            {
 +                line = br.readLine();
 +            }
 +            catch (Exception e)
 +            {
 +                throw e;
 +            }
 +            fileName = f.getName();
 +            offset = Integer.parseInt(line);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("%s,%d", fileName, offset);
 +        }
 +
 +        @Override
 +        public boolean equals(Object other)
 +        {
 +            CDCIndexData cid = (CDCIndexData)other;
 +            return fileName.equals(cid.fileName) && offset == cid.offset;
 +        }
 +    }
 +
      private ByteBuffer randomizeBuffer(int size)
      {
          byte[] toWrap = new byte[size];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 8d04ecc,215ad6c..da895a0
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -340,9 -359,9 +340,9 @@@ public abstract class CommitLogTes
          assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
  
          // "Flush": this won't delete anything
 -        UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
 +        TableId id1 = rm.getTableIds().iterator().next();
-         CommitLog.instance.sync();
+         CommitLog.instance.sync(true);
 -        CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
 +        CommitLog.instance.discardCompletedSegments(id1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
  
          assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
  
@@@ -676,9 -696,9 +676,9 @@@
          cellCount += 1;
          CommitLog.instance.add(rm2);
  
-         CommitLog.instance.sync();
+         CommitLog.instance.sync(true);
  
 -        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata());
          List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
          Assert.assertFalse(activeSegments.isEmpty());
  
@@@ -713,9 -733,9 +713,9 @@@
              }
          }
  
-         CommitLog.instance.sync();
+         CommitLog.instance.sync(true);
  
 -        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata());
          List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
          Assert.assertFalse(activeSegments.isEmpty());
  


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org