You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/09/06 21:24:10 UTC
[qpid-protonj2] branch main updated: PROTON-2601 Allow for configuration of a custom delivery tag generator
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new afaef524 PROTON-2601 Allow for configuration of a custom delivery tag generator
afaef524 is described below
commit afaef5244bfb7b1b06a271f4c62da76c13f140ad
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Tue Sep 6 17:06:23 2022 -0400
PROTON-2601 Allow for configuration of a custom delivery tag generator
Allows the SenderOptions and StreamSenderOptions to convey a supplier to
the sender which create a DeliveryTagGenerator for each sender upon
creation.
---
.../apache/qpid/protonj2/client/SenderOptions.java | 34 +++++++
.../qpid/protonj2/client/StreamSenderOptions.java | 35 ++++++-
.../protonj2/client/impl/ClientSenderBuilder.java | 32 ++++--
.../qpid/protonj2/client/impl/SenderTest.java | 109 +++++++++++++++++++++
.../protonj2/client/impl/StreamSenderTest.java | 108 ++++++++++++++++++++
.../engine/impl/ProtonSequentialTagGenerator.java | 5 -
.../engine/impl/ProtonUuidTagGenerator.java | 5 -
.../apache/qpid/protonj2/types/DeliveryTag.java | 12 +--
8 files changed, 313 insertions(+), 27 deletions(-)
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
index 77c4990c..6d653d6d 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
@@ -17,8 +17,10 @@
package org.apache.qpid.protonj2.client;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
/**
* Options that control the behavior of a {@link Sender} created from them.
@@ -27,6 +29,8 @@ public class SenderOptions extends LinkOptions<SenderOptions> implements Cloneab
private long sendTimeout = ConnectionOptions.DEFAULT_SEND_TIMEOUT;
+ private Supplier<DeliveryTagGenerator> tagGeneratorSupplier;
+
/**
* Create a new {@link SenderOptions} instance configured with default configuration settings.
*/
@@ -103,10 +107,40 @@ public class SenderOptions extends LinkOptions<SenderOptions> implements Cloneab
super.copyInto(other);
other.sendTimeout(sendTimeout);
+ other.deliveryTagGeneratorSupplier(tagGeneratorSupplier);
return other;
}
+ /**
+ * Configures a {@link Supplier} which provides unique instances of {@link DeliveryTagGenerator} objects
+ * for any {@link Sender} created using these options.
+ * <p>
+ * The client sender will use a default {@link DeliveryTagGenerator} under normal circumstances and the
+ * user is not required to configure a {@link Supplier}. In some cases where the user is communicating
+ * with a system that requires a specific format of delivery tag this option allows use of a custom
+ * generator. The caller is responsible for providing a supplier that will create a unique instance of
+ * a tag generator as they are not meant to be shared amongst senders. Once a sender has been created
+ * the tag generator it uses cannot be changed so future calls to this method will not affect previously
+ * created {@link Sender} instances.
+ *
+ * @param supplier
+ * The {@link Supplier} of {@link DeliveryTagGenerator} instances.
+ *
+ * @return the {@link SenderOptions} instance that was given.
+ */
+ public SenderOptions deliveryTagGeneratorSupplier(Supplier<DeliveryTagGenerator> supplier) {
+ this.tagGeneratorSupplier = supplier;
+ return this;
+ }
+
+ /**
+ * @return the configured delivery tag {@link Supplier} or null if none was set.
+ */
+ public Supplier<DeliveryTagGenerator> deliveryTagGeneratorSupplier() {
+ return tagGeneratorSupplier;
+ }
+
@Override
protected SenderOptions self() {
return this;
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java
index 4040ef54..21bee444 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java
@@ -17,8 +17,10 @@
package org.apache.qpid.protonj2.client;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
/**
* Options class that controls various aspects of a {@link StreamSenderMessage} instance and how
@@ -37,6 +39,8 @@ public class StreamSenderOptions extends LinkOptions<StreamSenderOptions> implem
private int pendingWritesBufferSize = DEFAULT_PENDING_WRITES_BUFFER_SIZE;
+ private Supplier<DeliveryTagGenerator> tagGeneratorSupplier;
+
/**
* Defines the default minimum size that the context write buffer will allocate
* which drives the interval auto flushing of written data for this context.
@@ -157,7 +161,7 @@ public class StreamSenderOptions extends LinkOptions<StreamSenderOptions> implem
/**
* Configures the timeout used when awaiting a send operation to complete. A send will block if the
- * remote has not granted the {@link Sender} or the {@link Session} credit to do so, if the send blocks
+ * remote has not granted the {@link StreamSender} or the {@link Session} credit to do so, if the send blocks
* for longer than this timeout the send call will fail with an {@link ClientSendTimedOutException}
* exception to indicate that the send did not complete.
*
@@ -173,6 +177,35 @@ public class StreamSenderOptions extends LinkOptions<StreamSenderOptions> implem
return this;
}
+ /**
+ * Configures a {@link Supplier} which provides unique instances of {@link DeliveryTagGenerator} objects
+ * for any {@link StreamSender} created using these options.
+ * <p>
+ * The client sender will use a default {@link DeliveryTagGenerator} under normal circumstances and the
+ * user is not required to configure a {@link Supplier}. In some cases where the user is communicating
+ * with a system that requires a specific format of delivery tag this option allows use of a custom
+ * generator. The caller is responsible for providing a supplier that will create a unique instance of
+ * a tag generator as they are not meant to be shared amongst senders. Once a sender has been created
+ * the tag generator it uses cannot be changed so future calls to this method will not affect previously
+ * created {@link StreamSender} instances.
+ *
+ * @param supplier
+ * The {@link Supplier} of {@link DeliveryTagGenerator} instances.
+ *
+ * @return the {@link StreamSenderOptions} instance that was given.
+ */
+ public StreamSenderOptions deliveryTagGeneratorSupplier(Supplier<DeliveryTagGenerator> supplier) {
+ this.tagGeneratorSupplier = supplier;
+ return this;
+ }
+
+ /**
+ * @return the configured delivery tag {@link Supplier} or null if none was set.
+ */
+ public Supplier<DeliveryTagGenerator> deliveryTagGeneratorSupplier() {
+ return tagGeneratorSupplier;
+ }
+
@Override
protected StreamSenderOptions self() {
return this;
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java
index 62f4a0dc..850cc690 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java
@@ -16,6 +16,7 @@
*/
package org.apache.qpid.protonj2.client.impl;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.protonj2.client.LinkOptions;
@@ -25,6 +26,7 @@ import org.apache.qpid.protonj2.client.SourceOptions;
import org.apache.qpid.protonj2.client.StreamSenderOptions;
import org.apache.qpid.protonj2.client.TargetOptions;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
import org.apache.qpid.protonj2.engine.Sender;
import org.apache.qpid.protonj2.engine.Session;
import org.apache.qpid.protonj2.engine.impl.ProtonDeliveryTagGenerator;
@@ -58,7 +60,10 @@ final class ClientSenderBuilder {
public ClientSender sender(String address, SenderOptions senderOptions) throws ClientException {
final SenderOptions options = senderOptions != null ? senderOptions : getDefaultSenderOptions();
final String senderId = nextSenderId();
- final Sender protonSender = createSender(session.getProtonSession(), address, options, senderId);
+ final DeliveryTagGenerator tagGenerator = options.deliveryTagGeneratorSupplier() == null ? null :
+ Objects.requireNonNull(options.deliveryTagGeneratorSupplier().get(), "Cannot assign a null tag generator from a custom supplier");
+ final Sender protonSender = createSender(
+ session.getProtonSession(), address, options, senderId, tagGenerator);
return new ClientSender(session, options, senderId, protonSender);
}
@@ -66,7 +71,10 @@ final class ClientSenderBuilder {
public ClientSender anonymousSender(SenderOptions senderOptions) throws ClientException {
final SenderOptions options = senderOptions != null ? senderOptions : getDefaultSenderOptions();
final String senderId = nextSenderId();
- final Sender protonSender = createSender(session.getProtonSession(), null, options, senderId);
+ final DeliveryTagGenerator tagGenerator = options.deliveryTagGeneratorSupplier() == null ? null :
+ Objects.requireNonNull(options.deliveryTagGeneratorSupplier().get(), "Cannot assign a null tag generator from a custom supplier");
+ final Sender protonSender = createSender(
+ session.getProtonSession(), null, options, senderId, tagGenerator);
return new ClientSender(session, options, senderId, protonSender);
}
@@ -74,12 +82,16 @@ final class ClientSenderBuilder {
public ClientStreamSender streamSender(String address, StreamSenderOptions senderOptions) throws ClientException {
final StreamSenderOptions options = senderOptions != null ? senderOptions : getDefaultStreamSenderOptions();
final String senderId = nextSenderId();
- final Sender protonSender = createSender(session.getProtonSession(), address, options, senderId);
+ final DeliveryTagGenerator tagGenerator = options.deliveryTagGeneratorSupplier() == null ? null :
+ Objects.requireNonNull(options.deliveryTagGeneratorSupplier().get(), "Cannot assign a null tag generator from a custom supplier");
+ final Sender protonSender = createSender(
+ session.getProtonSession(), address, options, senderId, tagGenerator);
return new ClientStreamSender(session, options, senderId, protonSender);
}
- private static Sender createSender(Session protonSession, String address, LinkOptions<?> options, String senderId) {
+ private static Sender createSender(Session protonSession, String address, LinkOptions<?> options,
+ String senderId, DeliveryTagGenerator tagGenerator) {
final String linkName;
if (options.linkName() != null) {
@@ -107,11 +119,15 @@ final class ClientSenderBuilder {
protonSender.setTarget(createTarget(address, options));
protonSender.setSource(createSource(senderId, options));
- // Use a tag generator that will reuse old tags. Later we might make this configurable.
- if (protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED) {
- protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.EMPTY.createGenerator());
+ if (tagGenerator == null) {
+ // Use a tag generator that will reuse old tags if not sending settled.
+ if (protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED) {
+ protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.EMPTY.createGenerator());
+ } else {
+ protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator());
+ }
} else {
- protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator());
+ protonSender.setDeliveryTagGenerator(tagGenerator);
}
return protonSender;
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
index b946fcca..1f7f86bd 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
@@ -67,6 +67,7 @@ import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedEx
import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Released;
import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusDurability;
@@ -74,6 +75,7 @@ import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusExpiryPolicy
import org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.protonj2.types.DeliveryTag;
import org.apache.qpid.protonj2.types.transport.AmqpError;
import org.apache.qpid.protonj2.types.transport.LinkError;
import org.apache.qpid.protonj2.types.transport.ReceiverSettleMode;
@@ -2561,4 +2563,111 @@ public class SenderTest extends ImperativeClientTestCase {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ private static DeliveryTagGenerator customTagGenerator() {
+ return new DeliveryTagGenerator() {
+
+ private int count = 1;
+
+ @Override
+ public DeliveryTag nextTag() {
+ switch (count++) {
+ case 1:
+ return new DeliveryTag.ProtonDeliveryTag(new byte[] { 1, 1, 1 });
+ case 2:
+ return new DeliveryTag.ProtonDeliveryTag(new byte[] { 2, 2, 2 });
+ case 3:
+ return new DeliveryTag.ProtonDeliveryTag(new byte[] { 3, 3, 3 });
+ default:
+ throw new UnsupportedOperationException("Only supports creating three tags");
+ }
+ }
+ };
+ }
+
+ @Test
+ public void testSendeUsesCustomDeliveryTagGeneratorConfiguration() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond();
+ peer.remoteFlow().withLinkCredit(10).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
+
+ Session session = connection.openSession().openFuture().get();
+ SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
+ .autoSettle(false)
+ .deliveryTagGeneratorSupplier(SenderTest::customTagGenerator);
+ Sender sender = session.openSender("test-tags", options).openFuture().get();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().withNonNullPayload()
+ .withDeliveryTag(new byte[] {1, 1, 1}).respond().withSettled(true).withState().accepted();
+ peer.expectTransfer().withNonNullPayload()
+ .withDeliveryTag(new byte[] {2, 2, 2}).respond().withSettled(true).withState().accepted();
+ peer.expectTransfer().withNonNullPayload()
+ .withDeliveryTag(new byte[] {3, 3, 3}).respond().withSettled(true).withState().accepted();
+ peer.expectDetach().respond();
+ peer.expectClose().respond();
+
+ final Message<String> message = Message.create("Hello World");
+ final Tracker tracker1 = sender.send(message);
+ final Tracker tracker2 = sender.send(message);
+ final Tracker tracker3 = sender.send(message);
+
+ assertNotNull(tracker1);
+ assertNotNull(tracker1.settlementFuture().get().settled());
+ assertNotNull(tracker2);
+ assertNotNull(tracker2.settlementFuture().get().settled());
+ assertNotNull(tracker3);
+ assertNotNull(tracker3.settlementFuture().get().settled());
+
+ sender.closeAsync().get(10, TimeUnit.SECONDS);
+
+ connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testCannotCreateSenderWhenTagGeneratorReturnsNull() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectClose().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
+
+ Session session = connection.openSession().openFuture().get();
+ SenderOptions options = new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
+ .autoSettle(false)
+ .deliveryTagGeneratorSupplier(() -> null);
+ try {
+ session.openSender("test-tags", options).openFuture().get();
+ fail("Should not create a sender if the tag generator is not supplied");
+ } catch (ClientException cliEx) {
+ // Expected
+ }
+
+ connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
index 5d0b7a84..65a8189a 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
@@ -62,6 +62,7 @@ import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.client.test.Wait;
+import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher;
@@ -74,6 +75,7 @@ import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatch
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedCompositingDataSectionMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedDataMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedPartialDataSectionMatcher;
+import org.apache.qpid.protonj2.types.DeliveryTag;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.messaging.Data;
import org.apache.qpid.protonj2.types.messaging.Footer;
@@ -2788,4 +2790,110 @@ public class StreamSenderTest extends ImperativeClientTestCase {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ private static DeliveryTagGenerator customTagGenerator() {
+ return new DeliveryTagGenerator() {
+
+ private int count = 1;
+
+ @Override
+ public DeliveryTag nextTag() {
+ switch (count++) {
+ case 1:
+ return new DeliveryTag.ProtonDeliveryTag(new byte[] { 1, 1, 1 });
+ case 2:
+ return new DeliveryTag.ProtonDeliveryTag(new byte[] { 2, 2, 2 });
+ case 3:
+ return new DeliveryTag.ProtonDeliveryTag(new byte[] { 3, 3, 3 });
+ default:
+ throw new UnsupportedOperationException("Only supports creating three tags");
+ }
+ }
+ };
+ }
+
+ @Test
+ public void testSendeUsesCustomDeliveryTagGeneratorConfiguration() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond(); // Hidden session for stream sender
+ peer.expectAttach().ofSender().respond();
+ peer.remoteFlow().withLinkCredit(10).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
+
+ StreamSenderOptions options = new StreamSenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
+ .autoSettle(false)
+ .deliveryTagGeneratorSupplier(StreamSenderTest::customTagGenerator);
+ StreamSender sender = connection.openStreamSender("test-tags", options).openFuture().get();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().withNonNullPayload()
+ .withDeliveryTag(new byte[] {1, 1, 1}).respond().withSettled(true).withState().accepted();
+ peer.expectTransfer().withNonNullPayload()
+ .withDeliveryTag(new byte[] {2, 2, 2}).respond().withSettled(true).withState().accepted();
+ peer.expectTransfer().withNonNullPayload()
+ .withDeliveryTag(new byte[] {3, 3, 3}).respond().withSettled(true).withState().accepted();
+ peer.expectDetach().respond();
+ peer.expectEnd().respond(); // From hidden stream sender session
+ peer.expectClose().respond();
+
+ final Message<String> message = Message.create("Hello World");
+ final StreamTracker tracker1 = sender.send(message);
+ final StreamTracker tracker2 = sender.send(message);
+ final StreamTracker tracker3 = sender.send(message);
+
+ assertNotNull(tracker1);
+ assertNotNull(tracker1.settlementFuture().get().settled());
+ assertNotNull(tracker2);
+ assertNotNull(tracker2.settlementFuture().get().settled());
+ assertNotNull(tracker3);
+ assertNotNull(tracker3.settlementFuture().get().settled());
+
+ sender.closeAsync().get(10, TimeUnit.SECONDS);
+
+ connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testCannotCreateSenderWhenTagGeneratorReturnsNull() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond(); // Hidden session for stream sender
+ peer.expectClose().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
+
+ StreamSenderOptions options = new StreamSenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE)
+ .autoSettle(false)
+ .deliveryTagGeneratorSupplier(() -> null);
+ try {
+ connection.openStreamSender("test-tags", options).openFuture().get();
+ fail("Should not create a sender if the tag generator is not supplied");
+ } catch (ClientException cliEx) {
+ // Expected
+ }
+
+ connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSequentialTagGenerator.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSequentialTagGenerator.java
index e38833b4..35876f32 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSequentialTagGenerator.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSequentialTagGenerator.java
@@ -84,11 +84,6 @@ public class ProtonSequentialTagGenerator extends ProtonDeliveryTagGenerator {
return ProtonByteBufferAllocator.DEFAULT.wrap(tagBytes());
}
- @Override
- public void release() {
- // Nothing to do in this implementation
- }
-
@Override
public DeliveryTag copy() {
return new ProtonNumericDeliveryTag(tagValue);
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonUuidTagGenerator.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonUuidTagGenerator.java
index f9c8df2b..3bdad7a5 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonUuidTagGenerator.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonUuidTagGenerator.java
@@ -65,11 +65,6 @@ public class ProtonUuidTagGenerator extends ProtonDeliveryTagGenerator {
return ProtonByteBufferAllocator.DEFAULT.wrap(tagBytes());
}
- @Override
- public void release() {
- // Nothing to do for this tag implementation.
- }
-
@Override
public DeliveryTag copy() {
return new ProtonUuidDeliveryTag(tagValue);
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/types/DeliveryTag.java b/protonj2/src/main/java/org/apache/qpid/protonj2/types/DeliveryTag.java
index 08222278..227297b1 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/types/DeliveryTag.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/types/DeliveryTag.java
@@ -58,10 +58,11 @@ public interface DeliveryTag {
ProtonBuffer tagBuffer();
/**
- * Optional method used by tag implementations that provide pooling of tags. Implementations can
- * do nothing here if no release mechanics are needed.
+ * Optional method used by tag implementations that provide pooling of tags.
*/
- void release();
+ default void release() {
+ // Default is no action, a subclass can add an action if needed.
+ }
/**
* Create a copy of this delivery tag, the copy should account for any underlying pooling of tags that
@@ -138,11 +139,6 @@ public interface DeliveryTag {
return new ProtonDeliveryTag(Arrays.copyOf(tagBytes, tagBytes.length));
}
- @Override
- public void release() {
- // Nothing to do for this basic implementation.
- }
-
@Override
public int hashCode() {
if (hashCode == null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org