You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2023/03/27 17:41:58 UTC

[qpid-protonj2] branch main updated: PROTON-2697 Resolve exception if receive timeout is less than minus one

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e74916d PROTON-2697 Resolve exception if receive timeout is less than minus one
8e74916d is described below

commit 8e74916dac8455fa711a75d8f4cfab827e6f4d78
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Mon Mar 27 13:04:23 2023 -0400

    PROTON-2697 Resolve exception if receive timeout is less than minus one
    
    Treat every negative timeout as an infinite wait. Unit conversions
    (for example -1 second becoming -1000 milliseconds) can produce values
    lower than minus one, which previously caused an exception on thread wait.
---
 .../org/apache/qpid/protonj2/client/Receiver.java  | 12 ++---
 .../qpid/protonj2/client/StreamReceiver.java       | 12 ++---
 .../qpid/protonj2/client/impl/ClientReceiver.java  |  2 +-
 .../qpid/protonj2/client/impl/ReceiverTest.java    | 57 +++++++++++++++++++++
 .../protonj2/client/impl/StreamReceiverTest.java   | 58 ++++++++++++++++++++++
 5 files changed, 128 insertions(+), 13 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java
index ba6cefd4..dcbf2cf1 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java
@@ -64,14 +64,14 @@ public interface Receiver extends Link<Receiver> {
 
     /**
      * Blocking receive method that waits the given time interval for the remote to provide a
-     * {@link Delivery} for consumption.  The amount of time this method blocks is based on the
-     * timeout value. If timeout is equal to <code>-1</code> then it blocks until a Delivery is
-     * received. If timeout is equal to zero then it will not block and simply return a
-     * {@link Delivery} if one is available locally.  If timeout value is greater than zero then it
-     * blocks up to timeout amount of time.
+     * {@link Delivery} for consumption. The amount of time this method blocks is based on the
+     * timeout value. If the timeout is less than zero then it blocks until a Delivery is received.
+     * If the timeout is equal to zero then it will not block and simply return a {@link Delivery}
+     * if one is available locally. If the timeout value is greater than zero then it blocks up to
+     * timeout amount of time.
      * <p>
      * Receive calls will only grant credit on their own if a credit window is configured in the
-     * {@link ReceiverOptions} which is done by default.  If the client application has not configured
+     * {@link ReceiverOptions} which is done by default. If the client application has not configured
      * a credit window or granted credit manually this method will not automatically grant any credit
      * when it enters the wait for a new incoming {@link Delivery}.
      *
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
index 3de8aa76..570d281a 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
@@ -71,14 +71,14 @@ public interface StreamReceiver extends Link<StreamReceiver> {
 
     /**
      * Blocking receive method that waits the given time interval for the remote to provide a
-     * {@link StreamReceiverMessage} for consumption.  The amount of time this method blocks is based on the
-     * timeout value. If timeout is equal to <code>-1</code> then it blocks until a Delivery is
-     * received. If timeout is equal to zero then it will not block and simply return a
-     * {@link StreamReceiverMessage} if one is available locally.  If timeout value is greater than zero then it
-     * blocks up to timeout amount of time.
+     * {@link StreamReceiverMessage} for consumption. The amount of time this method blocks is based on
+     * the timeout value. If the timeout is negative then it blocks until a Delivery is received. If the
+     * timeout is equal to zero then it will not block and simply return a {@link StreamReceiverMessage}
+     * if one is available locally. If the timeout value is greater than zero then it blocks up to timeout
+     * amount of time.
      * <p>
      * Receive calls will only grant credit on their own if a credit window is configured in the
-     * {@link StreamReceiverOptions} which is done by default.  If the client application has not configured
+     * {@link StreamReceiverOptions} which is done by default. If the client application has not configured
      * a credit window or granted credit manually this method will not automatically grant any credit
      * when it enters the wait for a new incoming {@link StreamReceiverMessage}.
      *
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index 1f7cafd4..82d38671 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -67,7 +67,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
         checkClosedOrFailed();
 
         try {
-            ClientDelivery delivery = deliveryQueue.dequeue(units.toMillis(timeout));
+            ClientDelivery delivery = deliveryQueue.dequeue(Math.max(-1, units.toMillis(timeout)));
             if (delivery != null) {
                 if (options.autoAccept()) {
                     disposition(delivery.protonDelivery(), Accepted.getInstance(), options.autoSettle());
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index 9c7adcc4..23ab5dc9 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2989,4 +2989,61 @@ public class ReceiverTest extends ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testReceiveAcceptsTimeoutAndWaitsForDelivery() throws Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(1, TimeUnit.MINUTES);
+    }
+
+    @Test
+    public void testReceiveAcceptsNegtiveValuesAsInfiniteTimeoutsMinusOneMillisends() throws Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(-1, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testReceiveAcceptsNegtiveValuesAsInfiniteTimeoutsMinusOneSeconds() throws Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(-1, TimeUnit.SECONDS);
+    }
+
+    public void doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(long timeout, TimeUnit units) throws Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow();
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] { 1 })
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).afterDelay(25).queue();
+            peer.expectDisposition().withFirst(0)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+            final Receiver receiver = connection.openReceiver("test-queue");
+            final Delivery delivery = receiver.receive(timeout, units);
+
+            peer.waitForScriptToComplete();
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            assertNotNull(delivery);
+
+            receiver.close();
+            connection.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index 2ca78f0a..9fd3faa2 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -3860,6 +3860,64 @@ class StreamReceiverTest extends ImperativeClientTestCase {
        }
     }
 
+    @Test
+    public void testReceiveAcceptsTimeoutAndWaitsForDelivery() throws Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(1, TimeUnit.MINUTES);
+    }
+
+    @Test
+    public void testReceiveAcceptsNegtiveValuesAsInfiniteTimeoutsMinusOneMillisends() throws Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(-1, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testReceiveAcceptsNegtiveValuesAsInfiniteTimeoutsMinusOneSeconds() throws Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(-1, TimeUnit.SECONDS);
+    }
+
+    public void doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(long timeout, TimeUnit units) throws Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow();
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] { 1 })
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).afterDelay(25).queue();
+            peer.expectDisposition().withFirst(0)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+            final StreamReceiver receiver = connection.openStreamReceiver("test-queue");
+            final StreamDelivery delivery = receiver.receive(timeout, units);
+
+            peer.waitForScriptToComplete();
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+
+            assertNotNull(delivery);
+
+            receiver.close();
+            connection.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     private byte[] createInvalidHeaderEncoding() {
         final byte[] buffer = new byte[12];
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org