You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by al...@apache.org on 2020/06/29 16:55:10 UTC
[cassandra] branch trunk updated: Prune expired messages less frequently in internode messaging
This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 56f24f7 Prune expired messages less frequently in internode messaging
56f24f7 is described below
commit 56f24f78f62c9945fae40790e3ed09893fa1ed18
Author: Sergio Bossa <se...@gmail.com>
AuthorDate: Thu Apr 2 19:30:26 2020 +0100
Prune expired messages less frequently in internode messaging
Patch by Sergio Bossa; Reviewed by Aleksey Yeschenko for CASSANDRA-15700
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/net/Message.java | 5 +
.../apache/cassandra/net/OutboundConnection.java | 6 +-
.../apache/cassandra/net/OutboundMessageQueue.java | 88 +++++++++----
.../cassandra/net/OutboundMessageQueueTest.java | 137 ++++++++++++++++++++-
5 files changed, 207 insertions(+), 30 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0781ca2..1c30b58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * Prune expired messages less frequently in internode messaging (CASSANDRA-15700)
* Fix Ec2Snitch handling of legacy mode for dc names matching both formats, eg "us-west-2" (CASSANDRA-15878)
* Add support for server side DESCRIBE statements (CASSANDRA-14825)
* Fail startup if -Xmn is set when the G1 garbage collector is used (CASSANDRA-15839)
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 0eb7710..01ba5d4 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -188,6 +188,11 @@ public class Message<T>
return outWithParam(nextId(), verb, payload, null, null);
}
+ public static <T> Message<T> out(Verb verb, T payload, long expiresAtNanos)
+ {
+ return outWithParam(nextId(), verb, expiresAtNanos, payload, 0, null, null);
+ }
+
public static <T> Message<T> outWithFlag(Verb verb, T payload, MessageFlag flag)
{
assert !verb.isResponse();
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index d7ebcd8..635f221 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -295,7 +295,7 @@ public class OutboundConnection
this.reserveCapacityInBytes = reserveCapacityInBytes;
this.callbacks = template.callbacks;
this.debug = template.debug;
- this.queue = new OutboundMessageQueue(this::onExpired);
+ this.queue = new OutboundMessageQueue(approxTime, this::onExpired);
this.delivery = type == ConnectionType.LARGE_MESSAGES
? new LargeMessageDelivery(template.socketFactory.synchronousWorkExecutor)
: new EventLoopDelivery();
@@ -571,8 +571,8 @@ public class OutboundConnection
*/
void executeAgain()
{
- // if we are already executing, set EXECUTING_AGAIN and leave scheduling to the currently running one.
- // otherwise, set ourselves unconditionally to EXECUTING and schedule ourselves immediately
+ // if we are already executing, set EXECUTING_AGAIN and leave scheduling to the currently running one.
+ // otherwise, set ourselves unconditionally to EXECUTING and schedule ourselves immediately
if (!isExecuting(getAndUpdate(i -> !isExecuting(i) ? EXECUTING : EXECUTING_AGAIN)))
executor.execute(this);
}
diff --git a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
index 48c7666..3d8bac0 100644
--- a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
+++ b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
@@ -21,18 +21,20 @@ import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.utils.MonotonicClock;
+
import static java.lang.Math.min;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
/**
* A composite queue holding messages to be delivered by an {@link OutboundConnection}.
@@ -59,17 +61,22 @@ class OutboundMessageQueue
boolean accept(Message<?> message) throws Produces;
}
+ private final MonotonicClock clock;
private final MessageConsumer<RuntimeException> onExpired;
private final ManyToOneConcurrentLinkedQueue<Message<?>> externalQueue = new ManyToOneConcurrentLinkedQueue<>();
private final PrunableArrayQueue<Message<?>> internalQueue = new PrunableArrayQueue<>(256);
private volatile long earliestExpiresAt = Long.MAX_VALUE;
+ private volatile long nextExpirationDeadline = Long.MAX_VALUE;
private static final AtomicLongFieldUpdater<OutboundMessageQueue> earliestExpiresAtUpdater =
AtomicLongFieldUpdater.newUpdater(OutboundMessageQueue.class, "earliestExpiresAt");
+ private static final AtomicLongFieldUpdater<OutboundMessageQueue> nextExpirationDeadlineUpdater =
+ AtomicLongFieldUpdater.newUpdater(OutboundMessageQueue.class, "nextExpirationDeadline");
- OutboundMessageQueue(MessageConsumer<RuntimeException> onExpired)
+ OutboundMessageQueue(MonotonicClock clock, MessageConsumer<RuntimeException> onExpired)
{
+ this.clock = clock;
this.onExpired = onExpired;
}
@@ -80,7 +87,9 @@ class OutboundMessageQueue
{
maybePruneExpired();
externalQueue.offer(m);
- maybeUpdateMinimumExpiryTime(m.expiresAtNanos());
+ nextExpirationDeadlineUpdater.accumulateAndGet(this,
+ maybeUpdateEarliestExpiresAt(clock.now(), m.expiresAtNanos()),
+ Math::min);
}
/**
@@ -105,7 +114,7 @@ class OutboundMessageQueue
*/
void runEventually(Consumer<WithLock> runEventually)
{
- try (WithLock withLock = lockOrCallback(approxTime.now(), () -> runEventually(runEventually)))
+ try (WithLock withLock = lockOrCallback(clock.now(), () -> runEventually(runEventually)))
{
if (withLock != null)
runEventually.accept(withLock);
@@ -136,7 +145,6 @@ class OutboundMessageQueue
private WithLock(long nowNanos)
{
this.nowNanos = nowNanos;
- earliestExpiresAt = Long.MAX_VALUE;
externalQueue.drain(internalQueue::offer);
}
@@ -145,7 +153,7 @@ class OutboundMessageQueue
Message<?> m;
while (null != (m = internalQueue.poll()))
{
- if (shouldSend(m, nowNanos))
+ if (shouldSend(m, clock, nowNanos))
break;
onExpired.accept(m);
@@ -165,7 +173,7 @@ class OutboundMessageQueue
Message<?> m;
while (null != (m = internalQueue.peek()))
{
- if (shouldSend(m, nowNanos))
+ if (shouldSend(m, clock, nowNanos))
break;
internalQueue.poll();
@@ -185,7 +193,9 @@ class OutboundMessageQueue
@Override
public void close()
{
- pruneInternalQueueWithLock(nowNanos);
+ if (clock.isAfter(nowNanos, nextExpirationDeadline))
+ pruneInternalQueueWithLock(nowNanos);
+
unlock();
}
}
@@ -195,20 +205,47 @@ class OutboundMessageQueue
*/
boolean maybePruneExpired()
{
- return maybePruneExpired(approxTime.now());
+ return maybePruneExpired(clock.now());
}
private boolean maybePruneExpired(long nowNanos)
{
- if (approxTime.isAfter(nowNanos, earliestExpiresAt))
+ if (clock.isAfter(nowNanos, nextExpirationDeadline))
return tryRun(() -> pruneWithLock(nowNanos));
+
return false;
}
- private void maybeUpdateMinimumExpiryTime(long newTime)
+ /**
+ * Update {@code earliestExpiresAt} with the given {@code candidateTime} if less than the current value OR
+ * if the current value is past the current {@code nowNanos} time: this last condition is needed to make sure we keep
+ * tracking the earliest expiry time even while we prune previous values, so that at the end of the pruning task,
+ * we can reconcile between the earliest expiry time recorded at pruning and the one recorded at insert time.
+ */
+ private long maybeUpdateEarliestExpiresAt(long nowNanos, long candidateTime)
{
- if (newTime < earliestExpiresAt)
- earliestExpiresAtUpdater.accumulateAndGet(this, newTime, Math::min);
+ return earliestExpiresAtUpdater.accumulateAndGet(this, candidateTime, (oldTime, newTime) -> {
+ if (clock.isAfter(nowNanos, oldTime))
+ return newTime;
+ else
+ return min(oldTime, newTime);
+ });
+ }
+
+    /**
+     * Update {@code nextExpirationDeadline} with the given {@code candidateDeadline} if it is less than the
+     * current deadline, or unconditionally (even if it is later) when the current deadline has already passed
+     * relative to {@code nowNanos}. Replacing a stale deadline is needed to resolve a race where both
+     * {@link #add(org.apache.cassandra.net.Message) } and {@link #pruneInternalQueueWithLock(long) } try to
+     * update the expiration deadline.
+     *
+     * @return the value of {@code nextExpirationDeadline} after the update
+     */
+    private long maybeUpdateNextExpirationDeadline(long nowNanos, long candidateDeadline)
+    {
+        return nextExpirationDeadlineUpdater.accumulateAndGet(this, candidateDeadline, (oldDeadline, newDeadline) -> {
+            if (clock.isAfter(nowNanos, oldDeadline))
+                return newDeadline;
+            else
+                return min(oldDeadline, newDeadline);
+        });
}
/*
@@ -216,7 +253,6 @@ class OutboundMessageQueue
*/
private void pruneWithLock(long nowNanos)
{
- earliestExpiresAt = Long.MAX_VALUE;
externalQueue.drain(internalQueue::offer);
pruneInternalQueueWithLock(nowNanos);
}
@@ -232,7 +268,7 @@ class OutboundMessageQueue
public boolean shouldPrune(Message<?> message)
{
- return !shouldSend(message, nowNanos);
+ return !shouldSend(message, clock, nowNanos);
}
public void onPruned(Message<?> message)
@@ -249,7 +285,13 @@ class OutboundMessageQueue
Pruner pruner = new Pruner();
internalQueue.prune(pruner);
- maybeUpdateMinimumExpiryTime(pruner.earliestExpiresAt);
+ maybeUpdateNextExpirationDeadline(nowNanos, maybeUpdateEarliestExpiresAt(nowNanos, pruner.earliestExpiresAt));
+ }
+
+    /**
+     * Returns the time remaining until the next expiration deadline, measured from {@code nowNanos}
+     * and converted to {@code unit}. When no deadline is set ({@code Long.MAX_VALUE}), returns
+     * {@code Long.MAX_VALUE} instead of subtracting, which could overflow for a negative {@code nowNanos}.
+     */
+    @VisibleForTesting
+    long nextExpirationIn(long nowNanos, TimeUnit unit)
+    {
+        long deadline = nextExpirationDeadline; // read the volatile field once
+        if (deadline == Long.MAX_VALUE)
+            return Long.MAX_VALUE;
+
+        return unit.convert(deadline - nowNanos, TimeUnit.NANOSECONDS);
}
private static class Locked implements Runnable
@@ -439,10 +481,12 @@ class OutboundMessageQueue
}
Remover remover = new Remover();
- earliestExpiresAt = Long.MAX_VALUE;
externalQueue.drain(internalQueue::offer);
internalQueue.prune(remover);
- maybeUpdateMinimumExpiryTime(remover.earliestExpiresAt);
+
+ long nowNanos = clock.now();
+ maybeUpdateNextExpirationDeadline(nowNanos, maybeUpdateEarliestExpiresAt(nowNanos, remover.earliestExpiresAt));
+
done.countDown();
}
}
@@ -456,7 +500,7 @@ class OutboundMessageQueue
{
if (remove == null)
throw new NullPointerException();
-
+
RemoveRunner runner;
while (true)
{
@@ -477,8 +521,8 @@ class OutboundMessageQueue
return runner.removed.contains(remove);
}
- private static boolean shouldSend(Message<?> m, long nowNanos)
+ private static boolean shouldSend(Message<?> m, MonotonicClock clock, long nowNanos)
{
- return !approxTime.isAfter(nowNanos, m.expiresAtNanos());
+ return !clock.isAfter(nowNanos, m.expiresAtNanos());
}
}
diff --git a/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java b/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
index db571ac..860e4f1 100644
--- a/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
+++ b/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java
@@ -18,7 +18,10 @@
package org.apache.cassandra.net;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -27,16 +30,14 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.OutboundMessageQueue;
-import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.FreeRunningClock;
import static org.apache.cassandra.net.NoPayload.noPayload;
+import static org.apache.cassandra.utils.MonotonicClock.approxTime;
// TODO: incomplete
public class OutboundMessageQueueTest
{
-
@BeforeClass
public static void init()
{
@@ -50,7 +51,7 @@ public class OutboundMessageQueueTest
final Message<?> m2 = Message.out(Verb._TEST_1, noPayload);
final Message<?> m3 = Message.out(Verb._TEST_1, noPayload);
- final OutboundMessageQueue queue = new OutboundMessageQueue(message -> true);
+ final OutboundMessageQueue queue = new OutboundMessageQueue(approxTime, message -> true);
queue.add(m1);
queue.add(m2);
queue.add(m3);
@@ -91,4 +92,130 @@ public class OutboundMessageQueueTest
}
}
+    /**
+     * Verifies that expired messages are pruned when an iteration lock is released only once the expiration
+     * deadline has passed, that messages added while the lock is held are accounted for in the recomputed
+     * deadline, and that the deadline resets to {@code Long.MAX_VALUE} once nothing is left to expire.
+     */
+    @Test
+    public void testExpirationOnIteration()
+    {
+        FreeRunningClock clock = new FreeRunningClock();
+
+        List<Message<?>> expiredMessages = new LinkedList<>();
+        long startTime = clock.now();
+
+        Message<?> m1 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(7));
+        Message<?> m2 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(3));
+        Message<?> m3;
+        Message<?> m4;
+
+        OutboundMessageQueue queue = new OutboundMessageQueue(clock, expiredMessages::add);
+        queue.add(m1);
+        queue.add(m2);
+
+        try (OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Do nothing: just acquire and release the lock, which prunes on close if the deadline has passed
+        }
+        // Check next expiry time is equal to m2, and we haven't expired anything yet:
+        Assert.assertEquals(3, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+        Assert.assertTrue(expiredMessages.isEmpty());
+
+        // Wait for m2 expiry time:
+        clock.advance(4, TimeUnit.SECONDS);
+
+        try (OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Add a new message while we're iterating the queue: this will expire later than any existing message.
+            m3 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(60));
+            queue.add(m3);
+        }
+        // After expiration runs following the WithLock#close(), check the expiration time is updated to m1 (not m3):
+        Assert.assertEquals(7, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+        // Also, m2 was expired and collected:
+        Assert.assertEquals(m2, expiredMessages.remove(0));
+
+        // Wait for m1 expiry time:
+        clock.advance(4, TimeUnit.SECONDS);
+
+        try (OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Add a new message while we're iterating the queue: this will expire sooner than the already existing message.
+            m4 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(10));
+            queue.add(m4);
+        }
+        // Check m1 was expired and collected:
+        Assert.assertEquals(m1, expiredMessages.remove(0));
+        // Check next expiry time is m4 (not m3):
+        Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+
+        // Consume all messages before expiration:
+        try (OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            Assert.assertEquals(m3, l.poll());
+            Assert.assertEquals(m4, l.poll());
+        }
+        // Check next expiry time is still m4 as the deadline hasn't passed yet:
+        Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+
+        // Go past the deadline:
+        clock.advance(4, TimeUnit.SECONDS);
+
+        try (OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Do nothing, just trigger expiration on close
+        }
+        // Check nothing is expired:
+        Assert.assertTrue(expiredMessages.isEmpty());
+        // Check next expiry time is now Long.MAX_VALUE as nothing was in the queue:
+        Assert.assertEquals(Long.MAX_VALUE, queue.nextExpirationIn(0, TimeUnit.NANOSECONDS));
+    }
+
+    /**
+     * Verifies that adding a message prunes already-expired messages when the expiration deadline has passed,
+     * and that pruning is deferred (with the deadline left unchanged) while an iteration holds the queue lock.
+     */
+    @Test
+    public void testExpirationOnAdd()
+    {
+        FreeRunningClock clock = new FreeRunningClock();
+
+        List<Message<?>> expiredMessages = new LinkedList<>();
+        long startTime = clock.now();
+
+        OutboundMessageQueue queue = new OutboundMessageQueue(clock, expiredMessages::add);
+
+        Message<?> m1 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(7));
+        Message<?> m2 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(3));
+        queue.add(m1);
+        queue.add(m2);
+
+        // Check next expiry time is equal to m2, and we haven't expired anything yet:
+        Assert.assertEquals(3, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+        Assert.assertTrue(expiredMessages.isEmpty());
+
+        // Go past m1 expiry time:
+        clock.advance(8, TimeUnit.SECONDS);
+
+        // Add a new message and verify both m1 and m2 have been expired:
+        Message<?> m3 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(10));
+        queue.add(m3);
+        Assert.assertEquals(m2, expiredMessages.remove(0));
+        Assert.assertEquals(m1, expiredMessages.remove(0));
+
+        // New expiration deadline is m3:
+        Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+
+        // Go past m3 expiry time:
+        clock.advance(4, TimeUnit.SECONDS);
+
+        try (OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {}))
+        {
+            // Add a new message and verify nothing is expired because the lock is held by this iteration:
+            Message<?> m4 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(15));
+            queue.add(m4);
+            Assert.assertTrue(expiredMessages.isEmpty());
+
+            // Also the deadline didn't change, even though we're past the m3 expiry time: this way we're sure the
+            // pruner will run promptly even if falling behind during iteration.
+            Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+        }
+
+        // Check post iteration m3 has expired:
+        Assert.assertEquals(m3, expiredMessages.remove(0));
+        // And deadline is now m4:
+        Assert.assertEquals(15, queue.nextExpirationIn(startTime, TimeUnit.SECONDS));
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org