You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2020/06/29 16:55:10 UTC

[cassandra] branch trunk updated: Prune expired messages less frequently in internode messaging

This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56f24f7  Prune expired messages less frequently in internode messaging
56f24f7 is described below

commit 56f24f78f62c9945fae40790e3ed09893fa1ed18
Author: Sergio Bossa <se...@gmail.com>
AuthorDate: Thu Apr 2 19:30:26 2020 +0100

    Prune expired messages less frequently in internode messaging
    
    Patch by Sergio Bossa; Reviewed by Aleksey Yeschenko for CASSANDRA-15700
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/net/Message.java     |   5 +
 .../apache/cassandra/net/OutboundConnection.java   |   6 +-
 .../apache/cassandra/net/OutboundMessageQueue.java |  88 +++++++++----
 .../cassandra/net/OutboundMessageQueueTest.java    | 137 ++++++++++++++++++++-
 5 files changed, 207 insertions(+), 30 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0781ca2..1c30b58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Prune expired messages less frequently in internode messaging (CASSANDRA-15700)
  * Fix Ec2Snitch handling of legacy mode for dc names matching both formats, eg "us-west-2" (CASSANDRA-15878)
  * Add support for server side DESCRIBE statements (CASSANDRA-14825)
  * Fail startup if -Xmn is set when the G1 garbage collector is used (CASSANDRA-15839)
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 0eb7710..01ba5d4 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -188,6 +188,11 @@ public class Message<T>
         return outWithParam(nextId(), verb, payload, null, null);
     }
 
+    public static <T> Message<T> out(Verb verb, T payload, long expiresAtNanos)
+    {
+        return outWithParam(nextId(), verb, expiresAtNanos, payload, 0, null, null);
+    }
+
     public static <T> Message<T> outWithFlag(Verb verb, T payload, MessageFlag flag)
     {
         assert !verb.isResponse();
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index d7ebcd8..635f221 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -295,7 +295,7 @@ public class OutboundConnection
         this.reserveCapacityInBytes = reserveCapacityInBytes;
         this.callbacks = template.callbacks;
         this.debug = template.debug;
-        this.queue = new OutboundMessageQueue(this::onExpired);
+        this.queue = new OutboundMessageQueue(approxTime, this::onExpired);
         this.delivery = type == ConnectionType.LARGE_MESSAGES
                         ? new LargeMessageDelivery(template.socketFactory.synchronousWorkExecutor)
                         : new EventLoopDelivery();
@@ -571,8 +571,8 @@ public class OutboundConnection
          */
         void executeAgain()
         {
-             // if we are already executing, set EXECUTING_AGAIN and leave scheduling to the currently running one.
-             // otherwise, set ourselves unconditionally to EXECUTING and schedule ourselves immediately
+            // if we are already executing, set EXECUTING_AGAIN and leave scheduling to the currently running one.
+            // otherwise, set ourselves unconditionally to EXECUTING and schedule ourselves immediately
             if (!isExecuting(getAndUpdate(i -> !isExecuting(i) ? EXECUTING : EXECUTING_AGAIN)))
                 executor.execute(this);
         }
diff --git a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
index 48c7666..3d8bac0 100644
--- a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
+++ b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
@@ -21,18 +21,20 @@ import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Uninterruptibles;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.utils.MonotonicClock;
+
 import static java.lang.Math.min;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
 
 /**
  * A composite queue holding messages to be delivered by an {@link OutboundConnection}.
@@ -59,17 +61,22 @@ class OutboundMessageQueue
         boolean accept(Message<?> message) throws Produces;
     }
 
+    private final MonotonicClock clock;
     private final MessageConsumer<RuntimeException> onExpired;
 
     private final ManyToOneConcurrentLinkedQueue<Message<?>> externalQueue = new ManyToOneConcurrentLinkedQueue<>();
     private final PrunableArrayQueue<Message<?>> internalQueue = new PrunableArrayQueue<>(256);
 
     private volatile long earliestExpiresAt = Long.MAX_VALUE;
+    private volatile long nextExpirationDeadline = Long.MAX_VALUE;
     private static final AtomicLongFieldUpdater<OutboundMessageQueue> earliestExpiresAtUpdater =
         AtomicLongFieldUpdater.newUpdater(OutboundMessageQueue.class, "earliestExpiresAt");
+    private static final AtomicLongFieldUpdater<OutboundMessageQueue> nextExpirationDeadlineUpdater =
+        AtomicLongFieldUpdater.newUpdater(OutboundMessageQueue.class, "nextExpirationDeadline");
 
-    OutboundMessageQueue(MessageConsumer<RuntimeException> onExpired)
+    OutboundMessageQueue(MonotonicClock clock, MessageConsumer<RuntimeException> onExpired)
     {
+        this.clock = clock;
         this.onExpired = onExpired;
     }
 
@@ -80,7 +87,9 @@ class OutboundMessageQueue
     {
         maybePruneExpired();
         externalQueue.offer(m);
-        maybeUpdateMinimumExpiryTime(m.expiresAtNanos());
+        nextExpirationDeadlineUpdater.accumulateAndGet(this,
+                                                       maybeUpdateEarliestExpiresAt(clock.now(), m.expiresAtNanos()),
+                                                       Math::min);
     }
 
     /**
@@ -105,7 +114,7 @@ class OutboundMessageQueue
      */
     void runEventually(Consumer<WithLock> runEventually)
     {
-        try (WithLock withLock = lockOrCallback(approxTime.now(), () -> runEventually(runEventually)))
+        try (WithLock withLock = lockOrCallback(clock.now(), () -> runEventually(runEventually)))
         {
             if (withLock != null)
                 runEventually.accept(withLock);
@@ -136,7 +145,6 @@ class OutboundMessageQueue
         private WithLock(long nowNanos)
         {
             this.nowNanos = nowNanos;
-            earliestExpiresAt = Long.MAX_VALUE;
             externalQueue.drain(internalQueue::offer);
         }
 
@@ -145,7 +153,7 @@ class OutboundMessageQueue
             Message<?> m;
             while (null != (m = internalQueue.poll()))
             {
-                if (shouldSend(m, nowNanos))
+                if (shouldSend(m, clock, nowNanos))
                     break;
 
                 onExpired.accept(m);
@@ -165,7 +173,7 @@ class OutboundMessageQueue
             Message<?> m;
             while (null != (m = internalQueue.peek()))
             {
-                if (shouldSend(m, nowNanos))
+                if (shouldSend(m, clock, nowNanos))
                     break;
 
                 internalQueue.poll();
@@ -185,7 +193,9 @@ class OutboundMessageQueue
         @Override
         public void close()
         {
-            pruneInternalQueueWithLock(nowNanos);
+            if (clock.isAfter(nowNanos, nextExpirationDeadline))
+                pruneInternalQueueWithLock(nowNanos);
+
             unlock();
         }
     }
@@ -195,20 +205,47 @@ class OutboundMessageQueue
      */
     boolean maybePruneExpired()
     {
-        return maybePruneExpired(approxTime.now());
+        return maybePruneExpired(clock.now());
     }
 
     private boolean maybePruneExpired(long nowNanos)
     {
-        if (approxTime.isAfter(nowNanos, earliestExpiresAt))
+        if (clock.isAfter(nowNanos, nextExpirationDeadline))
             return tryRun(() -> pruneWithLock(nowNanos));
+
         return false;
     }
 
-    private void maybeUpdateMinimumExpiryTime(long newTime)
+    /**
+     * Update {@code earliestExpiresAt} with the given {@code candidateTime} if it is less than the current value OR
+     * if the current value has already elapsed as of {@code nowNanos}: this last condition is needed to make sure we keep
+     * tracking the earliest expiry time even while we prune previous values, so that at the end of the pruning task,
+     * we can reconcile between the earliest expiry time recorded at pruning and the one recorded at insert time.
+     */
+    private long maybeUpdateEarliestExpiresAt(long nowNanos, long candidateTime)
     {
-        if (newTime < earliestExpiresAt)
-            earliestExpiresAtUpdater.accumulateAndGet(this, newTime, Math::min);
+        return earliestExpiresAtUpdater.accumulateAndGet(this, candidateTime, (oldTime, newTime) -> {
+            if (clock.isAfter(nowNanos, oldTime))
+                return newTime;
+            else
+                return min(oldTime, newTime);
+        });
+    }
+
+    /**
+     * Update {@code nextExpirationDeadline} with the given {@code candidateDeadline} if it is less than the current
+     * deadline, or unconditionally if the current deadline has already passed as of {@code nowNanos}: this is needed
+     * to resolve a race where both {@link #add(org.apache.cassandra.net.Message)} and {@link #pruneInternalQueueWithLock(long)}
+     * try to update the expiration deadline.
+     */
+    private long maybeUpdateNextExpirationDeadline(long nowNanos, long candidateDeadline)
+    {
+        return nextExpirationDeadlineUpdater.accumulateAndGet(this, candidateDeadline, (oldDeadline, newDeadline) -> {
+            if (clock.isAfter(nowNanos, oldDeadline))
+                return newDeadline;
+            else
+                return min(oldDeadline, newDeadline);
+        });
     }
 
     /*
@@ -216,7 +253,6 @@ class OutboundMessageQueue
      */
     private void pruneWithLock(long nowNanos)
     {
-        earliestExpiresAt = Long.MAX_VALUE;
         externalQueue.drain(internalQueue::offer);
         pruneInternalQueueWithLock(nowNanos);
     }
@@ -232,7 +268,7 @@ class OutboundMessageQueue
 
             public boolean shouldPrune(Message<?> message)
             {
-                return !shouldSend(message, nowNanos);
+                return !shouldSend(message, clock, nowNanos);
             }
 
             public void onPruned(Message<?> message)
@@ -249,7 +285,13 @@ class OutboundMessageQueue
         Pruner pruner = new Pruner();
         internalQueue.prune(pruner);
 
-        maybeUpdateMinimumExpiryTime(pruner.earliestExpiresAt);
+        maybeUpdateNextExpirationDeadline(nowNanos, maybeUpdateEarliestExpiresAt(nowNanos, pruner.earliestExpiresAt));
+    }
+
+    @VisibleForTesting
+    long nextExpirationIn(long nowNanos, TimeUnit unit)
+    {
+        return unit.convert(nextExpirationDeadline - nowNanos, TimeUnit.NANOSECONDS);
     }
 
     private static class Locked implements Runnable
@@ -439,10 +481,12 @@ class OutboundMessageQueue
             }
 
             Remover remover = new Remover();
-            earliestExpiresAt = Long.MAX_VALUE;
             externalQueue.drain(internalQueue::offer);
             internalQueue.prune(remover);
-            maybeUpdateMinimumExpiryTime(remover.earliestExpiresAt);
+
+            long nowNanos = clock.now();
+            maybeUpdateNextExpirationDeadline(nowNanos, maybeUpdateEarliestExpiresAt(nowNanos, remover.earliestExpiresAt));
+
             done.countDown();
         }
     }
@@ -456,7 +500,7 @@ class OutboundMessageQueue
     {
         if (remove == null)
             throw new NullPointerException();
-        
+
         RemoveRunner runner;
         while (true)
         {
@@ -477,8 +521,8 @@ class OutboundMessageQueue
         return runner.removed.contains(remove);
     }
 
-    private static boolean shouldSend(Message<?> m, long nowNanos)
+    private static boolean shouldSend(Message<?> m, MonotonicClock clock, long nowNanos)
     {
-        return !approxTime.isAfter(nowNanos, m.expiresAtNanos());
+        return !clock.isAfter(nowNanos, m.expiresAtNanos());
     }
 }
diff --git a/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java b/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
index db571ac..860e4f1 100644
--- a/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
+++ b/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
@@ -18,7 +18,10 @@
 
 package org.apache.cassandra.net;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.Uninterruptibles;
 
@@ -27,16 +30,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.OutboundMessageQueue;
-import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.FreeRunningClock;
 
 import static org.apache.cassandra.net.NoPayload.noPayload;
+import static org.apache.cassandra.utils.MonotonicClock.approxTime;
 
 // TODO: incomplete
 public class OutboundMessageQueueTest
 {
-
     @BeforeClass
     public static void init()
     {
@@ -50,7 +51,7 @@ public class OutboundMessageQueueTest
         final Message<?> m2 = Message.out(Verb._TEST_1, noPayload);
         final Message<?> m3 = Message.out(Verb._TEST_1, noPayload);
 
-        final OutboundMessageQueue queue = new OutboundMessageQueue(message -> true);
+        final OutboundMessageQueue queue = new OutboundMessageQueue(approxTime, message -> true);
         queue.add(m1);
         queue.add(m2);
         queue.add(m3);
@@ -91,4 +92,130 @@ public class OutboundMessageQueueTest
         }
     }
 
+    @Test
+    public void testExpirationOnIteration()
+    {
+        FreeRunningClock clock = new FreeRunningClock();
+
+        List<Message> expiredMessages = new LinkedList<>();
+        long startTime = clock.now();
+
+        Message<?> m1 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(7));
+        Message<?> m2 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(3));
+        Message<?> m3;
+        Message<?> m4;
+
+        OutboundMessageQueue queue = new OutboundMessageQueue(clock, m -> expiredMessages.add(m));
+        queue.add(m1);
+        queue.add(m2);
+
+        try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Do nothing
+        }
+        // Check next expiry time is equal to m2, and we haven't expired anything yet:
+        Assert.assertEquals(3, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+        Assert.assertTrue(expiredMessages.isEmpty());
+
+        // Wait for m2 expiry time:
+        clock.advance(4, TimeUnit.SECONDS);
+
+        try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Add a new message while we're iterating the queue: this will expire later than any existing message.
+            m3 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(60));
+            queue.add(m3);
+        }
+        // After expiration runs following the WithLock#close(), check the expiration time is updated to m1 (not m3):
+        Assert.assertEquals(7, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+        // Also, m2 was expired and collected:
+        Assert.assertEquals(m2, expiredMessages.remove(0));
+
+        // Wait for m1 expiry time:
+        clock.advance(4, TimeUnit.SECONDS);
+
+        try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Add a new message while we're iterating the queue: this will expire sooner than the already existing message.
+            m4 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(10));
+            queue.add(m4);
+        }
+        // Check m1 was expired and collected:
+        Assert.assertEquals(m1, expiredMessages.remove(0));
+        // Check next expiry time is m4 (not m3):
+        Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+
+        // Consume all messages before expiration:
+        try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            Assert.assertEquals(m3, l.poll());
+            Assert.assertEquals(m4, l.poll());
+        }
+        // Check next expiry time is still m4 as the deadline hasn't passed yet:
+        Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+
+        // Go past the deadline:
+        clock.advance(4, TimeUnit.SECONDS);
+
+        try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Do nothing, just trigger expiration on close
+        }
+        // Check nothing is expired:
+        Assert.assertTrue(expiredMessages.isEmpty());
+        // Check next expiry time is now Long.MAX_VALUE as nothing was in the queue:
+        Assert.assertEquals(Long.MAX_VALUE, queue.nextExpirationIn(0, TimeUnit.NANOSECONDS));
+    }
+
+    @Test
+    public void testExpirationOnAdd()
+    {
+        FreeRunningClock clock = new FreeRunningClock();
+
+        List<Message> expiredMessages = new LinkedList<>();
+        long startTime = clock.now();
+
+        OutboundMessageQueue queue = new OutboundMessageQueue(clock, m -> expiredMessages.add(m));
+
+        Message<?> m1 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(7));
+        Message<?> m2 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(3));
+        queue.add(m1);
+        queue.add(m2);
+
+        // Check next expiry time is equal to m2, and we haven't expired anything yet:
+        Assert.assertEquals(3, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+        Assert.assertTrue(expiredMessages.isEmpty());
+
+        // Go past m1 expiry time:
+        clock.advance(8, TimeUnit.SECONDS);
+
+        // Add a new message and verify both m1 and m2 have been expired:
+        Message<?> m3 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(10));
+        queue.add(m3);
+        Assert.assertEquals(m2, expiredMessages.remove(0));
+        Assert.assertEquals(m1, expiredMessages.remove(0));
+
+        // New expiration deadline is m3:
+        Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+
+        // Go past m3 expiry time:
+        clock.advance(4, TimeUnit.SECONDS);
+
+        try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Add a new message and verify nothing is expired because the lock is held by this iteration:
+            Message<?> m4 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(15));
+            queue.add(m4);
+            Assert.assertTrue(expiredMessages.isEmpty());
+
+            // Also the deadline didn't change, even though we're past the m3 expiry time: this way we're sure the
+            // pruner will run promptly even if falling behind during iteration.
+            Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+        }
+
+        // Check post iteration m3 has expired:
+        Assert.assertEquals(m3, expiredMessages.remove(0));
+        // And deadline is now m4:
+        Assert.assertEquals(15, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org