You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/09/24 16:13:55 UTC

[qpid-jms] branch master updated (3ee86f9 -> 9cad42d)

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git.


    from 3ee86f9  QPIDJMS-475: update to Netty 4.1.41
     new 519bbd0  QPIDJMS-473: remove unused + incomplete anonymous fallback cache fragments
     new 458e6de  QPIDJMS-473: avoid passing conflicting sync + completion-required flags to the anonymous fallback producer
     new 9cad42d  QPIDJMS-473: improve handling/logging of exceptions during async completion

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/qpid/jms/JmsSession.java  |  25 ++--
 .../amqp/AmqpAnonymousFallbackProducer.java        | 137 +++++--------------
 .../qpid/jms/provider/amqp/AmqpConnection.java     |  32 -----
 .../jms/integration/ProducerIntegrationTest.java   | 152 +++++++++++++++++++++
 4 files changed, 202 insertions(+), 144 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-jms] 01/03: QPIDJMS-473: remove unused + incomplete anonymous fallback cache fragments

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git

commit 519bbd010090ad1a2e4a4dec220540a32bafd7d3
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Tue Sep 24 16:59:27 2019 +0100

    QPIDJMS-473: remove unused + incomplete anonymous fallback cache fragments
---
 .../amqp/AmqpAnonymousFallbackProducer.java        | 109 +++------------------
 .../qpid/jms/provider/amqp/AmqpConnection.java     |  32 ------
 2 files changed, 15 insertions(+), 126 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 292b360..807ce4b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -16,9 +16,7 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
-import java.util.Map;
 
-import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
@@ -27,7 +25,6 @@ import org.apache.qpid.jms.provider.ProviderException;
 import org.apache.qpid.jms.provider.WrappedAsyncResult;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder;
 import org.apache.qpid.jms.util.IdGenerator;
-import org.apache.qpid.jms.util.LRUCache;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +40,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousFallbackProducer.class);
     private static final IdGenerator producerIdGenerator = new IdGenerator();
 
-    private final AnonymousProducerCache producerCache;
     private final String producerIdKey = producerIdGenerator.generateId();
     private long producerIdCount;
 
@@ -57,13 +53,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
      */
     public AmqpAnonymousFallbackProducer(AmqpSession session, JmsProducerInfo info) {
         super(session, info);
-
-        if (connection.isAnonymousProducerCache()) {
-            producerCache = new AnonymousProducerCache(10);
-            producerCache.setMaxCacheSize(connection.getAnonymousProducerCacheSize());
-        } else {
-            producerCache = null;
-        }
     }
 
     @Override
@@ -75,46 +64,23 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         // the send.
         envelope.setSendAsync(false);
 
-        AmqpProducer producer = null;
-        if (connection.isAnonymousProducerCache()) {
-            producer = producerCache.get(envelope.getDestination());
-        }
+        // Create a new ProducerInfo for the short lived producer that's created to perform the
+        // send to the given AMQP target.
+        JmsProducerInfo info = new JmsProducerInfo(getNextProducerId());
+        info.setDestination(envelope.getDestination());
+        info.setPresettle(this.getResourceInfo().isPresettle());
 
-        if (producer == null) {
-            // Create a new ProducerInfo for the short lived producer that's created to perform the
-            // send to the given AMQP target.
-            JmsProducerInfo info = new JmsProducerInfo(getNextProducerId());
-            info.setDestination(envelope.getDestination());
-            info.setPresettle(this.getResourceInfo().isPresettle());
-
-            // We open a Fixed Producer instance with the target destination.  Once it opens
-            // it will trigger the open event which will in turn trigger the send event.
-            // If caching is disabled the created producer will be closed immediately after
-            // the entire send chain has finished and the delivery has been acknowledged.
-            AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
-            builder.buildResource(new AnonymousSendRequest(request, builder, envelope));
-
-            if (connection.isAnonymousProducerCache()) {
-                // Cache it in hopes of not needing to create large numbers of producers.
-                producerCache.put(envelope.getDestination(), builder.getResource());
-            }
+        // We open a Fixed Producer instance with the target destination.  Once it opens
+        // it will trigger the open event which will in turn trigger the send event.
+        // The created producer will be closed immediately after the delivery has been acknowledged.
+        AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
+        builder.buildResource(new AnonymousSendRequest(request, builder, envelope));
 
-            getParent().getProvider().pumpToProtonTransport(request);
-        } else {
-            producer.send(envelope, request);
-        }
+        getParent().getProvider().pumpToProtonTransport(request);
     }
 
     @Override
     public void close(AsyncResult request) {
-        // Trigger an immediate close, the internal producers that are currently in the cache
-        // if the cache is enabled.
-        if (connection.isAnonymousProducerCache()) {
-            for (AmqpProducer producer : producerCache.values()) {
-                producer.close(new CloseRequest(producer));
-            }
-        }
-
         request.onSuccess();
     }
 
@@ -201,22 +167,16 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         @Override
         public void onFailure(ProviderException result) {
             LOG.trace("Send phase of anonymous send failed: {} ", getProducerId());
-            if (!connection.isAnonymousProducerCache()) {
-                AnonymousCloseRequest close = new AnonymousCloseRequest(this);
-                producer.close(close);
-            }
+            AnonymousCloseRequest close = new AnonymousCloseRequest(this);
+            producer.close(close);
             super.onFailure(result);
         }
 
         @Override
         public void onSuccess() {
             LOG.trace("Send phase of anonymous send complete: {} ", getProducerId());
-            if (!connection.isAnonymousProducerCache()) {
-                AnonymousCloseRequest close = new AnonymousCloseRequest(this);
-                producer.close(close);
-            } else {
-                super.onSuccess();
-            }
+            AnonymousCloseRequest close = new AnonymousCloseRequest(this);
+            producer.close(close);
         }
 
         @Override
@@ -246,43 +206,4 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
             return producer;
         }
     }
-
-    private final class CloseRequest implements AsyncResult {
-
-        private final AmqpProducer producer;
-
-        public CloseRequest(AmqpProducer producer) {
-            this.producer = producer;
-        }
-
-        @Override
-        public void onFailure(ProviderException result) {
-            AmqpAnonymousFallbackProducer.this.connection.getProvider().fireProviderException(result);
-        }
-
-        @Override
-        public void onSuccess() {
-            LOG.trace("Close of anonymous producer {} complete", producer);
-        }
-
-        @Override
-        public boolean isComplete() {
-            return producer.isClosed();
-        }
-    }
-
-    private final class AnonymousProducerCache extends LRUCache<JmsDestination, AmqpProducer> {
-
-        private static final long serialVersionUID = 1L;
-
-        public AnonymousProducerCache(int cacheSize) {
-            super(cacheSize);
-        }
-
-        @Override
-        protected void onCacheEviction(Map.Entry<JmsDestination, AmqpProducer> cached) {
-            LOG.trace("Producer: {} evicted from producer cache", cached.getValue());
-            cached.getValue().close(new CloseRequest(cached.getValue()));
-        }
-    }
 }
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 30c6e02..5d55e6c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -56,8 +56,6 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
     private AmqpConnectionSession connectionSession;
 
     private boolean objectMessageUsesAmqpTypes = false;
-    private boolean anonymousProducerCache = false;
-    private int anonymousProducerCacheSize = 10;
 
     public AmqpConnection(AmqpProvider provider, JmsConnectionInfo info, Connection protonConnection) {
         super(info, protonConnection, provider);
@@ -202,36 +200,6 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
     }
 
     /**
-     * @return true if anonymous producers should be cached or closed on send complete.
-     */
-    public boolean isAnonymousProducerCache() {
-        return anonymousProducerCache;
-    }
-
-    /**
-     * @param anonymousProducerCache
-     *        enable or disables the caching or anonymous producers.
-     */
-    public void setAnonymousProducerCache(boolean anonymousProducerCache) {
-        this.anonymousProducerCache = anonymousProducerCache;
-    }
-
-    /**
-     * @return the number of anonymous producers stored in each cache.
-     */
-    public int getAnonymousProducerCacheSize() {
-        return anonymousProducerCacheSize;
-    }
-
-    /**
-     * @param anonymousProducerCacheSize
-     *        the number of producers each anonymous producer instance will cache.
-     */
-    public void setAnonymousProducerCacheSize(int anonymousProducerCacheSize) {
-        this.anonymousProducerCacheSize = anonymousProducerCacheSize;
-    }
-
-    /**
      * @return true if new ObjectMessage instance should default to using AMQP Typed bodies.
      */
     public boolean isObjectMessageUsesAmqpTypes() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-jms] 02/03: QPIDJMS-473: avoid passing conflicting sync + completion-required flags to the anonymous fallback producer

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git

commit 458e6de93f89cad1e9a75df5607c8a77a234d787
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Tue Sep 24 16:59:39 2019 +0100

    QPIDJMS-473: avoid passing conflicting sync + completion-required flags to the anonymous fallback producer
---
 .../amqp/AmqpAnonymousFallbackProducer.java        |  30 ++--
 .../jms/integration/ProducerIntegrationTest.java   | 152 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 10 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 807ce4b..63e5cbb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -59,10 +59,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
     public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws ProviderException {
         LOG.trace("Started send chain for anonymous producer: {}", getProducerId());
 
-        // Force sends marked as asynchronous to be sent synchronous so that the temporary
-        // producer instance can handle failures and perform necessary completion work on
-        // the send.
-        envelope.setSendAsync(false);
 
         // Create a new ProducerInfo for the short lived producer that's created to perform the
         // send to the given AMQP target.
@@ -74,7 +70,12 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         // it will trigger the open event which will in turn trigger the send event.
         // The created producer will be closed immediately after the delivery has been acknowledged.
         AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
-        builder.buildResource(new AnonymousSendRequest(request, builder, envelope));
+        builder.buildResource(new AnonymousSendRequest(request, builder, envelope, envelope.isCompletionRequired()));
+
+        // Force sends to be sent synchronous so that the temporary producer instance can handle
+        // the failures and perform necessary completion work on the send.
+        envelope.setSendAsync(false);
+        envelope.setCompletionRequired(false);
 
         getParent().getProvider().pumpToProtonTransport(request);
     }
@@ -108,10 +109,12 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
     private abstract class AnonymousRequest extends WrappedAsyncResult {
 
         protected final JmsOutboundMessageDispatch envelope;
+        private final boolean completionRequired;
 
-        public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope) {
+        public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope, boolean completionRequired) {
             super(sendResult);
             this.envelope = envelope;
+            this.completionRequired = completionRequired;
         }
 
         /**
@@ -124,6 +127,10 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
             super.onFailure(result);
         }
 
+        public boolean isCompletionRequired() {
+            return completionRequired;
+        }
+
         public abstract AmqpProducer getProducer();
     }
 
@@ -131,8 +138,8 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
 
         private final AmqpProducerBuilder producerBuilder;
 
-        public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope) {
-            super(sendResult, envelope);
+        public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope, boolean completionRequired) {
+            super(sendResult, envelope, completionRequired);
 
             this.producerBuilder = producerBuilder;
         }
@@ -159,7 +166,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         private final AmqpProducer producer;
 
         public AnonymousSendCompleteRequest(AnonymousSendRequest open) {
-            super(open.getWrappedRequest(), open.envelope);
+            super(open.getWrappedRequest(), open.envelope, open.isCompletionRequired());
 
             this.producer = open.getProducer();
         }
@@ -190,7 +197,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         private final AmqpProducer producer;
 
         public AnonymousCloseRequest(AnonymousSendCompleteRequest sendComplete) {
-            super(sendComplete.getWrappedRequest(), sendComplete.envelope);
+            super(sendComplete.getWrappedRequest(), sendComplete.envelope, sendComplete.isCompletionRequired());
 
             this.producer = sendComplete.getProducer();
         }
@@ -199,6 +206,9 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
         public void onSuccess() {
             LOG.trace("Close phase of anonymous send complete: {} ", getProducerId());
             super.onSuccess();
+            if (isCompletionRequired()) {
+                getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope);
+            }
         }
 
         @Override
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 3c36512..0e5ce7d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2841,6 +2841,158 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testAnonymousProducerAsyncCompletionListenerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for ANONYMOUS-RELAY
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, as it will wait
+            // for an actual send to occur on the producer before anything occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause creation of a new
+            // sender link to the given destination, then closing the link after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            String content = "testContent";
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+            messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
+
+            TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
+            Message message = session.createTextMessage(content);
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected(), true);
+            testPeer.expectDetach(true, true, true);
+
+            // The fallback producer acts as synchronous regardless of the completion listener,
+            // so exceptions are thrown from send. Only onComplete uses the listener.
+            try {
+                producer.send(dest, message, completionListener);
+                fail("Send should fail");
+            } catch (JMSException jmsEx) {
+                LOG.debug("Caught expected error from failed send.");
+            }
+
+            //Repeat the send (but accept this time) and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();
+
+            producer.send(dest, message, completionListener2);
+
+            assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener2.exception);
+            Message receivedMessage2 = completionListener2.message;
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testAnonymousProducerAsyncCompletionListenerSendWhenAnonymousRelayNodeIsNotSupported() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for ANONYMOUS-RELAY
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, as it will wait
+            // for an actual send to occur on the producer before anything occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause creation of a new
+            // sender link to the given destination, then closing the link after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            String content = "testContent";
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+            messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
+            Message message = session.createTextMessage(content);
+
+            producer.send(dest, message, completionListener);
+
+            assertTrue("Did not get completion callback", completionListener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener.exception);
+            Message receivedMessage = completionListener.message;
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage).getText());
+
+            //Repeat the send and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();
+
+            producer.send(dest, message, completionListener2);
+
+            assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener2.exception);
+            Message receivedMessage2 = completionListener2.message;
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testSendingMessageSetsJMSDeliveryTimeWithDelay() throws Exception {
         doSendingMessageSetsJMSDeliveryTimeTestImpl(true);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-jms] 03/03: QPIDJMS-473: improve handling/logging of exceptions during async completion

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git

commit 9cad42da6fae11811ec1aea140705b3c59ac88cd
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Tue Sep 24 17:04:27 2019 +0100

    QPIDJMS-473: improve handling/logging of exceptions during async completion
---
 .../main/java/org/apache/qpid/jms/JmsSession.java  | 25 ++++++++++++++--------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index d3b30c5..7015a6f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -1485,6 +1485,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
                     try {
                         completion.signalCompletion();
                     } catch (Throwable error) {
+                        LOG.error("Failure while performing completion for send: {}", completion.envelope, error);
                     } finally {
                         LOG.trace("Signaled completion of send: {}", completion.envelope);
                     }
@@ -1526,8 +1527,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
                         }
                         completion.signalCompletion();
                     } catch (Throwable error) {
-                        LOG.trace("Failed while performing send completion: {}", envelope);
-                        // TODO - What now?
+                        LOG.error("Failure while performing completion for send: {}", envelope, error);
                     }
 
                     // Signal any trailing completions that have been marked complete
@@ -1539,8 +1539,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
                             try {
                                 completion.signalCompletion();
                             } catch (Throwable error) {
-                                LOG.trace("Failed while performing send completion: {}", envelope);
-                                // TODO - What now?
+                                LOG.error("Failure while performing completion for send: {}", envelope, error);
                             } finally {
                                 pending.remove();
                             }
@@ -1564,8 +1563,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
                     }
                 }
             } catch (Exception ex) {
-                LOG.debug("Send completion task encounted unexpected error: {}", ex.getMessage());
-                // TODO - What now
+                LOG.error("Async completion task encountered unexpected failure", ex);
             }
         }
     }
@@ -1597,12 +1595,21 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
 
         public void signalCompletion() {
-            envelope.getMessage().onSendComplete();  // Ensure message is returned as readable.
+            JmsMessage message = envelope.getMessage();
+            message.onSendComplete();  // Ensure message is returned as readable.
 
             if (failureCause == null) {
-                listener.onCompletion(envelope.getMessage());
+                try {
+                    listener.onCompletion(message);
+                } catch (Exception ex) {
+                    LOG.trace("CompletionListener threw exception from onCompletion for send {}", envelope, ex);
+                }
             } else {
-                listener.onException(envelope.getMessage(), failureCause);
+                try {
+                    listener.onException(message, failureCause);
+                } catch (Exception ex) {
+                    LOG.trace("CompletionListener threw exception from onException for send {}", envelope, ex);
+                }
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org