You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/05 23:55:06 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5564

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.11.x 0c13daa9a -> 666f6f3b3


https://issues.apache.org/jira/browse/AMQ-5564

Fixed session in the pool losing their reference to the anonymous
producer created when useAnonymousProducers is true.  The anonymous
producer stays live for the life of the pooled session.

Also added some synchronization safety to some methods that could get
into NPE trouble.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/38994bd4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/38994bd4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/38994bd4

Branch: refs/heads/activemq-5.11.x
Commit: 38994bd478f329cfc1d8e636ec3c4a182f0ed65c
Parents: 0c13daa
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 5 17:50:43 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 5 17:54:01 2015 -0500

----------------------------------------------------------------------
 .../activemq/jms/pool/ConnectionPool.java       | 25 +++--
 .../activemq/jms/pool/PooledConnection.java     |  6 +-
 .../apache/activemq/jms/pool/PooledSession.java | 81 ++++++----------
 .../apache/activemq/jms/pool/SessionHolder.java | 98 ++++++++++++++++++++
 .../activemq/jms/pool/PooledSessionTest.java    | 53 +++++++++--
 5 files changed, 193 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/38994bd4/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
index c802a17..6c3fdc3 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -21,8 +21,13 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
@@ -51,7 +56,7 @@ public class ConnectionPool implements ExceptionListener {
     private boolean useAnonymousProducers = true;
 
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
+    private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool;
     private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
     private boolean reconnectOnException;
     private ExceptionListener parentExceptionListener;
@@ -61,29 +66,29 @@ public class ConnectionPool implements ExceptionListener {
         this.connection = wrap(connection);
 
         // Create our internal Pool of session instances.
-        this.sessionPool = new GenericKeyedObjectPool<SessionKey, Session>(
-            new KeyedPoolableObjectFactory<SessionKey, Session>() {
+        this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
+            new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() {
 
                 @Override
-                public void activateObject(SessionKey key, Session session) throws Exception {
+                public void activateObject(SessionKey key, SessionHolder session) throws Exception {
                 }
 
                 @Override
-                public void destroyObject(SessionKey key, Session session) throws Exception {
+                public void destroyObject(SessionKey key, SessionHolder session) throws Exception {
                     session.close();
                 }
 
                 @Override
-                public Session makeObject(SessionKey key) throws Exception {
-                    return makeSession(key);
+                public SessionHolder makeObject(SessionKey key) throws Exception {
+                    return new SessionHolder(makeSession(key));
                 }
 
                 @Override
-                public void passivateObject(SessionKey key, Session session) throws Exception {
+                public void passivateObject(SessionKey key, SessionHolder session) throws Exception {
                 }
 
                 @Override
-                public boolean validateObject(SessionKey key, Session session) {
+                public boolean validateObject(SessionKey key, SessionHolder session) {
                     return true;
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/38994bd4/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
index b268862..b7b56ba 100755
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
@@ -24,6 +24,7 @@ import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.QueueConnection;
@@ -35,7 +36,7 @@ import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
-import javax.jms.IllegalStateException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,8 +164,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole
 
     @Override
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        PooledSession result;
-        result = (PooledSession) pool.createSession(transacted, ackMode);
+        PooledSession result = (PooledSession) pool.createSession(transacted, ackMode);
 
         // Store the session so we can close the sessions that this PooledConnection
         // created in order to ensure that consumers etc are closed per the JMS contract.

http://git-wip-us.apache.org/repos/asf/activemq/blob/38994bd4/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
index 3a2e698..cbfec29 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
@@ -55,25 +55,21 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
 
     private final SessionKey key;
-    private final KeyedObjectPool<SessionKey, Session> sessionPool;
+    private final KeyedObjectPool<SessionKey, SessionHolder> sessionPool;
     private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
     private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
     private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
     private final AtomicBoolean closed = new AtomicBoolean();
 
-    private MessageProducer producer;
-    private TopicPublisher publisher;
-    private QueueSender sender;
-
-    private Session session;
+    private SessionHolder sessionHolder;
     private boolean transactional = true;
     private boolean ignoreClose;
     private boolean isXa;
     private boolean useAnonymousProducers = true;
 
-    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, Session> sessionPool, boolean transactional, boolean anonymous) {
+    public PooledSession(SessionKey key, SessionHolder sessionHolder, KeyedObjectPool<SessionKey, SessionHolder> sessionPool, boolean transactional, boolean anonymous) {
         this.key = key;
-        this.session = session;
+        this.sessionHolder = sessionHolder;
         this.sessionPool = sessionPool;
         this.transactional = transactional;
         this.useAnonymousProducers = anonymous;
@@ -140,21 +136,21 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
             if (invalidate) {
                 // lets close the session and not put the session back into the pool
                 // instead invalidate it so the pool can create a new one on demand.
-                if (session != null) {
+                if (sessionHolder != null) {
                     try {
-                        session.close();
+                        sessionHolder.close();
                     } catch (JMSException e1) {
                         LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
                     }
                 }
                 try {
-                    sessionPool.invalidateObject(key, session);
+                    sessionPool.invalidateObject(key, sessionHolder);
                 } catch (Exception e) {
                     LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
                 }
             } else {
                 try {
-                    sessionPool.returnObject(key, session);
+                    sessionPool.returnObject(key, sessionHolder);
                 } catch (Exception e) {
                     javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
                     illegalStateException.initCause(e);
@@ -162,7 +158,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
                 }
             }
 
-            session = null;
+            sessionHolder = null;
         }
     }
 
@@ -276,9 +272,12 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
 
     @Override
     public XAResource getXAResource() {
-        if (session instanceof XASession) {
-            return ((XASession) session).getXAResource();
+        SessionHolder session = safeGetSessionHolder();
+
+        if (session.getSession() instanceof XASession) {
+            return ((XASession) session.getSession()).getXAResource();
         }
+
         return null;
     }
 
@@ -289,8 +288,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
 
     @Override
     public void run() {
+        SessionHolder session = safeGetSessionHolder();
         if (session != null) {
-            session.run();
+            session.getSession().run();
         }
     }
 
@@ -379,10 +379,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     }
 
     public Session getInternalSession() throws IllegalStateException {
-        if (session == null) {
-            throw new IllegalStateException("The session has already been closed");
-        }
-        return session;
+        return safeGetSessionHolder().getSession();
     }
 
     public MessageProducer getMessageProducer() throws JMSException {
@@ -393,16 +390,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
         MessageProducer result = null;
 
         if (useAnonymousProducers) {
-            if (producer == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (producer == null) {
-                        producer = getInternalSession().createProducer(null);
-                    }
-                }
-            }
-
-            result = producer;
+            result = safeGetSessionHolder().getOrCreateProducer();
         } else {
             result = getInternalSession().createProducer(destination);
         }
@@ -418,16 +406,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
         QueueSender result = null;
 
         if (useAnonymousProducers) {
-            if (sender == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (sender == null) {
-                        sender = ((QueueSession) getInternalSession()).createSender(null);
-                    }
-                }
-            }
-
-            result = sender;
+            result = safeGetSessionHolder().getOrCreateSender();
         } else {
             result = ((QueueSession) getInternalSession()).createSender(destination);
         }
@@ -443,16 +422,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
         TopicPublisher result = null;
 
         if (useAnonymousProducers) {
-            if (publisher == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (publisher == null) {
-                        publisher = ((TopicSession) getInternalSession()).createPublisher(null);
-                    }
-                }
-            }
-
-            result = publisher;
+            result = safeGetSessionHolder().getOrCreatePublisher();
         } else {
             result = ((TopicSession) getInternalSession()).createPublisher(destination);
         }
@@ -489,7 +459,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
 
     @Override
     public String toString() {
-        return "PooledSession { " + session + " }";
+        return "PooledSession { " + safeGetSessionHolder() + " }";
     }
 
     /**
@@ -505,4 +475,13 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     protected void onConsumerClose(MessageConsumer consumer) {
         consumers.remove(consumer);
     }
+
+    private SessionHolder safeGetSessionHolder() {
+        SessionHolder sessionHolder = this.sessionHolder;
+        if (sessionHolder == null) {
+            throw new IllegalStateException("The session has already been closed");
+        }
+
+        return sessionHolder;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/38994bd4/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
new file mode 100644
index 0000000..afa75d6
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+
+/**
+ * Used to store a pooled session instance and any resources that can
+ * be left open and carried along with the pooled instance such as the
+ * anonymous producer used for all MessageProducer instances created
+ * from this pooled session when enabled.
+ */
+public class SessionHolder {
+
+    private final Session session;
+    private MessageProducer producer;
+    private TopicPublisher publisher;
+    private QueueSender sender;
+
+    public SessionHolder(Session session) {
+        this.session = session;
+    }
+
+    public void close() throws JMSException {
+        try {
+            session.close();
+        } finally {
+            producer = null;
+            publisher = null;
+            sender = null;
+        }
+    }
+
+    public Session getSession() {
+        return session;
+    }
+
+    public MessageProducer getOrCreateProducer() throws JMSException {
+        if (producer == null) {
+            synchronized (this) {
+                if (producer == null) {
+                    producer = session.createProducer(null);
+                }
+            }
+        }
+
+        return producer;
+    }
+
+    public TopicPublisher getOrCreatePublisher() throws JMSException {
+        if (publisher == null) {
+            synchronized (this) {
+                if (publisher == null) {
+                    publisher = ((TopicSession) session).createPublisher(null);
+                }
+            }
+        }
+
+        return publisher;
+    }
+
+    public QueueSender getOrCreateSender() throws JMSException {
+        if (sender == null) {
+            synchronized (this) {
+                if (sender == null) {
+                    sender = ((QueueSession) session).createSender(null);
+                }
+            }
+        }
+
+        return sender;
+    }
+
+    @Override
+    public String toString() {
+        return session.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/38994bd4/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
index 7483e6b..9432add 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
@@ -17,9 +17,13 @@
 package org.apache.activemq.jms.pool;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueSession;
 import javax.jms.Session;
@@ -44,7 +48,8 @@ public class PooledSessionTest {
     public void setUp() throws Exception {
         broker = new BrokerService();
         broker.setPersistent(false);
-        broker.setUseJmx(false);
+        broker.setUseJmx(true);
+        broker.getManagementContext().setCreateMBeanServer(false);
         TransportConnector connector = broker.addConnector("tcp://localhost:0");
         broker.start();
         connectionUri = connector.getPublishableConnectString();
@@ -62,7 +67,7 @@ public class PooledSessionTest {
         broker = null;
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testPooledSessionStats() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
 
@@ -73,9 +78,11 @@ public class PooledSessionTest {
         assertEquals(0, connection.getNumActiveSessions());
         assertEquals(1, connection.getNumtIdleSessions());
         assertEquals(1, connection.getNumSessions());
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testMessageProducersAreAllTheSame() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -87,9 +94,11 @@ public class PooledSessionTest {
         PooledProducer producer2 = (PooledProducer) session.createProducer(queue2);
 
         assertSame(producer1.getMessageProducer(), producer2.getMessageProducer());
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testThrowsWhenDifferentDestinationGiven() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -110,9 +119,11 @@ public class PooledSessionTest {
             fail("Should only be able to send to queue 1");
         } catch (Exception ex) {
         }
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testCreateTopicPublisher() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -124,9 +135,10 @@ public class PooledSessionTest {
         PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2);
 
         assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testQueueSender() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -138,5 +150,34 @@ public class PooledSessionTest {
         PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2);
 
         assertSame(sender1.getMessageProducer(), sender2.getMessageProducer());
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testRepeatedCreateSessionProducerResultsInSame() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+
+        assertTrue(pooledFactory.isUseAnonymousProducers());
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic("test-topic");
+        PooledProducer producer = (PooledProducer) session.createProducer(destination);
+        MessageProducer original = producer.getMessageProducer();
+        assertNotNull(original);
+        session.close();
+
+        assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
+
+        for (int i = 0; i < 20; ++i) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            producer = (PooledProducer) session.createProducer(destination);
+            assertSame(original, producer.getMessageProducer());
+            session.close();
+        }
+
+        assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
+
+        connection.close();
+        pooledFactory.clear();
     }
 }


[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5550

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5550

Ensure that the consumer and producer context instances are marked as
closed when creation at the broker end fails.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/666f6f3b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/666f6f3b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/666f6f3b

Branch: refs/heads/activemq-5.11.x
Commit: 666f6f3b3fced9d3d9b6c24fc3769752330b5b40
Parents: 38994bd
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jan 30 10:35:35 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 5 17:55:02 2015 -0500

----------------------------------------------------------------------
 .../activemq/transport/amqp/AmqpProtocolConverter.java | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/666f6f3b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 238b8b0..131df8f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -316,6 +316,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
                 Event event = null;
                 while ((event = eventCollector.peek()) != null) {
+                    if (amqpTransport.isTrace()) {
+                        LOG.trace("Processing event: {}", event.getType());
+                    }
                     switch (event.getType()) {
                         case CONNECTION_REMOTE_OPEN:
                         case CONNECTION_REMOTE_CLOSE:
@@ -761,6 +764,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             }
         }
 
+        public void close() {
+            closed = true;
+        }
+
         public boolean isAnonymous() {
             return anonymous;
         }
@@ -898,7 +905,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     dest = createDestination(remoteTarget);
                 }
 
-                ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous);
+                final ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous);
 
                 receiver.setContext(producerContext);
                 receiver.flow(flow);
@@ -916,7 +923,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                             } else {
                                 receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
                             }
+                            producerContext.closed = true;
                             receiver.close();
+                            receiver.free();
                         } else {
                             receiver.open();
                         }
@@ -1423,7 +1432,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                             sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
                         }
                         subscriptionsByConsumerId.remove(id);
+                        consumerContext.closed = true;
                         sender.close();
+                        sender.free();
                     } else {
                         sessionContext.consumers.put(id, consumerContext);
                         sender.open();