You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/06/05 20:55:47 UTC
[1/6] cassandra git commit: Fix regression of lagging commitlog flush log message
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 38096da25 -> 214a3abfc
refs/heads/cassandra-3.11 b92d90dc1 -> 77a12053b
refs/heads/trunk 5d8767765 -> 843a5fdf2
Fix regression of lagging commitlog flush log message
patch by jasobrown, reviewed by Jordan West for CASSANDRA-14451
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/214a3abf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/214a3abf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/214a3abf
Branch: refs/heads/cassandra-3.0
Commit: 214a3abfcc25460af50805b543a5833697a1b341
Parents: 38096da
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jun 1 05:45:23 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:47:37 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/AbstractCommitLogService.java | 85 +++++++++++++-------
.../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
3 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fe6d1..dfdfbfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.17
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
* Add Missing dependencies in pom-all (CASSANDRA-14422)
* Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 1cee55d..0845bd5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
+
public abstract class AbstractCommitLogService
{
/**
@@ -165,13 +167,15 @@ public abstract class AbstractCommitLogService
// sync and signal
long pollStarted = clock.currentTimeMillis();
- if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+ boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
+ if (flushToDisk)
{
// in this branch, we want to flush the commit log to disk
syncRequested = false;
commitLog.sync(shutdown, true);
lastSyncedAt = pollStarted;
syncComplete.signalAll();
+ syncCount++;
}
else
{
@@ -179,41 +183,15 @@ public abstract class AbstractCommitLogService
commitLog.sync(false, false);
}
- // sleep any time we have left before the next one is due
long now = clock.currentTimeMillis();
- long sleep = pollStarted + markerIntervalMillis - now;
- if (sleep < 0)
- {
- // if we have lagged noticeably, update our lag counter
- if (firstLagAt == 0)
- {
- firstLagAt = now;
- totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
- }
- syncExceededIntervalBy -= sleep;
- lagCount++;
- }
- syncCount++;
- totalSyncDuration += now - pollStarted;
-
- if (firstLagAt > 0)
- {
- //Only reset the lag tracking if it actually logged this time
- boolean logged = NoSpamLogger.log(
- logger,
- NoSpamLogger.Level.WARN,
- 5,
- TimeUnit.MINUTES,
- "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
- syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
- if (logged)
- firstLagAt = 0;
- }
+ if (flushToDisk)
+ maybeLogFlushLag(pollStarted, now);
if (!run)
return false;
// if we have lagged this round, we probably have work to do already so we don't sleep
+ long sleep = pollStarted + markerIntervalMillis - now;
if (sleep < 0)
return true;
@@ -244,6 +222,53 @@ public abstract class AbstractCommitLogService
}
return true;
}
+
+ /**
+ * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
+ */
+ @VisibleForTesting
+ boolean maybeLogFlushLag(long pollStarted, long now)
+ {
+ long flushDuration = now - pollStarted;
+ totalSyncDuration += flushDuration;
+
+ // this is the timestamp by which we should have completed the flush
+ long maxFlushTimestamp = pollStarted + syncIntervalMillis;
+ if (maxFlushTimestamp > now)
+ return false;
+
+ // if we have lagged noticeably, update our lag counter
+ if (firstLagAt == 0)
+ {
+ firstLagAt = now;
+ syncExceededIntervalBy = lagCount = 0;
+ syncCount = 1;
+ totalSyncDuration = flushDuration;
+ }
+ syncExceededIntervalBy += now - maxFlushTimestamp;
+ lagCount++;
+
+ if (firstLagAt > 0)
+ {
+ //Only reset the lag tracking if it actually logged this time
+ boolean logged = NoSpamLogger.log(
+ logger,
+ NoSpamLogger.Level.WARN,
+ 5,
+ TimeUnit.MINUTES,
+ "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+ syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+ if (logged)
+ firstLagAt = 0;
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ long getTotalSyncDuration()
+ {
+ return totalSyncDuration;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 5a46e5f..6f51eaf 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogService.SyncRunnable;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FreeRunningClock;
@@ -117,7 +118,7 @@ public class AbstractCommitLogServiceTest
long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2;
FreeRunningClock clock = new FreeRunningClock();
FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
- AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+ SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
// at time 0
@@ -173,4 +174,50 @@ public class AbstractCommitLogServiceTest
markCount.incrementAndGet();
}
}
+
+ @Test
+ public void maybeLogFlushLag_MustLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis * 2);
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ @Test
+ public void maybeLogFlushLag_NoLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ /**
+ * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+ */
+ @Test
+ public void maybeLogFlushLag_MultipleOperations()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+
+ int runCount = 12;
+ for (int i = 1; i <= runCount; i++)
+ {
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+ }
+
+ now = pollStarted + (syncTimeMillis * 2);
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/6] cassandra git commit: Fix regression of lagging commitlog flush log message
Posted by ja...@apache.org.
Fix regression of lagging commitlog flush log message
patch by jasobrown, reviewed by Jordan West for CASSANDRA-14451
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/214a3abf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/214a3abf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/214a3abf
Branch: refs/heads/trunk
Commit: 214a3abfcc25460af50805b543a5833697a1b341
Parents: 38096da
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jun 1 05:45:23 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:47:37 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/AbstractCommitLogService.java | 85 +++++++++++++-------
.../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
3 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fe6d1..dfdfbfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.17
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
* Add Missing dependencies in pom-all (CASSANDRA-14422)
* Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 1cee55d..0845bd5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
+
public abstract class AbstractCommitLogService
{
/**
@@ -165,13 +167,15 @@ public abstract class AbstractCommitLogService
// sync and signal
long pollStarted = clock.currentTimeMillis();
- if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+ boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
+ if (flushToDisk)
{
// in this branch, we want to flush the commit log to disk
syncRequested = false;
commitLog.sync(shutdown, true);
lastSyncedAt = pollStarted;
syncComplete.signalAll();
+ syncCount++;
}
else
{
@@ -179,41 +183,15 @@ public abstract class AbstractCommitLogService
commitLog.sync(false, false);
}
- // sleep any time we have left before the next one is due
long now = clock.currentTimeMillis();
- long sleep = pollStarted + markerIntervalMillis - now;
- if (sleep < 0)
- {
- // if we have lagged noticeably, update our lag counter
- if (firstLagAt == 0)
- {
- firstLagAt = now;
- totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
- }
- syncExceededIntervalBy -= sleep;
- lagCount++;
- }
- syncCount++;
- totalSyncDuration += now - pollStarted;
-
- if (firstLagAt > 0)
- {
- //Only reset the lag tracking if it actually logged this time
- boolean logged = NoSpamLogger.log(
- logger,
- NoSpamLogger.Level.WARN,
- 5,
- TimeUnit.MINUTES,
- "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
- syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
- if (logged)
- firstLagAt = 0;
- }
+ if (flushToDisk)
+ maybeLogFlushLag(pollStarted, now);
if (!run)
return false;
// if we have lagged this round, we probably have work to do already so we don't sleep
+ long sleep = pollStarted + markerIntervalMillis - now;
if (sleep < 0)
return true;
@@ -244,6 +222,53 @@ public abstract class AbstractCommitLogService
}
return true;
}
+
+ /**
+ * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
+ */
+ @VisibleForTesting
+ boolean maybeLogFlushLag(long pollStarted, long now)
+ {
+ long flushDuration = now - pollStarted;
+ totalSyncDuration += flushDuration;
+
+ // this is the timestamp by which we should have completed the flush
+ long maxFlushTimestamp = pollStarted + syncIntervalMillis;
+ if (maxFlushTimestamp > now)
+ return false;
+
+ // if we have lagged noticeably, update our lag counter
+ if (firstLagAt == 0)
+ {
+ firstLagAt = now;
+ syncExceededIntervalBy = lagCount = 0;
+ syncCount = 1;
+ totalSyncDuration = flushDuration;
+ }
+ syncExceededIntervalBy += now - maxFlushTimestamp;
+ lagCount++;
+
+ if (firstLagAt > 0)
+ {
+ //Only reset the lag tracking if it actually logged this time
+ boolean logged = NoSpamLogger.log(
+ logger,
+ NoSpamLogger.Level.WARN,
+ 5,
+ TimeUnit.MINUTES,
+ "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+ syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+ if (logged)
+ firstLagAt = 0;
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ long getTotalSyncDuration()
+ {
+ return totalSyncDuration;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 5a46e5f..6f51eaf 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogService.SyncRunnable;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FreeRunningClock;
@@ -117,7 +118,7 @@ public class AbstractCommitLogServiceTest
long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2;
FreeRunningClock clock = new FreeRunningClock();
FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
- AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+ SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
// at time 0
@@ -173,4 +174,50 @@ public class AbstractCommitLogServiceTest
markCount.incrementAndGet();
}
}
+
+ @Test
+ public void maybeLogFlushLag_MustLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis * 2);
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ @Test
+ public void maybeLogFlushLag_NoLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ /**
+ * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+ */
+ @Test
+ public void maybeLogFlushLag_MultipleOperations()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+
+ int runCount = 12;
+ for (int i = 1; i <= runCount; i++)
+ {
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+ }
+
+ now = pollStarted + (syncTimeMillis * 2);
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77a12053
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77a12053
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77a12053
Branch: refs/heads/trunk
Commit: 77a12053b69ceebd529556d5159f9325703283eb
Parents: b92d90d 214a3ab
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Jun 5 13:48:56 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:50:36 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/AbstractCommitLogService.java | 88 +++++++++++++-------
.../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
3 files changed, 105 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2d4ef25,dfdfbfd..2e77d2e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
-3.0.17
+3.11.3
+ * Reduce nodetool GC thread count (CASSANDRA-14475)
+ * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
+ * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
+ * Update metrics to 3.1.5 (CASSANDRA-12924)
+ * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
+ * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
+ * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
+ * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
+ * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
+ * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
+ * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
+ * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
+ * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
+ * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
+ * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
+ * Fix wildcard GROUP BY queries (CASSANDRA-14209)
+Merged from 3.0:
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
* Add Missing dependencies in pom-all (CASSANDRA-14422)
* Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 7c5d300,0845bd5..b7ab705
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,15 -17,6 +17,16 @@@
*/
package org.apache.cassandra.db.commitlog;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
++import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Timer.Context;
+
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
@@@ -162,14 -160,15 +163,15 @@@ public abstract class AbstractCommitLog
boolean sync()
{
+ // always run once after shutdown signalled
+ boolean shutdownRequested = shutdown;
+
try
{
- // always run once after shutdown signalled
- boolean run = !shutdown;
-
// sync and signal
- long pollStarted = clock.currentTimeMillis();
- boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
+ long pollStarted = clock.nanoTime();
- if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested)
++ boolean flushToDisk = lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested;
+ if (flushToDisk)
{
// in this branch, we want to flush the commit log to disk
syncRequested = false;
@@@ -181,47 -180,30 +183,19 @@@
else
{
// in this branch, just update the commit log sync headers
- commitLog.sync(false, false);
+ commitLog.sync(false);
}
- // sleep any time we have left before the next one is due
- long now = clock.currentTimeMillis();
+ long now = clock.nanoTime();
- long wakeUpAt = pollStarted + markerIntervalNanos;
- if (wakeUpAt < now)
- {
- // if we have lagged noticeably, update our lag counter
- if (firstLagAt == 0)
- {
- firstLagAt = now;
- totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
- }
- syncExceededIntervalBy += now - wakeUpAt;
- lagCount++;
- }
- totalSyncDuration += now - pollStarted;
-
- if (firstLagAt > 0)
- {
- //Only reset the lag tracking if it actually logged this time
- boolean logged = NoSpamLogger.log(logger,
- NoSpamLogger.Level.WARN,
- 5,
- TimeUnit.MINUTES,
- "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
- syncCount,
- String.format("%.2f", (now - firstLagAt) * 1e-9d),
- String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
- lagCount,
- String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
- if (logged)
- firstLagAt = 0;
- }
+ if (flushToDisk)
+ maybeLogFlushLag(pollStarted, now);
- if (!run)
+ if (shutdownRequested)
return false;
- // if we have lagged this round, we probably have work to do already so we don't sleep
- long sleep = pollStarted + markerIntervalMillis - now;
- if (sleep < 0)
- return true;
-
- try
- {
- haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
- haveWork.drainPermits();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
++ long wakeUpAt = pollStarted + markerIntervalNanos;
+ if (wakeUpAt > now)
+ LockSupport.parkNanos(wakeUpAt - now);
}
catch (Throwable t)
{
@@@ -229,13 -211,67 +203,63 @@@
return false;
// sleep for full poll-interval after an error, so we don't spam the log file
- try
- {
- haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
+ LockSupport.parkNanos(markerIntervalNanos);
}
+
return true;
}
+
+ /**
- * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
++ * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalNanos}.
+ */
+ @VisibleForTesting
+ boolean maybeLogFlushLag(long pollStarted, long now)
+ {
+ long flushDuration = now - pollStarted;
+ totalSyncDuration += flushDuration;
+
+ // this is the timestamp by which we should have completed the flush
- long maxFlushTimestamp = pollStarted + syncIntervalMillis;
++ long maxFlushTimestamp = pollStarted + syncIntervalNanos;
+ if (maxFlushTimestamp > now)
+ return false;
+
+ // if we have lagged noticeably, update our lag counter
+ if (firstLagAt == 0)
+ {
+ firstLagAt = now;
+ syncExceededIntervalBy = lagCount = 0;
+ syncCount = 1;
+ totalSyncDuration = flushDuration;
+ }
+ syncExceededIntervalBy += now - maxFlushTimestamp;
+ lagCount++;
+
+ if (firstLagAt > 0)
+ {
+ //Only reset the lag tracking if it actually logged this time
- boolean logged = NoSpamLogger.log(
- logger,
- NoSpamLogger.Level.WARN,
- 5,
- TimeUnit.MINUTES,
- "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
- syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
++ boolean logged = NoSpamLogger.log(logger,
++ NoSpamLogger.Level.WARN,
++ 5,
++ TimeUnit.MINUTES,
++ "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
++ syncCount,
++ String.format("%.2f", (now - firstLagAt) * 1e-9d),
++ String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
++ lagCount,
++ String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
+ if (logged)
+ firstLagAt = 0;
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ long getTotalSyncDuration()
+ {
+ return totalSyncDuration;
+ }
}
-
/**
* Block for @param alloc to be sync'd as necessary, and handle bookkeeping
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 18f15fa,6f51eaf..bc5cb29
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@@ -163,4 -174,50 +164,50 @@@ public class AbstractCommitLogServiceTe
markCount.incrementAndGet();
}
}
+
+ @Test
+ public void maybeLogFlushLag_MustLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
- long now = pollStarted + (syncTimeMillis * 2);
++ long now = Integer.MAX_VALUE;
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ @Test
+ public void maybeLogFlushLag_NoLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ /**
+ * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+ */
+ @Test
+ public void maybeLogFlushLag_MultipleOperations()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+
+ int runCount = 12;
+ for (int i = 1; i <= runCount; i++)
+ {
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+ }
+
- now = pollStarted + (syncTimeMillis * 2);
++ now = pollStarted + Integer.MAX_VALUE;
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/843a5fdf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/843a5fdf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/843a5fdf
Branch: refs/heads/trunk
Commit: 843a5fdf2ff8f2cb61a4e1d6632fd443bc2136fb
Parents: 5d87677 77a1205
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Jun 5 13:50:58 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:51:50 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/AbstractCommitLogService.java | 88 +++++++++++++-------
.../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
3 files changed, 105 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/843a5fdf/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eb064be,2e77d2e..9857704
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -264,8 -16,10 +264,9 @@@
* RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
* Fix wildcard GROUP BY queries (CASSANDRA-14209)
Merged from 3.0:
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
* Add Missing dependencies in pom-all (CASSANDRA-14422)
* Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
- * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
* Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
* Fix progress stats and units in compactionstats (CASSANDRA-12244)
* Better handle missing partition columns in system_schema.columns (CASSANDRA-14379)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77a12053
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77a12053
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77a12053
Branch: refs/heads/cassandra-3.11
Commit: 77a12053b69ceebd529556d5159f9325703283eb
Parents: b92d90d 214a3ab
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Jun 5 13:48:56 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:50:36 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/AbstractCommitLogService.java | 88 +++++++++++++-------
.../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
3 files changed, 105 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2d4ef25,dfdfbfd..2e77d2e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
-3.0.17
+3.11.3
+ * Reduce nodetool GC thread count (CASSANDRA-14475)
+ * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
+ * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
+ * Update metrics to 3.1.5 (CASSANDRA-12924)
+ * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
+ * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
+ * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
+ * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
+ * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
+ * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
+ * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
+ * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
+ * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
+ * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
+ * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
+ * Fix wildcard GROUP BY queries (CASSANDRA-14209)
+Merged from 3.0:
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
* Add Missing dependencies in pom-all (CASSANDRA-14422)
* Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 7c5d300,0845bd5..b7ab705
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,15 -17,6 +17,16 @@@
*/
package org.apache.cassandra.db.commitlog;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
++import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Timer.Context;
+
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
@@@ -162,14 -160,15 +163,15 @@@ public abstract class AbstractCommitLog
boolean sync()
{
+ // always run once after shutdown signalled
+ boolean shutdownRequested = shutdown;
+
try
{
- // always run once after shutdown signalled
- boolean run = !shutdown;
-
// sync and signal
- long pollStarted = clock.currentTimeMillis();
- boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
+ long pollStarted = clock.nanoTime();
- if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested)
++ boolean flushToDisk = lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested;
+ if (flushToDisk)
{
// in this branch, we want to flush the commit log to disk
syncRequested = false;
@@@ -181,47 -180,30 +183,19 @@@
else
{
// in this branch, just update the commit log sync headers
- commitLog.sync(false, false);
+ commitLog.sync(false);
}
- // sleep any time we have left before the next one is due
- long now = clock.currentTimeMillis();
+ long now = clock.nanoTime();
- long wakeUpAt = pollStarted + markerIntervalNanos;
- if (wakeUpAt < now)
- {
- // if we have lagged noticeably, update our lag counter
- if (firstLagAt == 0)
- {
- firstLagAt = now;
- totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
- }
- syncExceededIntervalBy += now - wakeUpAt;
- lagCount++;
- }
- totalSyncDuration += now - pollStarted;
-
- if (firstLagAt > 0)
- {
- //Only reset the lag tracking if it actually logged this time
- boolean logged = NoSpamLogger.log(logger,
- NoSpamLogger.Level.WARN,
- 5,
- TimeUnit.MINUTES,
- "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
- syncCount,
- String.format("%.2f", (now - firstLagAt) * 1e-9d),
- String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
- lagCount,
- String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
- if (logged)
- firstLagAt = 0;
- }
+ if (flushToDisk)
+ maybeLogFlushLag(pollStarted, now);
- if (!run)
+ if (shutdownRequested)
return false;
- // if we have lagged this round, we probably have work to do already so we don't sleep
- long sleep = pollStarted + markerIntervalMillis - now;
- if (sleep < 0)
- return true;
-
- try
- {
- haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
- haveWork.drainPermits();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
++ long wakeUpAt = pollStarted + markerIntervalNanos;
+ if (wakeUpAt > now)
+ LockSupport.parkNanos(wakeUpAt - now);
}
catch (Throwable t)
{
@@@ -229,13 -211,67 +203,63 @@@
return false;
// sleep for full poll-interval after an error, so we don't spam the log file
- try
- {
- haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
+ LockSupport.parkNanos(markerIntervalNanos);
}
+
return true;
}
+
+ /**
- * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
++ * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalNanos}.
+ */
+ @VisibleForTesting
+ boolean maybeLogFlushLag(long pollStarted, long now)
+ {
+ long flushDuration = now - pollStarted;
+ totalSyncDuration += flushDuration;
+
+ // this is the timestamp by which we should have completed the flush
- long maxFlushTimestamp = pollStarted + syncIntervalMillis;
++ long maxFlushTimestamp = pollStarted + syncIntervalNanos;
+ if (maxFlushTimestamp > now)
+ return false;
+
+ // if we have lagged noticeably, update our lag counter
+ if (firstLagAt == 0)
+ {
+ firstLagAt = now;
+ syncExceededIntervalBy = lagCount = 0;
+ syncCount = 1;
+ totalSyncDuration = flushDuration;
+ }
+ syncExceededIntervalBy += now - maxFlushTimestamp;
+ lagCount++;
+
+ if (firstLagAt > 0)
+ {
+ //Only reset the lag tracking if it actually logged this time
- boolean logged = NoSpamLogger.log(
- logger,
- NoSpamLogger.Level.WARN,
- 5,
- TimeUnit.MINUTES,
- "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
- syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
++ boolean logged = NoSpamLogger.log(logger,
++ NoSpamLogger.Level.WARN,
++ 5,
++ TimeUnit.MINUTES,
++ "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
++ syncCount,
++ String.format("%.2f", (now - firstLagAt) * 1e-9d),
++ String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
++ lagCount,
++ String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
+ if (logged)
+ firstLagAt = 0;
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ long getTotalSyncDuration()
+ {
+ return totalSyncDuration;
+ }
}
-
/**
* Block for @param alloc to be sync'd as necessary, and handle bookkeeping
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 18f15fa,6f51eaf..bc5cb29
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@@ -163,4 -174,50 +164,50 @@@ public class AbstractCommitLogServiceTe
markCount.incrementAndGet();
}
}
+
+ @Test
+ public void maybeLogFlushLag_MustLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
- long now = pollStarted + (syncTimeMillis * 2);
++ long now = Integer.MAX_VALUE;
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ @Test
+ public void maybeLogFlushLag_NoLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ /**
+ * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+ */
+ @Test
+ public void maybeLogFlushLag_MultipleOperations()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+
+ int runCount = 12;
+ for (int i = 1; i <= runCount; i++)
+ {
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+ }
+
- now = pollStarted + (syncTimeMillis * 2);
++ now = pollStarted + Integer.MAX_VALUE;
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/6] cassandra git commit: Fix regression of lagging commitlog flush log message
Posted by ja...@apache.org.
Fix regression of lagging commitlog flush log message
patch by jasobrown, reviewed by Jordan West for CASSANDRA-14451
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/214a3abf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/214a3abf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/214a3abf
Branch: refs/heads/cassandra-3.11
Commit: 214a3abfcc25460af50805b543a5833697a1b341
Parents: 38096da
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jun 1 05:45:23 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:47:37 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/AbstractCommitLogService.java | 85 +++++++++++++-------
.../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
3 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fe6d1..dfdfbfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.17
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
* Add Missing dependencies in pom-all (CASSANDRA-14422)
* Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
* Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 1cee55d..0845bd5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
+
public abstract class AbstractCommitLogService
{
/**
@@ -165,13 +167,15 @@ public abstract class AbstractCommitLogService
// sync and signal
long pollStarted = clock.currentTimeMillis();
- if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+ boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
+ if (flushToDisk)
{
// in this branch, we want to flush the commit log to disk
syncRequested = false;
commitLog.sync(shutdown, true);
lastSyncedAt = pollStarted;
syncComplete.signalAll();
+ syncCount++;
}
else
{
@@ -179,41 +183,15 @@ public abstract class AbstractCommitLogService
commitLog.sync(false, false);
}
- // sleep any time we have left before the next one is due
long now = clock.currentTimeMillis();
- long sleep = pollStarted + markerIntervalMillis - now;
- if (sleep < 0)
- {
- // if we have lagged noticeably, update our lag counter
- if (firstLagAt == 0)
- {
- firstLagAt = now;
- totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
- }
- syncExceededIntervalBy -= sleep;
- lagCount++;
- }
- syncCount++;
- totalSyncDuration += now - pollStarted;
-
- if (firstLagAt > 0)
- {
- //Only reset the lag tracking if it actually logged this time
- boolean logged = NoSpamLogger.log(
- logger,
- NoSpamLogger.Level.WARN,
- 5,
- TimeUnit.MINUTES,
- "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
- syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
- if (logged)
- firstLagAt = 0;
- }
+ if (flushToDisk)
+ maybeLogFlushLag(pollStarted, now);
if (!run)
return false;
// if we have lagged this round, we probably have work to do already so we don't sleep
+ long sleep = pollStarted + markerIntervalMillis - now;
if (sleep < 0)
return true;
@@ -244,6 +222,53 @@ public abstract class AbstractCommitLogService
}
return true;
}
+
+ /**
+ * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
+ */
+ @VisibleForTesting
+ boolean maybeLogFlushLag(long pollStarted, long now)
+ {
+ long flushDuration = now - pollStarted;
+ totalSyncDuration += flushDuration;
+
+ // this is the timestamp by which we should have completed the flush
+ long maxFlushTimestamp = pollStarted + syncIntervalMillis;
+ if (maxFlushTimestamp > now)
+ return false;
+
+ // if we have lagged noticeably, update our lag counter
+ if (firstLagAt == 0)
+ {
+ firstLagAt = now;
+ syncExceededIntervalBy = lagCount = 0;
+ syncCount = 1;
+ totalSyncDuration = flushDuration;
+ }
+ syncExceededIntervalBy += now - maxFlushTimestamp;
+ lagCount++;
+
+ if (firstLagAt > 0)
+ {
+ //Only reset the lag tracking if it actually logged this time
+ boolean logged = NoSpamLogger.log(
+ logger,
+ NoSpamLogger.Level.WARN,
+ 5,
+ TimeUnit.MINUTES,
+ "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+ syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+ if (logged)
+ firstLagAt = 0;
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ long getTotalSyncDuration()
+ {
+ return totalSyncDuration;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 5a46e5f..6f51eaf 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogService.SyncRunnable;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FreeRunningClock;
@@ -117,7 +118,7 @@ public class AbstractCommitLogServiceTest
long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2;
FreeRunningClock clock = new FreeRunningClock();
FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
- AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+ SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
// at time 0
@@ -173,4 +174,50 @@ public class AbstractCommitLogServiceTest
markCount.incrementAndGet();
}
}
+
+ @Test
+ public void maybeLogFlushLag_MustLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis * 2);
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ @Test
+ public void maybeLogFlushLag_NoLog()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
+
+ /**
+ * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+ */
+ @Test
+ public void maybeLogFlushLag_MultipleOperations()
+ {
+ long syncTimeMillis = 10;
+ SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+
+ long pollStarted = 1;
+ long now = pollStarted + (syncTimeMillis - 1);
+
+ int runCount = 12;
+ for (int i = 1; i <= runCount; i++)
+ {
+ Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+ }
+
+ now = pollStarted + (syncTimeMillis * 2);
+ Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+ Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org