You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/09/24 16:13:57 UTC

[qpid-jms] 02/03: QPIDJMS-473: avoid passing conflicting sync + completion-required flags to the anonymous fallback producer

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git

commit 458e6de93f89cad1e9a75df5607c8a77a234d787
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Tue Sep 24 16:59:39 2019 +0100

    QPIDJMS-473: avoid passing conflicting sync + completion-required flags to the anonymous fallback producer
---
 .../amqp/AmqpAnonymousFallbackProducer.java        |  30 ++--
 .../jms/integration/ProducerIntegrationTest.java   | 152 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 10 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 807ce4b..63e5cbb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -59,10 +59,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
     public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws ProviderException {
         LOG.trace("Started send chain for anonymous producer: {}", getProducerId());
 
-        // Force sends marked as asynchronous to be sent synchronous so that the temporary
-        // producer instance can handle failures and perform necessary completion work on
-        // the send.
-        envelope.setSendAsync(false);
 
         // Create a new ProducerInfo for the short lived producer that's created to perform the
         // send to the given AMQP target.
@@ -74,7 +70,12 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         // it will trigger the open event which will in turn trigger the send event.
         // The created producer will be closed immediately after the delivery has been acknowledged.
         AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
-        builder.buildResource(new AnonymousSendRequest(request, builder, envelope));
+        builder.buildResource(new AnonymousSendRequest(request, builder, envelope, envelope.isCompletionRequired()));
+
+        // Force sends to be sent synchronously so that the temporary producer instance can handle
+        // failures and perform the necessary completion work on the send.
+        envelope.setSendAsync(false);
+        envelope.setCompletionRequired(false);
 
         getParent().getProvider().pumpToProtonTransport(request);
     }
@@ -108,10 +109,12 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
     private abstract class AnonymousRequest extends WrappedAsyncResult {
 
         protected final JmsOutboundMessageDispatch envelope;
+        private final boolean completionRequired;
 
-        public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope) {
+        public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope, boolean completionRequired) {
             super(sendResult);
             this.envelope = envelope;
+            this.completionRequired = completionRequired;
         }
 
         /**
@@ -124,6 +127,10 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
             super.onFailure(result);
         }
 
+        public boolean isCompletionRequired() {
+            return completionRequired;
+        }
+
         public abstract AmqpProducer getProducer();
     }
 
@@ -131,8 +138,8 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
 
         private final AmqpProducerBuilder producerBuilder;
 
-        public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope) {
-            super(sendResult, envelope);
+        public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope, boolean completionRequired) {
+            super(sendResult, envelope, completionRequired);
 
             this.producerBuilder = producerBuilder;
         }
@@ -159,7 +166,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         private final AmqpProducer producer;
 
         public AnonymousSendCompleteRequest(AnonymousSendRequest open) {
-            super(open.getWrappedRequest(), open.envelope);
+            super(open.getWrappedRequest(), open.envelope, open.isCompletionRequired());
 
             this.producer = open.getProducer();
         }
@@ -190,7 +197,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         private final AmqpProducer producer;
 
         public AnonymousCloseRequest(AnonymousSendCompleteRequest sendComplete) {
-            super(sendComplete.getWrappedRequest(), sendComplete.envelope);
+            super(sendComplete.getWrappedRequest(), sendComplete.envelope, sendComplete.isCompletionRequired());
 
             this.producer = sendComplete.getProducer();
         }
@@ -199,6 +206,9 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         public void onSuccess() {
             LOG.trace("Close phase of anonymous send complete: {} ", getProducerId());
             super.onSuccess();
+            if (isCompletionRequired()) {
+                getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope);
+            }
         }
 
         @Override
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 3c36512..0e5ce7d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2841,6 +2841,158 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testAnonymousProducerAsyncCompletionListenerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for ANONYMOUS-RELAY
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, as it will wait
+            // for an actual send to occur on the producer before anything occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause creation of a new
+            // sender link to the given destination, then closing the link after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            String content = "testContent";
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+            messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
+
+            TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
+            Message message = session.createTextMessage(content);
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected(), true);
+            testPeer.expectDetach(true, true, true);
+
+            // The fallback producer sends synchronously even when a completion listener is given,
+            // so failures are thrown from send(). The listener is only used for onCompletion.
+            try {
+                producer.send(dest, message, completionListener);
+                fail("Send should fail");
+            } catch (JMSException jmsEx) {
+                LOG.debug("Caught expected error from failed send.");
+            }
+
+            //Repeat the send (but accept this time) and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();
+
+            producer.send(dest, message, completionListener2);
+
+            assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener2.exception);
+            Message receivedMessage2 = completionListener2.message;
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAnonymousProducerAsyncCompletionListenerSendWhenAnonymousRelayNodeIsNotSupported() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for ANONYMOUS-RELAY
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, as it will wait
+            // for an actual send to occur on the producer before anything occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause creation of a new
+            // sender link to the given destination, then closing the link after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            String content = "testContent";
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+            messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
+            Message message = session.createTextMessage(content);
+
+            producer.send(dest, message, completionListener);
+
+            assertTrue("Did not get completion callback", completionListener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener.exception);
+            Message receivedMessage = completionListener.message;
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage).getText());
+
+            //Repeat the send and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();
+
+            producer.send(dest, message, completionListener2);
+
+            assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener2.exception);
+            Message receivedMessage2 = completionListener2.message;
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testSendingMessageSetsJMSDeliveryTimeWithDelay() throws Exception {
         doSendingMessageSetsJMSDeliveryTimeTestImpl(true);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org