You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/06/05 20:55:47 UTC

[1/6] cassandra git commit: Fix regression of lagging commitlog flush log message

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 38096da25 -> 214a3abfc
  refs/heads/cassandra-3.11 b92d90dc1 -> 77a12053b
  refs/heads/trunk 5d8767765 -> 843a5fdf2


Fix regression of lagging commitlog flush log message

patch by jasobrown, reviewed by Jordan West for CASSANDRA-14451


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/214a3abf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/214a3abf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/214a3abf

Branch: refs/heads/cassandra-3.0
Commit: 214a3abfcc25460af50805b543a5833697a1b341
Parents: 38096da
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jun 1 05:45:23 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:47:37 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/AbstractCommitLogService.java  | 85 +++++++++++++-------
 .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
 3 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fe6d1..dfdfbfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
  * Add Missing dependencies in pom-all (CASSANDRA-14422)
  * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
  * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 1cee55d..0845bd5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public abstract class AbstractCommitLogService
 {
     /**
@@ -165,13 +167,15 @@ public abstract class AbstractCommitLogService
 
                 // sync and signal
                 long pollStarted = clock.currentTimeMillis();
-                if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+                boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
+                if (flushToDisk)
                 {
                     // in this branch, we want to flush the commit log to disk
                     syncRequested = false;
                     commitLog.sync(shutdown, true);
                     lastSyncedAt = pollStarted;
                     syncComplete.signalAll();
+                    syncCount++;
                 }
                 else
                 {
@@ -179,41 +183,15 @@ public abstract class AbstractCommitLogService
                     commitLog.sync(false, false);
                 }
 
-                // sleep any time we have left before the next one is due
                 long now = clock.currentTimeMillis();
-                long sleep = pollStarted + markerIntervalMillis - now;
-                if (sleep < 0)
-                {
-                    // if we have lagged noticeably, update our lag counter
-                    if (firstLagAt == 0)
-                    {
-                        firstLagAt = now;
-                        totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
-                    }
-                    syncExceededIntervalBy -= sleep;
-                    lagCount++;
-                }
-                syncCount++;
-                totalSyncDuration += now - pollStarted;
-
-                if (firstLagAt > 0)
-                {
-                    //Only reset the lag tracking if it actually logged this time
-                    boolean logged = NoSpamLogger.log(
-                    logger,
-                    NoSpamLogger.Level.WARN,
-                    5,
-                    TimeUnit.MINUTES,
-                    "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
-                    syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
-                    if (logged)
-                        firstLagAt = 0;
-                }
+                if (flushToDisk)
+                    maybeLogFlushLag(pollStarted, now);
 
                 if (!run)
                     return false;
 
                 // if we have lagged this round, we probably have work to do already so we don't sleep
+                long sleep = pollStarted + markerIntervalMillis - now;
                 if (sleep < 0)
                     return true;
 
@@ -244,6 +222,53 @@ public abstract class AbstractCommitLogService
             }
             return true;
         }
+
+        /**
+         * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
+         */
+        @VisibleForTesting
+        boolean maybeLogFlushLag(long pollStarted, long now)
+        {
+            long flushDuration = now - pollStarted;
+            totalSyncDuration += flushDuration;
+
+            // this is the timestamp by which we should have completed the flush
+            long maxFlushTimestamp = pollStarted + syncIntervalMillis;
+            if (maxFlushTimestamp > now)
+                return false;
+
+            // if we have lagged noticeably, update our lag counter
+            if (firstLagAt == 0)
+            {
+                firstLagAt = now;
+                syncExceededIntervalBy = lagCount = 0;
+                syncCount = 1;
+                totalSyncDuration = flushDuration;
+            }
+            syncExceededIntervalBy += now - maxFlushTimestamp;
+            lagCount++;
+
+            if (firstLagAt > 0)
+            {
+                //Only reset the lag tracking if it actually logged this time
+                boolean logged = NoSpamLogger.log(
+                logger,
+                NoSpamLogger.Level.WARN,
+                5,
+                TimeUnit.MINUTES,
+                "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+                syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+                if (logged)
+                    firstLagAt = 0;
+            }
+            return true;
+        }
+
+        @VisibleForTesting
+        long getTotalSyncDuration()
+        {
+            return totalSyncDuration;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 5a46e5f..6f51eaf 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogService.SyncRunnable;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FreeRunningClock;
 
@@ -117,7 +118,7 @@ public class AbstractCommitLogServiceTest
         long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2;
         FreeRunningClock clock = new FreeRunningClock();
         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
-        AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+        SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
         FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
 
         // at time 0
@@ -173,4 +174,50 @@ public class AbstractCommitLogServiceTest
                 markCount.incrementAndGet();
         }
     }
+
+    @Test
+    public void maybeLogFlushLag_MustLog()
+    {
+        long syncTimeMillis = 10;
+        SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+        long pollStarted = 1;
+        long now = pollStarted + (syncTimeMillis * 2);
+        Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+        Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+    }
+
+    @Test
+    public void maybeLogFlushLag_NoLog()
+    {
+        long syncTimeMillis = 10;
+        SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+        long pollStarted = 1;
+        long now = pollStarted + (syncTimeMillis - 1);
+        Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+        Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+    }
+
+    /**
+     * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+     */
+    @Test
+    public void maybeLogFlushLag_MultipleOperations()
+    {
+        long syncTimeMillis = 10;
+        SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+
+        long pollStarted = 1;
+        long now = pollStarted + (syncTimeMillis - 1);
+
+        int runCount = 12;
+        for (int i = 1; i <= runCount; i++)
+        {
+            Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+            Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+        }
+
+        now = pollStarted + (syncTimeMillis * 2);
+        Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+        Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/6] cassandra git commit: Fix regression of lagging commitlog flush log message

Posted by ja...@apache.org.
Fix regression of lagging commitlog flush log message

patch by jasobrown, reviewed by Jordan West for CASSANDRA-14451


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/214a3abf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/214a3abf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/214a3abf

Branch: refs/heads/trunk
Commit: 214a3abfcc25460af50805b543a5833697a1b341
Parents: 38096da
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jun 1 05:45:23 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:47:37 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/AbstractCommitLogService.java  | 85 +++++++++++++-------
 .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
 3 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fe6d1..dfdfbfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
  * Add Missing dependencies in pom-all (CASSANDRA-14422)
  * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
  * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 1cee55d..0845bd5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public abstract class AbstractCommitLogService
 {
     /**
@@ -165,13 +167,15 @@ public abstract class AbstractCommitLogService
 
                 // sync and signal
                 long pollStarted = clock.currentTimeMillis();
-                if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+                boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
+                if (flushToDisk)
                 {
                     // in this branch, we want to flush the commit log to disk
                     syncRequested = false;
                     commitLog.sync(shutdown, true);
                     lastSyncedAt = pollStarted;
                     syncComplete.signalAll();
+                    syncCount++;
                 }
                 else
                 {
@@ -179,41 +183,15 @@ public abstract class AbstractCommitLogService
                     commitLog.sync(false, false);
                 }
 
-                // sleep any time we have left before the next one is due
                 long now = clock.currentTimeMillis();
-                long sleep = pollStarted + markerIntervalMillis - now;
-                if (sleep < 0)
-                {
-                    // if we have lagged noticeably, update our lag counter
-                    if (firstLagAt == 0)
-                    {
-                        firstLagAt = now;
-                        totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
-                    }
-                    syncExceededIntervalBy -= sleep;
-                    lagCount++;
-                }
-                syncCount++;
-                totalSyncDuration += now - pollStarted;
-
-                if (firstLagAt > 0)
-                {
-                    //Only reset the lag tracking if it actually logged this time
-                    boolean logged = NoSpamLogger.log(
-                    logger,
-                    NoSpamLogger.Level.WARN,
-                    5,
-                    TimeUnit.MINUTES,
-                    "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
-                    syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
-                    if (logged)
-                        firstLagAt = 0;
-                }
+                if (flushToDisk)
+                    maybeLogFlushLag(pollStarted, now);
 
                 if (!run)
                     return false;
 
                 // if we have lagged this round, we probably have work to do already so we don't sleep
+                long sleep = pollStarted + markerIntervalMillis - now;
                 if (sleep < 0)
                     return true;
 
@@ -244,6 +222,53 @@ public abstract class AbstractCommitLogService
             }
             return true;
         }
+
+        /**
+         * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
+         */
+        @VisibleForTesting
+        boolean maybeLogFlushLag(long pollStarted, long now)
+        {
+            long flushDuration = now - pollStarted;
+            totalSyncDuration += flushDuration;
+
+            // this is the timestamp by which we should have completed the flush
+            long maxFlushTimestamp = pollStarted + syncIntervalMillis;
+            if (maxFlushTimestamp > now)
+                return false;
+
+            // if we have lagged noticeably, update our lag counter
+            if (firstLagAt == 0)
+            {
+                firstLagAt = now;
+                syncExceededIntervalBy = lagCount = 0;
+                syncCount = 1;
+                totalSyncDuration = flushDuration;
+            }
+            syncExceededIntervalBy += now - maxFlushTimestamp;
+            lagCount++;
+
+            if (firstLagAt > 0)
+            {
+                //Only reset the lag tracking if it actually logged this time
+                boolean logged = NoSpamLogger.log(
+                logger,
+                NoSpamLogger.Level.WARN,
+                5,
+                TimeUnit.MINUTES,
+                "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+                syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+                if (logged)
+                    firstLagAt = 0;
+            }
+            return true;
+        }
+
+        @VisibleForTesting
+        long getTotalSyncDuration()
+        {
+            return totalSyncDuration;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 5a46e5f..6f51eaf 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogService.SyncRunnable;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FreeRunningClock;
 
@@ -117,7 +118,7 @@ public class AbstractCommitLogServiceTest
         long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2;
         FreeRunningClock clock = new FreeRunningClock();
         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
-        AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+        SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
         FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
 
         // at time 0
@@ -173,4 +174,50 @@ public class AbstractCommitLogServiceTest
                 markCount.incrementAndGet();
         }
     }
+
+    @Test
+    public void maybeLogFlushLag_MustLog()
+    {
+        long syncTimeMillis = 10;
+        SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+        long pollStarted = 1;
+        long now = pollStarted + (syncTimeMillis * 2);
+        Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+        Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+    }
+
+    @Test
+    public void maybeLogFlushLag_NoLog()
+    {
+        long syncTimeMillis = 10;
+        SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+        long pollStarted = 1;
+        long now = pollStarted + (syncTimeMillis - 1);
+        Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+        Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+    }
+
+    /**
+     * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+     */
+    @Test
+    public void maybeLogFlushLag_MultipleOperations()
+    {
+        long syncTimeMillis = 10;
+        SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+
+        long pollStarted = 1;
+        long now = pollStarted + (syncTimeMillis - 1);
+
+        int runCount = 12;
+        for (int i = 1; i <= runCount; i++)
+        {
+            Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+            Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+        }
+
+        now = pollStarted + (syncTimeMillis * 2);
+        Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+        Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77a12053
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77a12053
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77a12053

Branch: refs/heads/trunk
Commit: 77a12053b69ceebd529556d5159f9325703283eb
Parents: b92d90d 214a3ab
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Jun 5 13:48:56 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:50:36 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/AbstractCommitLogService.java  | 88 +++++++++++++-------
 .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
 3 files changed, 105 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2d4ef25,dfdfbfd..2e77d2e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
 -3.0.17
 +3.11.3
 + * Reduce nodetool GC thread count (CASSANDRA-14475)
 + * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
 + * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
 + * Update metrics to 3.1.5 (CASSANDRA-12924)
 + * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
 + * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
 + * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
 + * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
 + * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
 + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
 + * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
 + * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
 + * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
 + * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
 + * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
 + * Fix wildcard GROUP BY queries (CASSANDRA-14209)
 +Merged from 3.0:
+  * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
   * Add Missing dependencies in pom-all (CASSANDRA-14422)
   * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
   * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 7c5d300,0845bd5..b7ab705
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,15 -17,6 +17,16 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.LockSupport;
 +
++import com.google.common.annotations.VisibleForTesting;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.codahale.metrics.Timer.Context;
 +
  import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
@@@ -162,14 -160,15 +163,15 @@@ public abstract class AbstractCommitLog
  
          boolean sync()
          {
 +            // always run once after shutdown signalled
 +            boolean shutdownRequested = shutdown;
 +
              try
              {
 -                // always run once after shutdown signalled
 -                boolean run = !shutdown;
 -
                  // sync and signal
 -                long pollStarted = clock.currentTimeMillis();
 -                boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
 +                long pollStarted = clock.nanoTime();
-                 if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested)
++                boolean flushToDisk = lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested;
+                 if (flushToDisk)
                  {
                      // in this branch, we want to flush the commit log to disk
                      syncRequested = false;
@@@ -181,47 -180,30 +183,19 @@@
                  else
                  {
                      // in this branch, just update the commit log sync headers
 -                    commitLog.sync(false, false);
 +                    commitLog.sync(false);
                  }
  
-                 // sleep any time we have left before the next one is due
 -                long now = clock.currentTimeMillis();
 +                long now = clock.nanoTime();
-                 long wakeUpAt = pollStarted + markerIntervalNanos;
-                 if (wakeUpAt < now)
-                 {
-                     // if we have lagged noticeably, update our lag counter
-                     if (firstLagAt == 0)
-                     {
-                         firstLagAt = now;
-                         totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
-                     }
-                     syncExceededIntervalBy += now - wakeUpAt;
-                     lagCount++;
-                 }
-                 totalSyncDuration += now - pollStarted;
- 
-                 if (firstLagAt > 0)
-                 {
-                     //Only reset the lag tracking if it actually logged this time
-                     boolean logged = NoSpamLogger.log(logger,
-                                                       NoSpamLogger.Level.WARN,
-                                                       5,
-                                                       TimeUnit.MINUTES,
-                                                       "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
-                                                       syncCount,
-                                                       String.format("%.2f", (now - firstLagAt) * 1e-9d),
-                                                       String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
-                                                       lagCount,
-                                                       String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
-                     if (logged)
-                         firstLagAt = 0;
-                 }
+                 if (flushToDisk)
+                     maybeLogFlushLag(pollStarted, now);
  
 -                if (!run)
 +                if (shutdownRequested)
                      return false;
  
 -                // if we have lagged this round, we probably have work to do already so we don't sleep
 -                long sleep = pollStarted + markerIntervalMillis - now;
 -                if (sleep < 0)
 -                    return true;
 -
 -                try
 -                {
 -                    haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
 -                    haveWork.drainPermits();
 -                }
 -                catch (InterruptedException e)
 -                {
 -                    throw new AssertionError();
 -                }
++                long wakeUpAt = pollStarted + markerIntervalNanos;
 +                if (wakeUpAt > now)
 +                    LockSupport.parkNanos(wakeUpAt - now);
              }
              catch (Throwable t)
              {
@@@ -229,13 -211,67 +203,63 @@@
                      return false;
  
                  // sleep for full poll-interval after an error, so we don't spam the log file
 -                try
 -                {
 -                    haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
 -                }
 -                catch (InterruptedException e)
 -                {
 -                    throw new AssertionError();
 -                }
 +                LockSupport.parkNanos(markerIntervalNanos);
              }
 +
              return true;
          }
+ 
+         /**
 -         * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
++         * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalNanos}.
+          */
+         @VisibleForTesting
+         boolean maybeLogFlushLag(long pollStarted, long now)
+         {
+             long flushDuration = now - pollStarted;
+             totalSyncDuration += flushDuration;
+ 
+             // this is the timestamp by which we should have completed the flush
 -            long maxFlushTimestamp = pollStarted + syncIntervalMillis;
++            long maxFlushTimestamp = pollStarted + syncIntervalNanos;
+             if (maxFlushTimestamp > now)
+                 return false;
+ 
+             // if we have lagged noticeably, update our lag counter
+             if (firstLagAt == 0)
+             {
+                 firstLagAt = now;
+                 syncExceededIntervalBy = lagCount = 0;
+                 syncCount = 1;
+                 totalSyncDuration = flushDuration;
+             }
+             syncExceededIntervalBy += now - maxFlushTimestamp;
+             lagCount++;
+ 
+             if (firstLagAt > 0)
+             {
+                 //Only reset the lag tracking if it actually logged this time
 -                boolean logged = NoSpamLogger.log(
 -                logger,
 -                NoSpamLogger.Level.WARN,
 -                5,
 -                TimeUnit.MINUTES,
 -                "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
 -                syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
++                boolean logged = NoSpamLogger.log(logger,
++                                                  NoSpamLogger.Level.WARN,
++                                                  5,
++                                                  TimeUnit.MINUTES,
++                                                  "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
++                                                  syncCount,
++                                                  String.format("%.2f", (now - firstLagAt) * 1e-9d),
++                                                  String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
++                                                  lagCount,
++                                                  String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
+                 if (logged)
+                     firstLagAt = 0;
+             }
+             return true;
+         }
+ 
+         @VisibleForTesting
+         long getTotalSyncDuration()
+         {
+             return totalSyncDuration;
+         }
      }
  
 -
      /**
       * Block for @param alloc to be sync'd as necessary, and handle bookkeeping
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 18f15fa,6f51eaf..bc5cb29
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@@ -163,4 -174,50 +164,50 @@@ public class AbstractCommitLogServiceTe
                  markCount.incrementAndGet();
          }
      }
+ 
+     @Test
+     public void maybeLogFlushLag_MustLog()
+     {
+         long syncTimeMillis = 10;
+         SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+         long pollStarted = 1;
 -        long now = pollStarted + (syncTimeMillis * 2);
++        long now = Integer.MAX_VALUE;
+         Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+         Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+     }
+ 
+     @Test
+     public void maybeLogFlushLag_NoLog()
+     {
+         long syncTimeMillis = 10;
+         SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+         long pollStarted = 1;
+         long now = pollStarted + (syncTimeMillis - 1);
+         Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+         Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+     }
+ 
+     /**
+      * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+      */
+     @Test
+     public void maybeLogFlushLag_MultipleOperations()
+     {
+         long syncTimeMillis = 10;
+         SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ 
+         long pollStarted = 1;
+         long now = pollStarted + (syncTimeMillis - 1);
+ 
+         int runCount = 12;
+         for (int i = 1; i <= runCount; i++)
+         {
+             Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+             Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+         }
+ 
 -        now = pollStarted + (syncTimeMillis * 2);
++        now = pollStarted + Integer.MAX_VALUE;
+         Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+         Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+     }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/843a5fdf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/843a5fdf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/843a5fdf

Branch: refs/heads/trunk
Commit: 843a5fdf2ff8f2cb61a4e1d6632fd443bc2136fb
Parents: 5d87677 77a1205
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Jun 5 13:50:58 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:51:50 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/AbstractCommitLogService.java  | 88 +++++++++++++-------
 .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
 3 files changed, 105 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/843a5fdf/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eb064be,2e77d2e..9857704
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -264,8 -16,10 +264,9 @@@
   * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
   * Fix wildcard GROUP BY queries (CASSANDRA-14209)
  Merged from 3.0:
+  * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
   * Add Missing dependencies in pom-all (CASSANDRA-14422)
   * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
 - * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
   * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
   * Fix progress stats and units in compactionstats (CASSANDRA-12244)
   * Better handle missing partition columns in system_schema.columns (CASSANDRA-14379)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77a12053
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77a12053
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77a12053

Branch: refs/heads/cassandra-3.11
Commit: 77a12053b69ceebd529556d5159f9325703283eb
Parents: b92d90d 214a3ab
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Jun 5 13:48:56 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:50:36 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/AbstractCommitLogService.java  | 88 +++++++++++++-------
 .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
 3 files changed, 105 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2d4ef25,dfdfbfd..2e77d2e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
 -3.0.17
 +3.11.3
 + * Reduce nodetool GC thread count (CASSANDRA-14475)
 + * Fix New SASI view creation during Index Redistribution (CASSANDRA-14055)
 + * Remove string formatting lines from BufferPool hot path (CASSANDRA-14416)
 + * Update metrics to 3.1.5 (CASSANDRA-12924)
 + * Detect OpenJDK jvm type and architecture (CASSANDRA-12793)
 + * Don't use guava collections in the non-system keyspace jmx attributes (CASSANDRA-12271)
 + * Allow existing nodes to use all peers in shadow round (CASSANDRA-13851)
 + * Fix cqlsh to read connection.ssl cqlshrc option again (CASSANDRA-14299)
 + * Downgrade log level to trace for CommitLogSegmentManager (CASSANDRA-14370)
 + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
 + * Serialize empty buffer as empty string for json output format (CASSANDRA-14245)
 + * Allow logging implementation to be interchanged for embedded testing (CASSANDRA-13396)
 + * SASI tokenizer for simple delimiter based entries (CASSANDRA-14247)
 + * Fix Loss of digits when doing CAST from varint/bigint to decimal (CASSANDRA-14170)
 + * RateBasedBackPressure unnecessarily invokes a lock on the Guava RateLimiter (CASSANDRA-14163)
 + * Fix wildcard GROUP BY queries (CASSANDRA-14209)
 +Merged from 3.0:
+  * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
   * Add Missing dependencies in pom-all (CASSANDRA-14422)
   * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
   * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 7c5d300,0845bd5..b7ab705
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,15 -17,6 +17,16 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.LockSupport;
 +
++import com.google.common.annotations.VisibleForTesting;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.codahale.metrics.Timer.Context;
 +
  import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
@@@ -162,14 -160,15 +163,15 @@@ public abstract class AbstractCommitLog
  
          boolean sync()
          {
 +            // always run once after shutdown signalled
 +            boolean shutdownRequested = shutdown;
 +
              try
              {
 -                // always run once after shutdown signalled
 -                boolean run = !shutdown;
 -
                  // sync and signal
 -                long pollStarted = clock.currentTimeMillis();
 -                boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
 +                long pollStarted = clock.nanoTime();
-                 if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested)
++                boolean flushToDisk = lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested;
+                 if (flushToDisk)
                  {
                      // in this branch, we want to flush the commit log to disk
                      syncRequested = false;
@@@ -181,47 -180,30 +183,19 @@@
                  else
                  {
                      // in this branch, just update the commit log sync headers
 -                    commitLog.sync(false, false);
 +                    commitLog.sync(false);
                  }
  
-                 // sleep any time we have left before the next one is due
 -                long now = clock.currentTimeMillis();
 +                long now = clock.nanoTime();
-                 long wakeUpAt = pollStarted + markerIntervalNanos;
-                 if (wakeUpAt < now)
-                 {
-                     // if we have lagged noticeably, update our lag counter
-                     if (firstLagAt == 0)
-                     {
-                         firstLagAt = now;
-                         totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
-                     }
-                     syncExceededIntervalBy += now - wakeUpAt;
-                     lagCount++;
-                 }
-                 totalSyncDuration += now - pollStarted;
- 
-                 if (firstLagAt > 0)
-                 {
-                     //Only reset the lag tracking if it actually logged this time
-                     boolean logged = NoSpamLogger.log(logger,
-                                                       NoSpamLogger.Level.WARN,
-                                                       5,
-                                                       TimeUnit.MINUTES,
-                                                       "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
-                                                       syncCount,
-                                                       String.format("%.2f", (now - firstLagAt) * 1e-9d),
-                                                       String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
-                                                       lagCount,
-                                                       String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
-                     if (logged)
-                         firstLagAt = 0;
-                 }
+                 if (flushToDisk)
+                     maybeLogFlushLag(pollStarted, now);
  
 -                if (!run)
 +                if (shutdownRequested)
                      return false;
  
 -                // if we have lagged this round, we probably have work to do already so we don't sleep
 -                long sleep = pollStarted + markerIntervalMillis - now;
 -                if (sleep < 0)
 -                    return true;
 -
 -                try
 -                {
 -                    haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
 -                    haveWork.drainPermits();
 -                }
 -                catch (InterruptedException e)
 -                {
 -                    throw new AssertionError();
 -                }
++                long wakeUpAt = pollStarted + markerIntervalNanos;
 +                if (wakeUpAt > now)
 +                    LockSupport.parkNanos(wakeUpAt - now);
              }
              catch (Throwable t)
              {
@@@ -229,13 -211,67 +203,63 @@@
                      return false;
  
                  // sleep for full poll-interval after an error, so we don't spam the log file
 -                try
 -                {
 -                    haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
 -                }
 -                catch (InterruptedException e)
 -                {
 -                    throw new AssertionError();
 -                }
 +                LockSupport.parkNanos(markerIntervalNanos);
              }
 +
              return true;
          }
+ 
+         /**
 -         * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
++         * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalNanos}.
+          */
+         @VisibleForTesting
+         boolean maybeLogFlushLag(long pollStarted, long now)
+         {
+             long flushDuration = now - pollStarted;
+             totalSyncDuration += flushDuration;
+ 
+             // this is the timestamp by which we should have completed the flush
 -            long maxFlushTimestamp = pollStarted + syncIntervalMillis;
++            long maxFlushTimestamp = pollStarted + syncIntervalNanos;
+             if (maxFlushTimestamp > now)
+                 return false;
+ 
+             // if we have lagged noticeably, update our lag counter
+             if (firstLagAt == 0)
+             {
+                 firstLagAt = now;
+                 syncExceededIntervalBy = lagCount = 0;
+                 syncCount = 1;
+                 totalSyncDuration = flushDuration;
+             }
+             syncExceededIntervalBy += now - maxFlushTimestamp;
+             lagCount++;
+ 
+             if (firstLagAt > 0)
+             {
+                 //Only reset the lag tracking if it actually logged this time
 -                boolean logged = NoSpamLogger.log(
 -                logger,
 -                NoSpamLogger.Level.WARN,
 -                5,
 -                TimeUnit.MINUTES,
 -                "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
 -                syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
++                boolean logged = NoSpamLogger.log(logger,
++                                                  NoSpamLogger.Level.WARN,
++                                                  5,
++                                                  TimeUnit.MINUTES,
++                                                  "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
++                                                  syncCount,
++                                                  String.format("%.2f", (now - firstLagAt) * 1e-9d),
++                                                  String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
++                                                  lagCount,
++                                                  String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
+                 if (logged)
+                     firstLagAt = 0;
+             }
+             return true;
+         }
+ 
+         @VisibleForTesting
+         long getTotalSyncDuration()
+         {
+             return totalSyncDuration;
+         }
      }
  
 -
      /**
       * Block for @param alloc to be sync'd as necessary, and handle bookkeeping
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77a12053/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 18f15fa,6f51eaf..bc5cb29
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@@ -163,4 -174,50 +164,50 @@@ public class AbstractCommitLogServiceTe
                  markCount.incrementAndGet();
          }
      }
+ 
+     @Test
+     public void maybeLogFlushLag_MustLog()
+     {
+         long syncTimeMillis = 10;
+         SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+         long pollStarted = 1;
 -        long now = pollStarted + (syncTimeMillis * 2);
++        long now = Integer.MAX_VALUE;
+         Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+         Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+     }
+ 
+     @Test
+     public void maybeLogFlushLag_NoLog()
+     {
+         long syncTimeMillis = 10;
+         SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+         long pollStarted = 1;
+         long now = pollStarted + (syncTimeMillis - 1);
+         Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+         Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+     }
+ 
+     /**
+      * Mostly tests that {@link SyncRunnable#totalSyncDuration} is handled correctly
+      */
+     @Test
+     public void maybeLogFlushLag_MultipleOperations()
+     {
+         long syncTimeMillis = 10;
+         SyncRunnable syncRunnable = new FakeCommitLogService(syncTimeMillis).new SyncRunnable(new FreeRunningClock());
+ 
+         long pollStarted = 1;
+         long now = pollStarted + (syncTimeMillis - 1);
+ 
+         int runCount = 12;
+         for (int i = 1; i <= runCount; i++)
+         {
+             Assert.assertFalse(syncRunnable.maybeLogFlushLag(pollStarted, now));
+             Assert.assertEquals(i * (now - pollStarted), syncRunnable.getTotalSyncDuration());
+         }
+ 
 -        now = pollStarted + (syncTimeMillis * 2);
++        now = pollStarted + Integer.MAX_VALUE;
+         Assert.assertTrue(syncRunnable.maybeLogFlushLag(pollStarted, now));
+         Assert.assertEquals(now - pollStarted, syncRunnable.getTotalSyncDuration());
+     }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/6] cassandra git commit: Fix regression of lagging commitlog flush log message

Posted by ja...@apache.org.
Fix regression of lagging commitlog flush log message

patch by jasobrown, reviewed by Jordan West for CASSANDRA-14451


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/214a3abf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/214a3abf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/214a3abf

Branch: refs/heads/cassandra-3.11
Commit: 214a3abfcc25460af50805b543a5833697a1b341
Parents: 38096da
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jun 1 05:45:23 2018 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jun 5 13:47:37 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/AbstractCommitLogService.java  | 85 +++++++++++++-------
 .../commitlog/AbstractCommitLogServiceTest.java | 49 ++++++++++-
 3 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fe6d1..dfdfbfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
  * Add Missing dependencies in pom-all (CASSANDRA-14422)
  * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
  * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 1cee55d..0845bd5 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public abstract class AbstractCommitLogService
 {
     /**
@@ -165,13 +167,15 @@ public abstract class AbstractCommitLogService
 
                 // sync and signal
                 long pollStarted = clock.currentTimeMillis();
-                if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+                boolean flushToDisk = lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested;
+                if (flushToDisk)
                 {
                     // in this branch, we want to flush the commit log to disk
                     syncRequested = false;
                     commitLog.sync(shutdown, true);
                     lastSyncedAt = pollStarted;
                     syncComplete.signalAll();
+                    syncCount++;
                 }
                 else
                 {
@@ -179,41 +183,15 @@ public abstract class AbstractCommitLogService
                     commitLog.sync(false, false);
                 }
 
-                // sleep any time we have left before the next one is due
                 long now = clock.currentTimeMillis();
-                long sleep = pollStarted + markerIntervalMillis - now;
-                if (sleep < 0)
-                {
-                    // if we have lagged noticeably, update our lag counter
-                    if (firstLagAt == 0)
-                    {
-                        firstLagAt = now;
-                        totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
-                    }
-                    syncExceededIntervalBy -= sleep;
-                    lagCount++;
-                }
-                syncCount++;
-                totalSyncDuration += now - pollStarted;
-
-                if (firstLagAt > 0)
-                {
-                    //Only reset the lag tracking if it actually logged this time
-                    boolean logged = NoSpamLogger.log(
-                    logger,
-                    NoSpamLogger.Level.WARN,
-                    5,
-                    TimeUnit.MINUTES,
-                    "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
-                    syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
-                    if (logged)
-                        firstLagAt = 0;
-                }
+                if (flushToDisk)
+                    maybeLogFlushLag(pollStarted, now);
 
                 if (!run)
                     return false;
 
                 // if we have lagged this round, we probably have work to do already so we don't sleep
+                long sleep = pollStarted + markerIntervalMillis - now;
                 if (sleep < 0)
                     return true;
 
@@ -244,6 +222,53 @@ public abstract class AbstractCommitLogService
             }
             return true;
         }
+
+        /**
+         * Add a log entry whenever the time to flush the commit log to disk exceeds {@link #syncIntervalMillis}.
+         */
+        @VisibleForTesting
+        boolean maybeLogFlushLag(long pollStarted, long now)
+        {
+            long flushDuration = now - pollStarted;
+            totalSyncDuration += flushDuration;
+
+            // this is the timestamp by which we should have completed the flush
+            long maxFlushTimestamp = pollStarted + syncIntervalMillis;
+            if (maxFlushTimestamp > now)
+                return false;
+
+            // if we have lagged noticeably, update our lag counter
+            if (firstLagAt == 0)
+            {
+                firstLagAt = now;
+                syncExceededIntervalBy = lagCount = 0;
+                syncCount = 1;
+                totalSyncDuration = flushDuration;
+            }
+            syncExceededIntervalBy += now - maxFlushTimestamp;
+            lagCount++;
+
+            if (firstLagAt > 0)
+            {
+                //Only reset the lag tracking if it actually logged this time
+                boolean logged = NoSpamLogger.log(
+                logger,
+                NoSpamLogger.Level.WARN,
+                5,
+                TimeUnit.MINUTES,
+                "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+                syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+                if (logged)
+                    firstLagAt = 0;
+            }
+            return true;
+        }
+
+        @VisibleForTesting
+        long getTotalSyncDuration()
+        {
+            return totalSyncDuration;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/214a3abf/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
index 5a46e5f..6f51eaf 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/AbstractCommitLogServiceTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogService.SyncRunnable;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FreeRunningClock;
 
@@ -117,7 +118,7 @@ public class AbstractCommitLogServiceTest
         long syncTimeMillis = AbstractCommitLogService.DEFAULT_MARKER_INTERVAL_MILLIS * 2;
         FreeRunningClock clock = new FreeRunningClock();
         FakeCommitLogService commitLogService = new FakeCommitLogService(syncTimeMillis);
-        AbstractCommitLogService.SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
+        SyncRunnable syncRunnable = commitLogService.new SyncRunnable(clock);
         FakeCommitLog commitLog = (FakeCommitLog) commitLogService.commitLog;
 
         // at time 0
@@ -173,4 +174,50 @@ public class AbstractCommitLogServiceTest
                 markCount.incrementAndGet();
         }
     }
+
+    @Test
+    public void maybeLogFlushLag_MustLog()
+    {
+        final long intervalMillis = 10;
+        final long startMillis = 1;
+        final long flushMillis = intervalMillis * 2; // twice the configured sync time: the sync lags
+        SyncRunnable runnable = new FakeCommitLogService(intervalMillis).new SyncRunnable(new FreeRunningClock());
+        Assert.assertTrue(runnable.maybeLogFlushLag(startMillis, startMillis + flushMillis));
+        Assert.assertEquals(flushMillis, runnable.getTotalSyncDuration());
+    }
+
+    @Test
+    public void maybeLogFlushLag_NoLog()
+    {
+        final long intervalMillis = 10;
+        final long startMillis = 1;
+        final long flushMillis = intervalMillis - 1; // finishes just inside the configured sync time
+        SyncRunnable runnable = new FakeCommitLogService(intervalMillis).new SyncRunnable(new FreeRunningClock());
+        Assert.assertFalse(runnable.maybeLogFlushLag(startMillis, startMillis + flushMillis));
+        Assert.assertEquals(flushMillis, runnable.getTotalSyncDuration());
+    }
+
+    /**
+     * Checks that {@link SyncRunnable#getTotalSyncDuration()} accumulates over on-time syncs and equals the lagging sync's duration afterwards.
+     */
+    @Test
+    public void maybeLogFlushLag_MultipleOperations()
+    {
+        final long intervalMillis = 10;
+        final long startMillis = 1;
+        final long onTimeMillis = intervalMillis - 1; // each of these syncs finishes inside the sync time
+        SyncRunnable runnable = new FakeCommitLogService(intervalMillis).new SyncRunnable(new FreeRunningClock());
+
+        long expectedTotal = 0;
+        for (int run = 0; run < 12; run++)
+        {
+            expectedTotal += onTimeMillis;
+            Assert.assertFalse(runnable.maybeLogFlushLag(startMillis, startMillis + onTimeMillis));
+            Assert.assertEquals(expectedTotal, runnable.getTotalSyncDuration());
+        }
+
+        final long laggingMillis = intervalMillis * 2; // a single sync that overshoots the sync time
+        Assert.assertTrue(runnable.maybeLogFlushLag(startMillis, startMillis + laggingMillis));
+        Assert.assertEquals(laggingMillis, runnable.getTotalSyncDuration());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org