You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/12/14 03:54:31 UTC
[3/6] cassandra git commit: Improve commit log chain marker updating
Improve commit log chain marker updating
patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-14108
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db788fe8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db788fe8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db788fe8
Branch: refs/heads/trunk
Commit: db788fe860dfd69f06ab97ae35fa67fcf2517b6d
Parents: 0d12169
Author: Jason Brown <ja...@gmail.com>
Authored: Mon Dec 11 16:25:29 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Wed Dec 13 19:51:34 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 8 -
.../org/apache/cassandra/config/Config.java | 1 -
.../cassandra/config/DatabaseDescriptor.java | 10 -
.../db/commitlog/AbstractCommitLogService.java | 250 +++++++++++--------
.../db/commitlog/PeriodicCommitLogService.java | 3 +-
src/java/org/apache/cassandra/utils/Clock.java | 80 ++++++
.../commitlog/AbstractCommitLogServiceTest.java | 176 +++++++++++++
.../commitlog/CommitLogChainedMarkersTest.java | 1 -
.../CommitLogSegmentBackpressureTest.java | 8 +-
.../cassandra/utils/FreeRunningClock.java | 46 ++++
11 files changed, 449 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6bfddcc..ee90a67 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.16
+ * Improve commit log chain marker updating (CASSANDRA-14108)
* Extra range tombstone bound creates double rows (CASSANDRA-14008)
* Fix SStable ordering by max timestamp in SinglePartitionReadCommand (CASSANDRA-14010)
* Accept role names containing forward-slash (CASSANDRA-14088)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 71e0b2a..ef7b034 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -304,14 +304,6 @@ counter_cache_save_period: 7200
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
-# Time interval in millis at which we should update the chained markers in the commitlog.
-# This allows more of the commitlog to be replayed from the mmapped file
-# if the cassandra process crashes; this does not help in durability for surviving a host fail.
-# This value only makes sense if it is significantly less that commitlog_sync_period_in_ms,
-# and only applies to periodic mode when not using commitlog compression or encryption.
-# commitlog_marker_period_in_ms: 100
-
-
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
# in it (potentially from each columnfamily in the system) has been
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 0796183..64d41bb 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,7 +192,6 @@ public class Config
public CommitLogSync commitlog_sync;
public Double commitlog_sync_batch_window_in_ms;
public Integer commitlog_sync_period_in_ms;
- public Integer commitlog_marker_period_in_ms = 0;
public int commitlog_segment_size_in_mb = 32;
public ParameterizedClass commitlog_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 169ed3d..efc71ef 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1519,16 +1519,6 @@ public class DatabaseDescriptor
conf.commitlog_sync_period_in_ms = periodMillis;
}
- public static void setCommitLogMarkerPeriod(int markerPeriod)
- {
- conf.commitlog_marker_period_in_ms = markerPeriod;
- }
-
- public static int getCommitLogMarkerPeriod()
- {
- return conf.commitlog_marker_period_in_ms;
- }
-
public static Config.CommitLogSync getCommitLogSync()
{
return conf.commitlog_sync;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 8a03b2f..829530d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -19,8 +19,8 @@ package org.apache.cassandra.db.commitlog;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.*;
@@ -31,6 +31,11 @@ import java.util.concurrent.atomic.AtomicLong;
public abstract class AbstractCommitLogService
{
+ /**
+ * When in {@link Config.CommitLogSync#periodic} mode, the default number of milliseconds to wait between updating
+ * the commit log chained markers.
+ */
+ static final long DEFAULT_MARKER_INTERVAL_MILLIS = 100;
private Thread thread;
private volatile boolean shutdown = false;
@@ -52,13 +57,13 @@ public abstract class AbstractCommitLogService
/**
* The duration between syncs to disk.
*/
- private final long syncIntervalMillis;
+ final long syncIntervalMillis;
/**
* The duration between updating the chained markers in the commit log file. This value should be
* 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
*/
- private final long markerIntervalMillis;
+ final long markerIntervalMillis;
/**
* A flag that callers outside of the sync thread can use to signal they want the commitlog segments
@@ -75,9 +80,9 @@ public abstract class AbstractCommitLogService
*
* Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
*/
- AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis)
+ AbstractCommitLogService(final CommitLog commitLog, final String name, long syncIntervalMillis)
{
- this(commitLog, name, syncIntervalMillis, syncIntervalMillis);
+ this (commitLog, name, syncIntervalMillis, false);
}
/**
@@ -85,138 +90,163 @@ public abstract class AbstractCommitLogService
* Batch or Periodic contract.
*
* Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+ *
+ * @param markHeadersFaster true if the chained markers should be updated more frequently than on the disk sync bounds.
*/
- AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis)
+ AbstractCommitLogService(final CommitLog commitLog, final String name, long syncIntervalMillis, boolean markHeadersFaster)
{
this.commitLog = commitLog;
this.name = name;
- this.syncIntervalMillis = syncIntervalMillis;
- // if we are not using periodic mode, or we using compression, we shouldn't update the chained markers
- // faster than the sync interval
- if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
- markerIntervalMillis = syncIntervalMillis;
+ if (markHeadersFaster && syncIntervalMillis > DEFAULT_MARKER_INTERVAL_MILLIS)
+ {
+ markerIntervalMillis = DEFAULT_MARKER_INTERVAL_MILLIS;
+ long modulo = syncIntervalMillis % markerIntervalMillis;
+ if (modulo != 0)
+ {
+ // quantize syncIntervalMillis to a multiple of markerIntervalMillis
+ syncIntervalMillis -= modulo;
- // apply basic bounds checking on the marker interval
- if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+ if (modulo >= markerIntervalMillis / 2)
+ syncIntervalMillis += markerIntervalMillis;
+ }
+ logger.debug("Will update the commitlog markers every {}ms and flush every {}ms", markerIntervalMillis, syncIntervalMillis);
+ }
+ else
{
- logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
- markerIntervalMillis, syncIntervalMillis);
markerIntervalMillis = syncIntervalMillis;
}
- this.markerIntervalMillis = markerIntervalMillis;
+ assert syncIntervalMillis % markerIntervalMillis == 0;
+ this.syncIntervalMillis = syncIntervalMillis;
}
// Separated into individual method to ensure relevant objects are constructed before this is started.
void start()
{
if (syncIntervalMillis < 1)
- throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
- syncIntervalMillis * 1e-6));
+ throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms",
+ syncIntervalMillis));
+ shutdown = false;
+ Runnable runnable = new SyncRunnable(new Clock());
+ thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
+ thread.start();
+ }
- Runnable runnable = new Runnable()
+ class SyncRunnable implements Runnable
+ {
+ final Clock clock;
+ long firstLagAt = 0;
+ long totalSyncDuration = 0; // total time spent syncing since firstLagAt
+ long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt
+ int lagCount = 0;
+ int syncCount = 0;
+
+ SyncRunnable(Clock clock)
{
- public void run()
+ this.clock = clock;
+ }
+
+ public void run()
+ {
+ while (true)
{
- long firstLagAt = 0;
- long totalSyncDuration = 0; // total time spent syncing since firstLagAt
- long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt
- int lagCount = 0;
- int syncCount = 0;
-
- boolean run = true;
- while (run)
+ if (!sync())
+ break;
+ }
+ }
+
+ boolean sync()
+ {
+ try
+ {
+ // always run once after shutdown signalled
+ boolean run = !shutdown;
+
+ // sync and signal
+ long pollStarted = clock.currentTimeMillis();
+ if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
{
- try
- {
- // always run once after shutdown signalled
- run = !shutdown;
-
- // sync and signal
- long pollStarted = System.currentTimeMillis();
- if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
- {
- // in this branch, we want to flush the commit log to disk
- commitLog.sync(shutdown, true);
- syncRequested = false;
- lastSyncedAt = pollStarted;
- syncComplete.signalAll();
- }
- else
- {
- // in this branch, just update the commit log sync headers
- commitLog.sync(false, false);
- }
-
- // sleep any time we have left before the next one is due
- long now = System.currentTimeMillis();
- long sleep = pollStarted + markerIntervalMillis - now;
- if (sleep < 0)
- {
- // if we have lagged noticeably, update our lag counter
- if (firstLagAt == 0)
- {
- firstLagAt = now;
- totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
- }
- syncExceededIntervalBy -= sleep;
- lagCount++;
- }
- syncCount++;
- totalSyncDuration += now - pollStarted;
-
- if (firstLagAt > 0)
- {
- //Only reset the lag tracking if it actually logged this time
- boolean logged = NoSpamLogger.log(
- logger,
- NoSpamLogger.Level.WARN,
- 5,
- TimeUnit.MINUTES,
- "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
- syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
- if (logged)
- firstLagAt = 0;
- }
-
- // if we have lagged this round, we probably have work to do already so we don't sleep
- if (sleep < 0 || !run)
- continue;
-
- try
- {
- haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
- haveWork.drainPermits();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
- }
- catch (Throwable t)
+ // in this branch, we want to flush the commit log to disk
+ commitLog.sync(shutdown, true);
+ syncRequested = false;
+ lastSyncedAt = pollStarted;
+ syncComplete.signalAll();
+ }
+ else
+ {
+ // in this branch, just update the commit log sync headers
+ commitLog.sync(false, false);
+ }
+
+ // sleep any time we have left before the next one is due
+ long now = clock.currentTimeMillis();
+ long sleep = pollStarted + markerIntervalMillis - now;
+ if (sleep < 0)
+ {
+ // if we have lagged noticeably, update our lag counter
+ if (firstLagAt == 0)
{
- if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
- break;
-
- // sleep for full poll-interval after an error, so we don't spam the log file
- try
- {
- haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
+ firstLagAt = now;
+ totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
}
+ syncExceededIntervalBy -= sleep;
+ lagCount++;
+ }
+ syncCount++;
+ totalSyncDuration += now - pollStarted;
+
+ if (firstLagAt > 0)
+ {
+ //Only reset the lag tracking if it actually logged this time
+ boolean logged = NoSpamLogger.log(
+ logger,
+ NoSpamLogger.Level.WARN,
+ 5,
+ TimeUnit.MINUTES,
+ "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+ syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+ if (logged)
+ firstLagAt = 0;
+ }
+
+ if (!run)
+ return false;
+
+ // if we have lagged this round, we probably have work to do already so we don't sleep
+ if (sleep < 0)
+ return true;
+
+ try
+ {
+ haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
+ haveWork.drainPermits();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
}
}
- };
+ catch (Throwable t)
+ {
+ if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+ return false;
- thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
- thread.start();
+ // sleep for full poll-interval after an error, so we don't spam the log file
+ try
+ {
+ haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ }
+ return true;
+ }
}
+
/**
* Block for @param alloc to be sync'd as necessary, and handle bookkeeping
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 76419b7..7a09de0 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -26,7 +26,8 @@ class PeriodicCommitLogService extends AbstractCommitLogService
public PeriodicCommitLogService(final CommitLog commitLog)
{
- super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod());
+ super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(),
+ !commitLog.configuration.useCompression());
}
protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/src/java/org/apache/cassandra/utils/Clock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Clock.java b/src/java/org/apache/cassandra/utils/Clock.java
new file mode 100644
index 0000000..eb9822c
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Clock.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around time related functions that are either implemented by using the default JVM calls
+ * or by using a custom implementation for testing purposes.
+ *
+ * See {@link #instance} for how to use a custom implementation.
+ *
+ * Please note that {@link java.time.Clock} wasn't used, as it would not be possible to provide an
+ * implementation for {@link #nanoTime()} with the exact same properties of {@link System#nanoTime()}.
+ */
+public class Clock
+{
+ private static final Logger logger = LoggerFactory.getLogger(Clock.class);
+
+ /**
+ * Static singleton object that will be instantiated by default with a system clock
+ * implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a
+ * different implementation instead.
+ */
+ public static Clock instance;
+
+ static
+ {
+ String sclock = System.getProperty("cassandra.clock");
+ if (sclock == null)
+ {
+ instance = new Clock();
+ }
+ else
+ {
+ try
+ {
+ logger.debug("Using custom clock implementation: {}", sclock);
+ instance = (Clock) Class.forName(sclock).newInstance();
+ }
+ catch (Exception e)
+ {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * @see System#nanoTime()
+ */
+ public long nanoTime()
+ {
+ return System.nanoTime();
+ }
+
+ /**
+ * @see System#currentTimeMillis()
+ */
+ public long currentTimeMillis()
+ {
+ return System.currentTimeMillis();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
new file mode 100644
index 0000000..5a46e5f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FreeRunningClock;
+
+import static org.apache.cassandra.db.commitlog.AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS;
+
+public class AbstractCommitLogServiceTest
+{
+ @BeforeClass
+ public static void before()
+ {
+ DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+ }
+
+ @Test
+ public void testConstructorSyncIsQuantized()
+ {
+ long syncTimeMillis = 10 * 1000;
+ FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+ Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
+ Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis);
+ }
+
+ @Test
+ public void testConstructorSyncEqualsMarkerDefault()
+ {
+ long syncTimeMillis = 100;
+ FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+ Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
+ Assert.assertEquals(syncTimeMillis, commitLogService.syncIntervalMillis);
+ Assert.assertEquals(commitLogService.markerIntervalMillis, commitLogService.syncIntervalMillis);
+ }
+
+ @Test
+ public void testConstructorSyncShouldRoundUp()
+ {
+ long syncTimeMillis = 151;
+ long expectedMillis = 200;
+ FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+ Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
+ Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis);
+ }
+
+ @Test
+ public void testConstructorSyncShouldRoundDown()
+ {
+ long syncTimeMillis = 121;
+ long expectedMillis = 100;
+ FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+ Assert.assertEquals(DEFAULT_MARKER_INTERVAL_MILLIS, commitLogService.markerIntervalMillis);
+ Assert.assertEquals(expectedMillis, commitLogService.syncIntervalMillis);
+ }
+
+ @Test
+ public void testConstructorSyncTinyValue()
+ {
+ long syncTimeMillis = 10;
+ long expectedNanos = syncTimeMillis;
+ FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+ Assert.assertEquals(expectedNanos, commitLogService.markerIntervalMillis);
+ Assert.assertEquals(expectedNanos, commitLogService.syncIntervalMillis);
+ }
+
+ private static class FakeCommitLogService extends AbstractCommitLogService
+ {
+ FakeCommitLogService(long syncIntervalMillis)
+ {
+ super(new FakeCommitLog(), "This is not a real commit log", syncIntervalMillis, true);
+ lastSyncedAt = 0;
+ }
+
+ @Override
+ void start()
+ {
+ // nop
+ }
+
+ protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
+ {
+ // nop
+ }
+ }
+
+ @Test
+ public void testSync()
+ {
+ long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2;
+ FreeRunningClock clock = new FreeRunningClock();
+ FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
+ AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+ FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
+
+ // at time 0
+ Assert.assertTrue(syncRunnable.sync());
+ Assert.assertEquals(1, commitLog.markCount.get());
+ Assert.assertEquals(0, commitLog.syncCount.get());
+
+ // at time DEFAULT_MARKER_INTERVAL_MILLIS
+ clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(syncRunnable.sync());
+ Assert.assertEquals(2, commitLog.markCount.get());
+ Assert.assertEquals(0, commitLog.syncCount.get());
+
+ // at time DEFAULT_MARKER_INTERVAL_MILLIS * 2
+ clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(syncRunnable.sync());
+ Assert.assertEquals(2, commitLog.markCount.get());
+ Assert.assertEquals(1, commitLog.syncCount.get());
+
+ // at time DEFAULT_MARKER_INTERVAL_MILLIS * 3, but with shutdown!
+ clock.advance(DEFAULT_MARKER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ commitLogService.shutdown();
+ Assert.assertFalse(syncRunnable.sync());
+ Assert.assertEquals(2, commitLog.markCount.get());
+ Assert.assertEquals(2, commitLog.syncCount.get());
+ }
+
+ private static class FakeCommitLog extends CommitLog
+ {
+ private final AtomicInteger markCount = new AtomicInteger();
+ private final AtomicInteger syncCount = new AtomicInteger();
+
+ FakeCommitLog()
+ {
+ super(DatabaseDescriptor.getCommitLogLocation(), null);
+ }
+
+ @Override
+ CommitLog start()
+ {
+ // this is a bit dicey. we need to start the allocator, but starting the parent's executor will muck things
+ // up as it is pointing to a different executor service, not the fake one in this test class.
+ allocator.start();
+ return this;
+ }
+
+ @Override
+ public void sync(boolean syncAllSegments, boolean flush)
+ {
+ if (flush)
+ syncCount.incrementAndGet();
+ else
+ markCount.incrementAndGet();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index e2b9f72..b73275b 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@ -65,7 +65,6 @@ public class CommitLogChainedMarkersTest
DatabaseDescriptor.setCommitLogSegmentSize(5);
DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
- DatabaseDescriptor.setCommitLogMarkerPeriod(1);
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple(1),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index a1999ef..c615880 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@ -64,13 +64,13 @@ public class CommitLogSegmentBackpressureTest
@Test
@BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
- targetClass = "AbstractCommitLogService$1",
- targetMethod = "run",
+ targetClass = "AbstractCommitLogService$SyncRunnable",
+ targetMethod = "sync",
targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
@BMRule(name = "Release Semaphore after sync",
- targetClass = "AbstractCommitLogService$1",
- targetMethod = "run",
+ targetClass = "AbstractCommitLogService$SyncRunnable",
+ targetMethod = "sync",
targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
public void testCompressedCommitLogBackpressure() throws Throwable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db788fe8/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/FreeRunningClock.java b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
new file mode 100644
index 0000000..83c8db7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/FreeRunningClock.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A freely adjustable clock that can be used for unit testing. See {@link Clock#instance} for how to
+ * enable this class.
+ */
+public class FreeRunningClock extends Clock
+{
+ private long nanoTime = 0;
+
+ @Override
+ public long nanoTime()
+ {
+ return nanoTime;
+ }
+
+ @Override
+ public long currentTimeMillis()
+ {
+ return TimeUnit.NANOSECONDS.toMillis(nanoTime());
+ }
+
+ public void advance(long time, TimeUnit unit)
+ {
+ nanoTime += unit.toNanos(time);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org