You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/10/17 18:33:42 UTC
qpid-jms git commit: QPIDJMS-420 Avoid signaling attempt costs when
no waiters
Repository: qpid-jms
Updated Branches:
refs/heads/master 59884aa60 -> 72bbc8060
QPIDJMS-420 Avoid signaling attempt costs when no waiters
Track whether a receive (dequeue) call is waiting on the prefetch queue in
the consumer and, if none is present, don't try to wake a thread, which avoids
some signaling cost and reduces time spent under the lock.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/72bbc806
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/72bbc806
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/72bbc806
Branch: refs/heads/master
Commit: 72bbc8060bd724baf844733454d3f90844a50e5f
Parents: 59884aa
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 17 14:33:31 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Oct 17 14:33:31 2018 -0400
----------------------------------------------------------------------
.../qpid/jms/util/AbstractMessageQueue.java | 21 +++++++++++++++++---
.../apache/qpid/jms/util/FifoMessageQueue.java | 10 +++++++---
.../qpid/jms/util/PriorityMessageQueue.java | 8 ++++++--
3 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/72bbc806/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
index edf8a48..a3dda4b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
@@ -28,6 +28,8 @@ public abstract class AbstractMessageQueue implements MessageQueue {
private volatile boolean running;
private final Object lock = new Object();
+ private int waiters;
+
@Override
public final JmsInboundMessageDispatch peek() {
synchronized (lock) {
@@ -40,6 +42,7 @@ public abstract class AbstractMessageQueue implements MessageQueue {
synchronized (lock) {
// Wait until the consumer is ready to deliver messages.
while (timeout != 0 && !closed && isEmpty() && running) {
+ waiters++;
if (timeout == -1) {
lock.wait();
} else {
@@ -47,6 +50,7 @@ public abstract class AbstractMessageQueue implements MessageQueue {
lock.wait(timeout);
timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
}
+ waiters--;
}
if (closed || !running || isEmpty()) {
@@ -73,7 +77,10 @@ public abstract class AbstractMessageQueue implements MessageQueue {
if (!closed) {
running = true;
}
- lock.notifyAll();
+
+ if (hasWaiters()) {
+ lock.notifyAll();
+ }
}
}
@@ -81,7 +88,9 @@ public abstract class AbstractMessageQueue implements MessageQueue {
public final void stop() {
synchronized (lock) {
running = false;
- lock.notifyAll();
+ if (hasWaiters()) {
+ lock.notifyAll();
+ }
}
}
@@ -95,7 +104,9 @@ public abstract class AbstractMessageQueue implements MessageQueue {
synchronized (lock) {
running = false;
closed = true;
- lock.notifyAll();
+ if (hasWaiters()) {
+ lock.notifyAll();
+ }
}
}
@@ -109,6 +120,10 @@ public abstract class AbstractMessageQueue implements MessageQueue {
return lock;
}
+ protected boolean hasWaiters() {
+ return waiters > 0;
+ }
+
/**
* Removes and returns the first entry in the implementation queue. This method
* is always called under lock and does not need to protect itself or check running
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/72bbc806/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
index a6860d1..40f3961 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
@@ -31,14 +31,16 @@ public final class FifoMessageQueue extends AbstractMessageQueue {
protected final Deque<JmsInboundMessageDispatch> queue;
public FifoMessageQueue(int prefetchSize) {
- this.queue = new ArrayDeque<JmsInboundMessageDispatch>(prefetchSize);
+ this.queue = new ArrayDeque<JmsInboundMessageDispatch>(Math.max(1, prefetchSize));
}
@Override
public void enqueueFirst(JmsInboundMessageDispatch envelope) {
synchronized (getLock()) {
queue.addFirst(envelope);
- getLock().notify();
+ if (hasWaiters()) {
+ getLock().notify();
+ }
}
}
@@ -46,7 +48,9 @@ public final class FifoMessageQueue extends AbstractMessageQueue {
public void enqueue(JmsInboundMessageDispatch envelope) {
synchronized (getLock()) {
queue.addLast(envelope);
- getLock().notify();
+ if (hasWaiters()) {
+ getLock().notify();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/72bbc806/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
index 5b4bdb1..88b7cf4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
@@ -50,7 +50,9 @@ public final class PriorityMessageQueue extends AbstractMessageQueue {
synchronized (getLock()) {
getList(envelope).addLast(envelope);
this.size++;
- getLock().notify();
+ if (hasWaiters()) {
+ getLock().notify();
+ }
}
}
@@ -59,7 +61,9 @@ public final class PriorityMessageQueue extends AbstractMessageQueue {
synchronized (getLock()) {
getList(MAX_PRIORITY).addFirst(envelope);
this.size++;
- getLock().notify();
+ if (hasWaiters()) {
+ getLock().notify();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org