You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/12/14 03:54:31 UTC

[3/6] cassandra git commit: Improve commit log chain marker updating

Improve commit log chain marker updating

patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-14108


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db788fe8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db788fe8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db788fe8

Branch: refs/heads/trunk
Commit: db788fe860dfd69f06ab97ae35fa67fcf2517b6d
Parents: 0d12169
Author: Jason Brown <ja...@gmail.com>
Authored: Mon Dec 11 16:25:29 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Wed Dec 13 19:51:34 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   8 -
 .../org/apache/cassandra/config/Config.java     |   1 -
 .../cassandra/config/DatabaseDescriptor.java    |  10 -
 .../db/commitlog/AbstractCommitLogService.java  | 250 +++++++++++--------
 .../db/commitlog/PeriodicCommitLogService.java  |   3 +-
 src/java/org/apache/cassandra/utils/Clock.java  |  80 ++++++
 .../commitlog/AbstractCommitLogServiceTest.java | 176 +++++++++++++
 .../commitlog/CommitLogChainedMarkersTest.java  |   1 -
 .../CommitLogSegmentBackpressureTest.java       |   8 +-
 .../cassandra/utils/FreeRunningClock.java       |  46 ++++
 11 files changed, 449 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6bfddcc..ee90a67 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * Improve commit log chain marker updating (CASSANDRA-14108)
  * Extra range tombstone bound creates double rows (CASSANDRA-14008)
  * Fix SStable ordering by max timestamp in SinglePartitionReadCommand (CASSANDRA-14010)
  * Accept role names containing forward-slash (CASSANDRA-14088)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 71e0b2a..ef7b034 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -304,14 +304,6 @@ counter_cache_save_period: 7200
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
 
-# Time interval in millis at which we should update the chained markers in the commitlog.
-# This allows more of the commitlog to be replayed from the mmapped file
-# if the cassandra process crashes; this does not help in durability for surviving a host fail.
-# This value only makes sense if it is significantly less that commitlog_sync_period_in_ms,
-# and only applies to periodic mode when not using commitlog compression or encryption.
-# commitlog_marker_period_in_ms: 100
-
-
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data
 # in it (potentially from each columnfamily in the system) has been

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 0796183..64d41bb 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,7 +192,6 @@ public class Config
     public CommitLogSync commitlog_sync;
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
-    public Integer commitlog_marker_period_in_ms = 0;
     public int commitlog_segment_size_in_mb = 32;
     public ParameterizedClass commitlog_compression;
     public int commitlog_max_compression_buffers_in_pool = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 169ed3d..efc71ef 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1519,16 +1519,6 @@ public class DatabaseDescriptor
         conf.commitlog_sync_period_in_ms = periodMillis;
     }
 
-    public static void setCommitLogMarkerPeriod(int markerPeriod)
-    {
-        conf.commitlog_marker_period_in_ms = markerPeriod;
-    }
-
-    public static int getCommitLogMarkerPeriod()
-    {
-        return conf.commitlog_marker_period_in_ms;
-    }
-
     public static Config.CommitLogSync getCommitLogSync()
     {
         return conf.commitlog_sync;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 8a03b2f..829530d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -19,8 +19,8 @@ package org.apache.cassandra.db.commitlog;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.slf4j.*;
@@ -31,6 +31,11 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class AbstractCommitLogService
 {
+    /**
+     * When in {@link Config.CommitLogSync#periodic} mode, the default number of milliseconds to wait between updating
+     * the commit log chained markers.
+     */
+    static final long DEFAULT_MARKER_INTERVAL_MILLIS = 100;
 
     private Thread thread;
     private volatile boolean shutdown = false;
@@ -52,13 +57,13 @@ public abstract class AbstractCommitLogService
     /**
      * The duration between syncs to disk.
      */
-    private final long syncIntervalMillis;
+    final long syncIntervalMillis;
 
     /**
      * The duration between updating the chained markers in the the commit log file. This value should be
      * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
      */
-    private final long markerIntervalMillis;
+    final long markerIntervalMillis;
 
     /**
      * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
@@ -75,9 +80,9 @@ public abstract class AbstractCommitLogService
      *
      * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
      */
-    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis)
+    AbstractCommitLogService(final CommitLog commitLog, final String name, long syncIntervalMillis)
     {
-        this(commitLog, name, syncIntervalMillis, syncIntervalMillis);
+        this (commitLog, name, syncIntervalMillis, false);
     }
 
     /**
@@ -85,138 +90,163 @@ public abstract class AbstractCommitLogService
      * Batch or Periodic contract.
      *
      * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+     *
+     * @param markHeadersFaster true if the chained markers should be updated more frequently than on the disk sync bounds.
      */
-    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis)
+    AbstractCommitLogService(final CommitLog commitLog, final String name, long syncIntervalMillis, boolean markHeadersFaster)
     {
         this.commitLog = commitLog;
         this.name = name;
-        this.syncIntervalMillis = syncIntervalMillis;
 
-        // if we are not using periodic mode, or we using compression, we shouldn't update the chained markers
-        // faster than the sync interval
-        if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
-            markerIntervalMillis = syncIntervalMillis;
+        if (markHeadersFaster && syncIntervalMillis > DEFAULT_MARKER_INTERVAL_MILLIS)
+        {
+            markerIntervalMillis = DEFAULT_MARKER_INTERVAL_MILLIS;
+            long modulo = syncIntervalMillis % markerIntervalMillis;
+            if (modulo != 0)
+            {
+                // quantize syncIntervalMillis to a multiple of markerIntervalMillis
+                syncIntervalMillis -= modulo;
 
-        // apply basic bounds checking on the marker interval
-        if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+                if (modulo >= markerIntervalMillis / 2)
+                    syncIntervalMillis += markerIntervalMillis;
+            }
+            logger.debug("Will update the commitlog markers every {}ms and flush every {}ms", markerIntervalMillis, syncIntervalMillis);
+        }
+        else
         {
-            logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
-                        markerIntervalMillis, syncIntervalMillis);
             markerIntervalMillis = syncIntervalMillis;
         }
 
-        this.markerIntervalMillis = markerIntervalMillis;
+        assert syncIntervalMillis % markerIntervalMillis == 0;
+        this.syncIntervalMillis = syncIntervalMillis;
     }
 
     // Separated into individual method to ensure relevant objects are constructed before this is started.
     void start()
     {
         if (syncIntervalMillis < 1)
-            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
-                                                             syncIntervalMillis * 1e-6));
+            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms",
+                                                             syncIntervalMillis));
+        shutdown = false;
+        Runnable runnable = new SyncRunnable(new Clock());
+        thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
+        thread.start();
+    }
 
-        Runnable runnable = new Runnable()
+    class SyncRunnable implements Runnable
+    {
+        final Clock clock;
+        long firstLagAt = 0;
+        long totalSyncDuration = 0; // total time spent syncing since firstLagAt
+        long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt
+        int lagCount = 0;
+        int syncCount = 0;
+
+        SyncRunnable(Clock clock)
         {
-            public void run()
+            this.clock = clock;
+        }
+
+        public void run()
+        {
+            while (true)
             {
-                long firstLagAt = 0;
-                long totalSyncDuration = 0; // total time spent syncing since firstLagAt
-                long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt
-                int lagCount = 0;
-                int syncCount = 0;
-
-                boolean run = true;
-                while (run)
+                if (!sync())
+                    break;
+            }
+        }
+
+        boolean sync()
+        {
+            try
+            {
+                // always run once after shutdown signalled
+                boolean run = !shutdown;
+
+                // sync and signal
+                long pollStarted = clock.currentTimeMillis();
+                if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
                 {
-                    try
-                    {
-                        // always run once after shutdown signalled
-                        run = !shutdown;
-
-                        // sync and signal
-                        long pollStarted = System.currentTimeMillis();
-                        if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
-                        {
-                            // in this branch, we want to flush the commit log to disk
-                            commitLog.sync(shutdown, true);
-                            syncRequested = false;
-                            lastSyncedAt = pollStarted;
-                            syncComplete.signalAll();
-                        }
-                        else
-                        {
-                            // in this branch, just update the commit log sync headers
-                            commitLog.sync(false, false);
-                        }
-
-                        // sleep any time we have left before the next one is due
-                        long now = System.currentTimeMillis();
-                        long sleep = pollStarted + markerIntervalMillis - now;
-                        if (sleep < 0)
-                        {
-                            // if we have lagged noticeably, update our lag counter
-                            if (firstLagAt == 0)
-                            {
-                                firstLagAt = now;
-                                totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
-                            }
-                            syncExceededIntervalBy -= sleep;
-                            lagCount++;
-                        }
-                        syncCount++;
-                        totalSyncDuration += now - pollStarted;
-
-                        if (firstLagAt > 0)
-                        {
-                            //Only reset the lag tracking if it actually logged this time
-                            boolean logged = NoSpamLogger.log(
-                                    logger,
-                                    NoSpamLogger.Level.WARN,
-                                    5,
-                                    TimeUnit.MINUTES,
-                                    "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
-                                                      syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
-                           if (logged)
-                               firstLagAt = 0;
-                        }
-
-                        // if we have lagged this round, we probably have work to do already so we don't sleep
-                        if (sleep < 0 || !run)
-                            continue;
-
-                        try
-                        {
-                            haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
-                            haveWork.drainPermits();
-                        }
-                        catch (InterruptedException e)
-                        {
-                            throw new AssertionError();
-                        }
-                    }
-                    catch (Throwable t)
+                    // in this branch, we want to flush the commit log to disk
+                    commitLog.sync(shutdown, true);
+                    syncRequested = false;
+                    lastSyncedAt = pollStarted;
+                    syncComplete.signalAll();
+                }
+                else
+                {
+                    // in this branch, just update the commit log sync headers
+                    commitLog.sync(false, false);
+                }
+
+                // sleep any time we have left before the next one is due
+                long now = clock.currentTimeMillis();
+                long sleep = pollStarted + markerIntervalMillis - now;
+                if (sleep < 0)
+                {
+                    // if we have lagged noticeably, update our lag counter
+                    if (firstLagAt == 0)
                     {
-                        if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
-                            break;
-
-                        // sleep for full poll-interval after an error, so we don't spam the log file
-                        try
-                        {
-                            haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
-                        }
-                        catch (InterruptedException e)
-                        {
-                            throw new AssertionError();
-                        }
+                        firstLagAt = now;
+                        totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
                     }
+                    syncExceededIntervalBy -= sleep;
+                    lagCount++;
+                }
+                syncCount++;
+                totalSyncDuration += now - pollStarted;
+
+                if (firstLagAt > 0)
+                {
+                    //Only reset the lag tracking if it actually logged this time
+                    boolean logged = NoSpamLogger.log(
+                    logger,
+                    NoSpamLogger.Level.WARN,
+                    5,
+                    TimeUnit.MINUTES,
+                    "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+                    syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+                    if (logged)
+                        firstLagAt = 0;
+                }
+
+                if (!run)
+                    return false;
+
+                // if we have lagged this round, we probably have work to do already so we don't sleep
+                if (sleep < 0)
+                    return true;
+
+                try
+                {
+                    haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
+                    haveWork.drainPermits();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError();
                 }
             }
-        };
+            catch (Throwable t)
+            {
+                if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+                    return false;
 
-        thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
-        thread.start();
+                // sleep for full poll-interval after an error, so we don't spam the log file
+                try
+                {
+                    haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError();
+                }
+            }
+            return true;
+        }
     }
 
+
     /**
      * Block for @param alloc to be sync'd as necessary, and handle bookkeeping
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 76419b7..7a09de0 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -26,7 +26,8 @@ class PeriodicCommitLogService extends AbstractCommitLogService
 
     public PeriodicCommitLogService(final CommitLog commitLog)
     {
-        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod());
+        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(),
+              !commitLog.configuration.useCompression());
     }
 
     protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/utils/Clock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Clock.java b/src/java/org/apache/cassandra/utils/Clock.java
new file mode 100644
index 0000000..eb9822c
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Clock.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around time related functions that are either implemented by using the default JVM calls
+ * or by using a custom implementation for testing purposes.
+ *
+ * See {@link #instance} for how to use a custom implementation.
+ *
+ * Please note that {@link java.time.Clock} wasn't used, as it would not be possible to provide an
+ * implementation for {@link #nanoTime()} with the exact same properties of {@link System#nanoTime()}.
+ */
+public class Clock
+{
+    private static final Logger logger = LoggerFactory.getLogger(Clock.class);
+
+    /**
+     * Static singleton object that will be instantiated by default with a system clock
+     * implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a
+     * different implementation instead.
+     */
+    public static Clock instance;
+
+    static
+    {
+        String sclock = System.getProperty("cassandra.clock");
+        if (sclock == null)
+        {
+            instance = new Clock();
+        }
+        else
+        {
+            try
+            {
+                logger.debug("Using custom clock implementation: {}", sclock);
+                instance = (Clock) Class.forName(sclock).newInstance();
+            }
+            catch (Exception e)
+            {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * @see System#nanoTime()
+     */
+    public long nanoTime()
+    {
+        return System.nanoTime();
+    }
+
+    /**
+     * @see System#currentTimeMillis()
+     */
+    public long currentTimeMillis()
+    {
+        return System.currentTimeMillis();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
new file mode 100644
index 0000000..5a46e5f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FreeRunningClock;
+
+import static org.apache.cassandra.db.commitlog.AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS;
+
+public class AbstractCommitLogServiceTest
+{
+    @BeforeClass
+    public static void before()
+    {
+        DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+    }
+
+    @Test
+    public void testConstructorSyncIsQuantized()
+    {
+        long syncTimeMillis = 10 * 1000;
+        FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+        Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
+        Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis);
+    }
+
+    @Test
+    public void testConstructorSyncEqualsMarkerDefault()
+    {
+        long syncTimeMillis = 100;
+        FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+        Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
+        Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis);
+        Assert.assertEquals(commitLogService.markerIntervalMillis, commitLogService.syncIntervalMillis);
+    }
+
+    @Test
+    public void testConstructorSyncShouldRoundUp()
+    {
+        long syncTimeMillis = 151;
+        long expectedMillis = 200;
+        FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+        Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
+        Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis);
+    }
+
+    @Test
+    public void testConstructorSyncShouldRoundDown()
+    {
+        long syncTimeMillis = 121;
+        long expectedMillis = 100;
+        FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+        Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
+        Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis);
+    }
+
+    @Test
+    public void testConstructorSyncTinyValue()
+    {
+        long syncTimeMillis = 10;
+        long expectedNanos = syncTimeMillis;
+        FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+        Assert.assertEquals(expectedNanos, commitLogService.markerIntervalMillis);
+        Assert.assertEquals(expectedNanos, commitLogService.syncIntervalMillis);
+    }
+
+    private static class FakeCommitLogService extends AbstractCommitLogService
+    {
+        FakeCommitLogService(long syncIntervalMillis)
+        {
+            super(new FakeCommitLog(), "This is not a real commit log", syncIntervalMillis, true);
+            lastSyncedAt = 0;
+        }
+
+        @Override
+        void start()
+        {
+            // nop
+        }
+
+        protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
+        {
+            // nop
+        }
+    }
+
+    @Test
+    public void testSync()
+    {
+        long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2;
+        FreeRunningClock clock = new FreeRunningClock();
+        FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+        AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+        FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
+
+        // at time 0
+        Assert.assertTrue(syncRunnable.sync());
+        Assert.assertEquals(1, commitLog.markCount.get());
+        Assert.assertEquals(0, commitLog.syncCount.get());
+
+        // at time DEFAULT_MARKER_INTERVAL_MILLIS
+        clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+        Assert.assertTrue(syncRunnable.sync());
+        Assert.assertEquals(2, commitLog.markCount.get());
+        Assert.assertEquals(0, commitLog.syncCount.get());
+
+        // at time DEFAULT_MARKER_INTERVAL_MILLIS * 2
+        clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+        Assert.assertTrue(syncRunnable.sync());
+        Assert.assertEquals(2, commitLog.markCount.get());
+        Assert.assertEquals(1, commitLog.syncCount.get());
+
+        // at time DEFAULT_MARKER_INTERVAL_MILLIS * 3, but with shutdown!
+        clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+        commitLogService.shutdown();
+        Assert.assertFalse(syncRunnable.sync());
+        Assert.assertEquals(2, commitLog.markCount.get());
+        Assert.assertEquals(2, commitLog.syncCount.get());
+    }
+
+    private static class FakeCommitLog extends CommitLog
+    {
+        private final AtomicInteger markCount = new AtomicInteger();
+        private final AtomicInteger syncCount = new AtomicInteger();
+
+        FakeCommitLog()
+        {
+            super(DatabaseDescriptor.getCommitLogLocation(), null);
+        }
+
+        @Override
+        CommitLog start()
+        {
+            // this is a bit dicey. we need to start the allocator, but starting the parent's executor will muck things
+            // up as it is pointing to a different executor service, not the fake one in this test class.
+            allocator.start();
+            return this;
+        }
+
+        @Override
+        public void sync(boolean syncAllSegments, boolean flush)
+        {
+            if (flush)
+                syncCount.incrementAndGet();
+            else
+                markCount.incrementAndGet();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index e2b9f72..b73275b 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@ -65,7 +65,6 @@ public class CommitLogChainedMarkersTest
         DatabaseDescriptor.setCommitLogSegmentSize(5);
         DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
         DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
-        DatabaseDescriptor.setCommitLogMarkerPeriod(1);
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index a1999ef..c615880 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@ -64,13 +64,13 @@ public class CommitLogSegmentBackpressureTest
 
     @Test
     @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
-                              targetClass = "AbstractCommitLogService$1",
-                              targetMethod = "run",
+                              targetClass = "AbstractCommitLogService$SyncRunnable",
+                              targetMethod = "sync",
                               targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
                       @BMRule(name = "Release Semaphore after sync",
-                              targetClass = "AbstractCommitLogService$1",
-                              targetMethod = "run",
+                              targetClass = "AbstractCommitLogService$SyncRunnable",
+                              targetMethod = "sync",
                               targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
     public void testCompressedCommitLogBackpressure() throws Throwable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
new file mode 100644
index 0000000..83c8db7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A freely adjustable clock that can be used for unit testing. See {@link Clock#instance} for how to
+ * enable this class.
+ */
+public class FreeRunningClock extends Clock
+{
+    private long nanoTime = 0;
+
+    @Override
+    public long nanoTime()
+    {
+        return nanoTime;
+    }
+
+    @Override
+    public long currentTimeMillis()
+    {
+        return TimeUnit.NANOSECONDS.toMillis(nanoTime());
+    }
+
+    public void advance(long time, TimeUnit unit)
+    {
+        nanoTime += unit.toNanos(time);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org