You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/10/17 18:33:42 UTC

qpid-jms git commit: QPIDJMS-420 Avoid signaling attempt costs when no waiters

Repository: qpid-jms
Updated Branches:
  refs/heads/master 59884aa60 -> 72bbc8060


QPIDJMS-420 Avoid signaling attempt costs when no waiters

Track whether a receive (dequeue) call is waiting on the prefetch queue in
the consumer and, if none is present, don't try to wake a thread, which
avoids some cost and reduces time under lock.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/72bbc806
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/72bbc806
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/72bbc806

Branch: refs/heads/master
Commit: 72bbc8060bd724baf844733454d3f90844a50e5f
Parents: 59884aa
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 17 14:33:31 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Oct 17 14:33:31 2018 -0400

----------------------------------------------------------------------
 .../qpid/jms/util/AbstractMessageQueue.java     | 21 +++++++++++++++++---
 .../apache/qpid/jms/util/FifoMessageQueue.java  | 10 +++++++---
 .../qpid/jms/util/PriorityMessageQueue.java     |  8 ++++++--
 3 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/72bbc806/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
index edf8a48..a3dda4b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
@@ -28,6 +28,8 @@ public abstract class AbstractMessageQueue implements MessageQueue {
     private volatile boolean running;
     private final Object lock = new Object();
 
+    private int waiters;
+
     @Override
     public final JmsInboundMessageDispatch peek() {
         synchronized (lock) {
@@ -40,6 +42,7 @@ public abstract class AbstractMessageQueue implements MessageQueue {
         synchronized (lock) {
             // Wait until the consumer is ready to deliver messages.
             while (timeout != 0 && !closed && isEmpty() && running) {
+                waiters++;
                 if (timeout == -1) {
                     lock.wait();
                 } else {
@@ -47,6 +50,7 @@ public abstract class AbstractMessageQueue implements MessageQueue {
                     lock.wait(timeout);
                     timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
                 }
+                waiters--;
             }
 
             if (closed || !running || isEmpty()) {
@@ -73,7 +77,10 @@ public abstract class AbstractMessageQueue implements MessageQueue {
             if (!closed) {
                 running = true;
             }
-            lock.notifyAll();
+
+            if (hasWaiters()) {
+                lock.notifyAll();
+            }
         }
     }
 
@@ -81,7 +88,9 @@ public abstract class AbstractMessageQueue implements MessageQueue {
     public final void stop() {
         synchronized (lock) {
             running = false;
-            lock.notifyAll();
+            if (hasWaiters()) {
+                lock.notifyAll();
+            }
         }
     }
 
@@ -95,7 +104,9 @@ public abstract class AbstractMessageQueue implements MessageQueue {
         synchronized (lock) {
             running = false;
             closed = true;
-            lock.notifyAll();
+            if (hasWaiters()) {
+                lock.notifyAll();
+            }
         }
     }
 
@@ -109,6 +120,10 @@ public abstract class AbstractMessageQueue implements MessageQueue {
         return lock;
     }
 
+    protected boolean hasWaiters() {
+        return waiters > 0;
+    }
+
     /**
      * Removes and returns the first entry in the implementation queue.  This method
      * is always called under lock and does not need to protect itself or check running

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/72bbc806/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
index a6860d1..40f3961 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
@@ -31,14 +31,16 @@ public final class FifoMessageQueue extends AbstractMessageQueue {
     protected final Deque<JmsInboundMessageDispatch> queue;
 
     public FifoMessageQueue(int prefetchSize) {
-        this.queue = new ArrayDeque<JmsInboundMessageDispatch>(prefetchSize);
+        this.queue = new ArrayDeque<JmsInboundMessageDispatch>(Math.max(1, prefetchSize));
     }
 
     @Override
     public void enqueueFirst(JmsInboundMessageDispatch envelope) {
         synchronized (getLock()) {
             queue.addFirst(envelope);
-            getLock().notify();
+            if (hasWaiters()) {
+                getLock().notify();
+            }
         }
     }
 
@@ -46,7 +48,9 @@ public final class FifoMessageQueue extends AbstractMessageQueue {
     public void enqueue(JmsInboundMessageDispatch envelope) {
         synchronized (getLock()) {
             queue.addLast(envelope);
-            getLock().notify();
+            if (hasWaiters()) {
+                getLock().notify();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/72bbc806/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
index 5b4bdb1..88b7cf4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
@@ -50,7 +50,9 @@ public final class PriorityMessageQueue extends AbstractMessageQueue {
         synchronized (getLock()) {
             getList(envelope).addLast(envelope);
             this.size++;
-            getLock().notify();
+            if (hasWaiters()) {
+                getLock().notify();
+            }
         }
     }
 
@@ -59,7 +61,9 @@ public final class PriorityMessageQueue extends AbstractMessageQueue {
         synchronized (getLock()) {
             getList(MAX_PRIORITY).addFirst(envelope);
             this.size++;
-            getLock().notify();
+            if (hasWaiters()) {
+                getLock().notify();
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org