You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/09/06 21:24:10 UTC

[qpid-protonj2] branch main updated: PROTON-2601 Allow for configuration of a custom delivery tag generator

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new afaef524 PROTON-2601 Allow for configuration of a custom delivery tag generator
afaef524 is described below

commit afaef5244bfb7b1b06a271f4c62da76c13f140ad
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Tue Sep 6 17:06:23 2022 -0400

    PROTON-2601 Allow for configuration of a custom delivery tag generator
    
    Allows the SenderOptions and StreamSenderOptions to carry a supplier
    that creates a dedicated DeliveryTagGenerator for each sender when the
    sender is created.
---
 .../apache/qpid/protonj2/client/SenderOptions.java |  34 +++++++
 .../qpid/protonj2/client/StreamSenderOptions.java  |  35 ++++++-
 .../protonj2/client/impl/ClientSenderBuilder.java  |  32 ++++--
 .../qpid/protonj2/client/impl/SenderTest.java      | 109 +++++++++++++++++++++
 .../protonj2/client/impl/StreamSenderTest.java     | 108 ++++++++++++++++++++
 .../engine/impl/ProtonSequentialTagGenerator.java  |   5 -
 .../engine/impl/ProtonUuidTagGenerator.java        |   5 -
 .../apache/qpid/protonj2/types/DeliveryTag.java    |  12 +--
 8 files changed, 313 insertions(+), 27 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
index 77c4990c..6d653d6d 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
@@ -17,8 +17,10 @@
 package org.apache.qpid.protonj2.client;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
 
 /**
  * Options that control the behavior of a {@link Sender} created from them.
@@ -27,6 +29,8 @@ public class SenderOptions extends LinkOptions<SenderOptions> implements Cloneab
 
     private long sendTimeout = ConnectionOptions.DEFAULT_SEND_TIMEOUT;
 
+    private Supplier<DeliveryTagGenerator> tagGeneratorSupplier;
+
     /**
      * Create a new {@link SenderOptions} instance configured with default configuration settings.
      */
@@ -103,10 +107,40 @@ public class SenderOptions extends LinkOptions<SenderOptions> implements Cloneab
         super.copyInto(other);
 
         other.sendTimeout(sendTimeout);
+        other.deliveryTagGeneratorSupplier(tagGeneratorSupplier);
 
         return other;
     }
 
+    /**
+     * Configures a {@link Supplier} which provides unique instances of {@link DeliveryTagGenerator} objects
+     * for any {@link Sender} created using these options.
+     * <p>
+     * The client sender will use a default {@link DeliveryTagGenerator} under normal circumstances and the
+     * user is not required to configure a {@link Supplier}. In some cases where the user is communicating
+     * with a system that requires a specific format of delivery tag this option allows use of a custom
+     * generator. The caller is responsible for providing a supplier that will create a unique instance of
+     * a tag generator as they are not meant to be shared amongst senders. Once a sender has been created
+     * the tag generator it uses cannot be changed so future calls to this method will not affect previously
+     * created {@link Sender} instances.
+     *
+     * @param supplier
+     * 		The {@link Supplier} of {@link DeliveryTagGenerator} instances.
+     *
+     * @return the {@link SenderOptions} instance that was given.
+     */
+    public SenderOptions deliveryTagGeneratorSupplier(Supplier<DeliveryTagGenerator> supplier) {
+        this.tagGeneratorSupplier = supplier;
+        return this;
+    }
+
+    /**
+     * @return the configured delivery tag {@link Supplier} or null if none was set.
+     */
+    public Supplier<DeliveryTagGenerator> deliveryTagGeneratorSupplier() {
+        return tagGeneratorSupplier;
+    }
+
     @Override
     protected SenderOptions self() {
         return this;
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java
index 4040ef54..21bee444 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java
@@ -17,8 +17,10 @@
 package org.apache.qpid.protonj2.client;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
 
 /**
  * Options class that controls various aspects of a {@link StreamSenderMessage} instance and how
@@ -37,6 +39,8 @@ public class StreamSenderOptions extends LinkOptions<StreamSenderOptions> implem
 
     private int pendingWritesBufferSize = DEFAULT_PENDING_WRITES_BUFFER_SIZE;
 
+    private Supplier<DeliveryTagGenerator> tagGeneratorSupplier;
+
     /**
      * Defines the default minimum size that the context write buffer will allocate
      * which drives the interval auto flushing of written data for this context.
@@ -157,7 +161,7 @@ public class StreamSenderOptions extends LinkOptions<StreamSenderOptions> implem
 
     /**
      * Configures the timeout used when awaiting a send operation to complete.  A send will block if the
-     * remote has not granted the {@link Sender} or the {@link Session} credit to do so, if the send blocks
+     * remote has not granted the {@link StreamSender} or the {@link Session} credit to do so, if the send blocks
      * for longer than this timeout the send call will fail with an {@link ClientSendTimedOutException}
      * exception to indicate that the send did not complete.
      *
@@ -173,6 +177,35 @@ public class StreamSenderOptions extends LinkOptions<StreamSenderOptions> implem
         return this;
     }
 
+    /**
+     * Configures a {@link Supplier} which provides unique instances of {@link DeliveryTagGenerator} objects
+     * for any {@link StreamSender} created using these options.
+     * <p>
+     * The client sender will use a default {@link DeliveryTagGenerator} under normal circumstances and the
+     * user is not required to configure a {@link Supplier}. In some cases where the user is communicating
+     * with a system that requires a specific format of delivery tag this option allows use of a custom
+     * generator. The caller is responsible for providing a supplier that will create a unique instance of
+     * a tag generator as they are not meant to be shared amongst senders. Once a sender has been created
+     * the tag generator it uses cannot be changed so future calls to this method will not affect previously
+     * created {@link StreamSender} instances.
+     *
+     * @param supplier
+     * 		The {@link Supplier} of {@link DeliveryTagGenerator} instances.
+     *
+     * @return the {@link StreamSenderOptions} instance that was given.
+     */
+    public StreamSenderOptions deliveryTagGeneratorSupplier(Supplier<DeliveryTagGenerator> supplier) {
+        this.tagGeneratorSupplier = supplier;
+        return this;
+    }
+
+    /**
+     * @return the configured delivery tag {@link Supplier} or null if none was set.
+     */
+    public Supplier<DeliveryTagGenerator> deliveryTagGeneratorSupplier() {
+        return tagGeneratorSupplier;
+    }
+
     @Override
     protected StreamSenderOptions self() {
         return this;
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java
index 62f4a0dc..850cc690 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.qpid.protonj2.client.LinkOptions;
@@ -25,6 +26,7 @@ import org.apache.qpid.protonj2.client.SourceOptions;
 import org.apache.qpid.protonj2.client.StreamSenderOptions;
 import org.apache.qpid.protonj2.client.TargetOptions;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
 import org.apache.qpid.protonj2.engine.Sender;
 import org.apache.qpid.protonj2.engine.Session;
 import org.apache.qpid.protonj2.engine.impl.ProtonDeliveryTagGenerator;
@@ -58,7 +60,10 @@ final class ClientSenderBuilder {
     public ClientSender sender(String address, SenderOptions senderOptions) throws ClientException {
         final SenderOptions options = senderOptions != null ? senderOptions : getDefaultSenderOptions();
         final String senderId = nextSenderId();
-        final Sender protonSender = createSender(session.getProtonSession(), address, options, senderId);
+        final DeliveryTagGenerator tagGenerator = options.deliveryTagGeneratorSupplier() == null ? null :
+            Objects.requireNonNull(options.deliveryTagGeneratorSupplier().get(), "Cannot assign a null tag generator from a custom supplier");
+        final Sender protonSender = createSender(
+            session.getProtonSession(), address, options, senderId, tagGenerator);
 
         return new ClientSender(session, options, senderId, protonSender);
     }
@@ -66,7 +71,10 @@ final class ClientSenderBuilder {
     public ClientSender anonymousSender(SenderOptions senderOptions) throws ClientException {
         final SenderOptions options = senderOptions != null ? senderOptions : getDefaultSenderOptions();
         final String senderId = nextSenderId();
-        final Sender protonSender = createSender(session.getProtonSession(), null, options, senderId);
+        final DeliveryTagGenerator tagGenerator = options.deliveryTagGeneratorSupplier() == null ? null :
+            Objects.requireNonNull(options.deliveryTagGeneratorSupplier().get(), "Cannot assign a null tag generator from a custom supplier");
+        final Sender protonSender = createSender(
+            session.getProtonSession(), null, options, senderId, tagGenerator);
 
         return new ClientSender(session, options, senderId, protonSender);
     }
@@ -74,12 +82,16 @@ final class ClientSenderBuilder {
     public ClientStreamSender streamSender(String address, StreamSenderOptions senderOptions) throws ClientException {
         final StreamSenderOptions options = senderOptions != null ? senderOptions : getDefaultStreamSenderOptions();
         final String senderId = nextSenderId();
-        final Sender protonSender = createSender(session.getProtonSession(), address, options, senderId);
+        final DeliveryTagGenerator tagGenerator = options.deliveryTagGeneratorSupplier() == null ? null :
+            Objects.requireNonNull(options.deliveryTagGeneratorSupplier().get(), "Cannot assign a null tag generator from a custom supplier");
+        final Sender protonSender = createSender(
+            session.getProtonSession(), address, options, senderId, tagGenerator);
 
         return new ClientStreamSender(session, options, senderId, protonSender);
     }
 
-    private static Sender createSender(Session protonSession, String address, LinkOptions<?> options, String senderId) {
+    private static Sender createSender(Session protonSession, String address, LinkOptions<?> options,
+                                       String senderId, DeliveryTagGenerator tagGenerator) {
         final String linkName;
 
         if (options.linkName() != null) {
@@ -107,11 +119,15 @@ final class ClientSenderBuilder {
         protonSender.setTarget(createTarget(address, options));
         protonSender.setSource(createSource(senderId, options));
 
-        // Use a tag generator that will reuse old tags.  Later we might make this configurable.
-        if (protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED) {
-            protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.EMPTY.createGenerator());
+        if (tagGenerator == null) {
+            // Use a tag generator that will reuse old tags if not sending settled.
+            if (protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED) {
+                protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.EMPTY.createGenerator());
+            } else {
+                protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator());
+            }
         } else {
-            protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator());
+            protonSender.setDeliveryTagGenerator(tagGenerator);
         }
 
         return protonSender;
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
index b946fcca..1f7f86bd 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
@@ -67,6 +67,7 @@ import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedEx
 import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
 import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.Released;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusDurability;
@@ -74,6 +75,7 @@ import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusExpiryPolicy
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.protonj2.types.DeliveryTag;
 import org.apache.qpid.protonj2.types.transport.AmqpError;
 import org.apache.qpid.protonj2.types.transport.LinkError;
 import org.apache.qpid.protonj2.types.transport.ReceiverSettleMode;
@@ -2561,4 +2563,111 @@ public class SenderTest extends ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    private static DeliveryTagGenerator customTagGenerator() {
+        return new DeliveryTagGenerator() {
+
+            private int count = 1;
+
+            @Override
+            public DeliveryTag nextTag() {
+                switch (count++) {
+                    case 1:
+                        return new DeliveryTag.ProtonDeliveryTag(new byte[] { 1, 1, 1 });
+                    case 2:
+                        return new DeliveryTag.ProtonDeliveryTag(new byte[] { 2, 2, 2 });
+                    case 3:
+                        return new DeliveryTag.ProtonDeliveryTag(new byte[] { 3, 3, 3 });
+                    default:
+                        throw new UnsupportedOperationException("Only supports creating three tags");
+                }
+            }
+        };
+    }
+
+    @Test
+    public void testSendeUsesCustomDeliveryTagGeneratorConfiguration() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond();
+            peer.remoteFlow().withLinkCredit(10).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
+
+            Session session = connection.openSession().openFuture().get();
+            SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
+                                                       .autoSettle(false)
+                                                       .deliveryTagGeneratorSupplier(SenderTest::customTagGenerator);
+            Sender sender = session.openSender("test-tags", options).openFuture().get();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectTransfer().withNonNullPayload()
+                                 .withDeliveryTag(new byte[] {1, 1, 1}).respond().withSettled(true).withState().accepted();
+            peer.expectTransfer().withNonNullPayload()
+                                 .withDeliveryTag(new byte[] {2, 2, 2}).respond().withSettled(true).withState().accepted();
+            peer.expectTransfer().withNonNullPayload()
+                                 .withDeliveryTag(new byte[] {3, 3, 3}).respond().withSettled(true).withState().accepted();
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            final Message<String> message = Message.create("Hello World");
+            final Tracker tracker1 = sender.send(message);
+            final Tracker tracker2 = sender.send(message);
+            final Tracker tracker3 = sender.send(message);
+
+            assertNotNull(tracker1);
+            assertNotNull(tracker1.settlementFuture().get().settled());
+            assertNotNull(tracker2);
+            assertNotNull(tracker2.settlementFuture().get().settled());
+            assertNotNull(tracker3);
+            assertNotNull(tracker3.settlementFuture().get().settled());
+
+            sender.closeAsync().get(10, TimeUnit.SECONDS);
+
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testCannotCreateSenderWhenTagGeneratorReturnsNull() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectClose().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
+
+            Session session = connection.openSession().openFuture().get();
+            SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
+                                                       .autoSettle(false)
+                                                       .deliveryTagGeneratorSupplier(() -> null);
+            try {
+                session.openSender("test-tags", options).openFuture().get();
+                fail("Should not create a sender if the tag generator is not supplied");
+            } catch (ClientException cliEx) {
+                // Expected
+            }
+
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
index 5d0b7a84..65a8189a 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
@@ -62,6 +62,7 @@ import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
 import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.client.test.Wait;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher;
@@ -74,6 +75,7 @@ import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatch
 import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedCompositingDataSectionMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedDataMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedPartialDataSectionMatcher;
+import org.apache.qpid.protonj2.types.DeliveryTag;
 import org.apache.qpid.protonj2.types.messaging.AmqpValue;
 import org.apache.qpid.protonj2.types.messaging.Data;
 import org.apache.qpid.protonj2.types.messaging.Footer;
@@ -2788,4 +2790,110 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    private static DeliveryTagGenerator customTagGenerator() {
+        return new DeliveryTagGenerator() {
+
+            private int count = 1;
+
+            @Override
+            public DeliveryTag nextTag() {
+                switch (count++) {
+                    case 1:
+                        return new DeliveryTag.ProtonDeliveryTag(new byte[] { 1, 1, 1 });
+                    case 2:
+                        return new DeliveryTag.ProtonDeliveryTag(new byte[] { 2, 2, 2 });
+                    case 3:
+                        return new DeliveryTag.ProtonDeliveryTag(new byte[] { 3, 3, 3 });
+                    default:
+                        throw new UnsupportedOperationException("Only supports creating three tags");
+                }
+            }
+        };
+    }
+
+    @Test
+    public void testSendeUsesCustomDeliveryTagGeneratorConfiguration() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond(); // Hidden session for stream sender
+            peer.expectAttach().ofSender().respond();
+            peer.remoteFlow().withLinkCredit(10).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
+
+            StreamSenderOptions options = new StreamSenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
+                                                                   .autoSettle(false)
+                                                                   .deliveryTagGeneratorSupplier(StreamSenderTest::customTagGenerator);
+            StreamSender sender = connection.openStreamSender("test-tags", options).openFuture().get();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectTransfer().withNonNullPayload()
+                                 .withDeliveryTag(new byte[] {1, 1, 1}).respond().withSettled(true).withState().accepted();
+            peer.expectTransfer().withNonNullPayload()
+                                 .withDeliveryTag(new byte[] {2, 2, 2}).respond().withSettled(true).withState().accepted();
+            peer.expectTransfer().withNonNullPayload()
+                                 .withDeliveryTag(new byte[] {3, 3, 3}).respond().withSettled(true).withState().accepted();
+            peer.expectDetach().respond();
+            peer.expectEnd().respond(); // From hidden stream sender session
+            peer.expectClose().respond();
+
+            final Message<String> message = Message.create("Hello World");
+            final StreamTracker tracker1 = sender.send(message);
+            final StreamTracker tracker2 = sender.send(message);
+            final StreamTracker tracker3 = sender.send(message);
+
+            assertNotNull(tracker1);
+            assertNotNull(tracker1.settlementFuture().get().settled());
+            assertNotNull(tracker2);
+            assertNotNull(tracker2.settlementFuture().get().settled());
+            assertNotNull(tracker3);
+            assertNotNull(tracker3.settlementFuture().get().settled());
+
+            sender.closeAsync().get(10, TimeUnit.SECONDS);
+
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testCannotCreateSenderWhenTagGeneratorReturnsNull() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond(); // Hidden session for stream sender
+            peer.expectClose().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
+
+            StreamSenderOptions options = new StreamSenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
+                                                                   .autoSettle(false)
+                                                                   .deliveryTagGeneratorSupplier(() -> null);
+            try {
+                connection.openStreamSender("test-tags", options).openFuture().get();
+                fail("Should not create a sender if the tag generator is not supplied");
+            } catch (ClientException cliEx) {
+                // Expected
+            }
+
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSequentialTagGenerator.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSequentialTagGenerator.java
index e38833b4..35876f32 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSequentialTagGenerator.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSequentialTagGenerator.java
@@ -84,11 +84,6 @@ public class ProtonSequentialTagGenerator extends ProtonDeliveryTagGenerator {
             return ProtonByteBufferAllocator.DEFAULT.wrap(tagBytes());
         }
 
-        @Override
-        public void release() {
-            // Nothing to do in this implementation
-        }
-
         @Override
         public DeliveryTag copy() {
             return new ProtonNumericDeliveryTag(tagValue);
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonUuidTagGenerator.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonUuidTagGenerator.java
index f9c8df2b..3bdad7a5 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonUuidTagGenerator.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonUuidTagGenerator.java
@@ -65,11 +65,6 @@ public class ProtonUuidTagGenerator extends ProtonDeliveryTagGenerator {
             return ProtonByteBufferAllocator.DEFAULT.wrap(tagBytes());
         }
 
-        @Override
-        public void release() {
-            // Nothing to do for this tag implementation.
-        }
-
         @Override
         public DeliveryTag copy() {
             return new ProtonUuidDeliveryTag(tagValue);
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/types/DeliveryTag.java b/protonj2/src/main/java/org/apache/qpid/protonj2/types/DeliveryTag.java
index 08222278..227297b1 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/types/DeliveryTag.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/types/DeliveryTag.java
@@ -58,10 +58,11 @@ public interface DeliveryTag {
     ProtonBuffer tagBuffer();
 
     /**
-     * Optional method used by tag implementations that provide pooling of tags.  Implementations can
-     * do nothing here if no release mechanics are needed.
+     * Optional method used by tag implementations that provide pooling of tags.
      */
-    void release();
+    default void release() {
+        // Default is no action, a subclass can add an action if needed.
+    }
 
     /**
      * Create a copy of this delivery tag, the copy should account for any underlying pooling of tags that
@@ -138,11 +139,6 @@ public interface DeliveryTag {
             return new ProtonDeliveryTag(Arrays.copyOf(tagBytes, tagBytes.length));
         }
 
-        @Override
-        public void release() {
-            // Nothing to do for this basic implementation.
-        }
-
         @Override
         public int hashCode() {
             if (hashCode == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org