You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/06/15 21:35:26 UTC

[qpid-protonj2] branch main updated: PROTON-2564 Avoid system calls for waiter notification if none waiting

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f08d239 PROTON-2564 Avoid system calls for waiter notification if none waiting
2f08d239 is described below

commit 2f08d239caa603e88db5e6ffc9095b744fcb32cc
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed Jun 15 17:09:39 2022 -0400

    PROTON-2564 Avoid system calls for waiter notification if none waiting
    
    Reduces overhead by skipping the notify()/notifyAll() calls on the queue
    monitor when no threads are waiting, as is commonly the case while
    deliveries are being queued into the prefetch buffer.
---
 .../protonj2/client/util/FifoDeliveryQueue.java    | 37 +++++++++++++++++-----
 1 file changed, 29 insertions(+), 8 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
index 91fbaebe..2bbb8f40 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
@@ -18,7 +18,6 @@ package org.apache.qpid.protonj2.client.util;
 
 import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.qpid.protonj2.client.Delivery;
@@ -38,6 +37,8 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
 
     private volatile int state = STOPPED;
 
+    private int waiters = 0;
+
     private final Deque<ClientDelivery> queue;
 
     /**
@@ -54,7 +55,9 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
     public void enqueueFirst(ClientDelivery envelope) {
         synchronized (queue) {
             queue.addFirst(envelope);
-            queue.notify();
+            if (waiters > 0) {
+                queue.notify();
+            }
         }
     }
 
@@ -62,7 +65,9 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
     public void enqueue(ClientDelivery envelope) {
         synchronized (queue) {
             queue.addLast(envelope);
-            queue.notify();
+            if (waiters > 0) {
+                queue.notify();
+            }
         }
     }
 
@@ -72,10 +77,20 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
             // Wait until the receiver is ready to deliver messages.
             while (timeout != 0 && isRunning() && queue.isEmpty()) {
                 if (timeout == -1) {
-                    queue.wait();
+                    waiters++;
+                    try {
+                        queue.wait();
+                    } finally {
+                        waiters--;
+                    }
                 } else {
                     long start = System.currentTimeMillis();
-                    queue.wait(TimeUnit.MILLISECONDS.toMillis(timeout));
+                    waiters++;
+                    try {
+                        queue.wait(timeout);
+                    } finally {
+                        waiters--;
+                    }
                     timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
                 }
             }
@@ -103,7 +118,9 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
     public void start() {
         if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) {
             synchronized (queue) {
-                queue.notifyAll();
+                if (waiters > 0) {
+                    queue.notifyAll();
+                }
             }
         }
     }
@@ -112,7 +129,9 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
     public void stop() {
         if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) {
             synchronized (queue) {
-                queue.notifyAll();
+                if (waiters > 0) {
+                    queue.notifyAll();
+                }
             }
         }
     }
@@ -121,7 +140,9 @@ public final class FifoDeliveryQueue implements DeliveryQueue {
     public void close() {
         if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) {
             synchronized (queue) {
-                queue.notifyAll();
+                if (waiters > 0) {
+                    queue.notifyAll();
+                }
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org