You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/13 20:05:08 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4968

Updated Branches:
  refs/heads/trunk 6377d49a1 -> 2a7c34997


https://issues.apache.org/jira/browse/AMQ-4968

Add non-caching mode for Session producers.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2a7c3499
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2a7c3499
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2a7c3499

Branch: refs/heads/trunk
Commit: 2a7c3499766c961c6996429af82c3ee374d24a33
Parents: 6377d49
Author: Timothy Bish <ta...@gmai.com>
Authored: Mon Jan 13 14:05:04 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Mon Jan 13 14:05:04 2014 -0500

----------------------------------------------------------------------
 .../activemq/jms/pool/ConnectionPool.java       |  13 +-
 .../jms/pool/PooledConnectionFactory.java       |  28 ++++
 .../activemq/jms/pool/PooledProducer.java       |  24 +++-
 .../apache/activemq/jms/pool/PooledSession.java | 136 +++++++++++++------
 .../PooledSessionNoPublisherCachingTest.java    | 129 ++++++++++++++++++
 .../activemq/jms/pool/PooledSessionTest.java    |  71 ++++++++++
 6 files changed, 358 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
index df2da17..08d2038 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -22,9 +22,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Session;
-import javax.jms.IllegalStateException;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
@@ -51,6 +51,7 @@ public class ConnectionPool {
     private boolean hasExpired;
     private int idleTimeout = 30 * 1000;
     private long expiryTimeout = 0l;
+    private boolean useAnonymousProducers = true;
 
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
@@ -78,7 +79,7 @@ public class ConnectionPool {
                 @Override
                 public PooledSession makeObject(SessionKey key) throws Exception {
                     Session session = makeSession(key);
-                    return new PooledSession(key, session, sessionPool, key.isTransacted());
+                    return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers);
                 }
 
                 @Override
@@ -248,6 +249,14 @@ public class ConnectionPool {
         this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
     }
 
+    public boolean isUseAnonymousProducers() {
+        return this.useAnonymousProducers;
+    }
+
+    public void setUseAnonymousProducers(boolean value) {
+        this.useAnonymousProducers = value;
+    }
+
     /**
      * @return the total number of Pooled session including idle sessions that are not
      *          currently loaned out to any client.

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
index 64eaad2..9ac853d 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
@@ -70,6 +70,7 @@ public class PooledConnectionFactory implements ConnectionFactory {
     private boolean blockIfSessionPoolIsFull = true;
     private long expiryTimeout = 0l;
     private boolean createConnectionOnStartup = true;
+    private boolean useAnonymousProducers = true;
 
     public void initConnectionsPool() {
         if (this.connectionsPool == null) {
@@ -101,6 +102,7 @@ public class PooledConnectionFactory implements ConnectionFactory {
                         connection.setExpiryTimeout(getExpiryTimeout());
                         connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
                         connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
+                        connection.setUseAnonymousProducers(isUseAnonymousProducers());
 
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Created new connection: {}", connection);
@@ -427,6 +429,32 @@ public class PooledConnectionFactory implements ConnectionFactory {
     }
 
     /**
+     * Should Sessions use one anonymous producer for all producer requests or should a new
+     * MessageProducer be created for each request to create a producer object, default is true.
+     *
+     * When enabled the session only needs to allocate one MessageProducer for all requests and
+     * the MessageProducer#send(destination, message) method can be used.  Normally this is the
+     * right thing to do however it does result in the Broker not showing the producers per
+     * destination.
+     *
+     * @return true if a PooledSession will use only a single anonymous message producer instance.
+     */
+    public boolean isUseAnonymousProducers() {
+        return this.useAnonymousProducers;
+    }
+
+    /**
+     * Sets whether a PooledSession uses only one anonymous MessageProducer instance or creates
+     * a new MessageProducer for each call the create a MessageProducer.
+     *
+     * @param value
+     *      Boolean value that configures whether anonymous producers are used.
+     */
+    public void setUseAnonymousProducers(boolean value) {
+        this.useAnonymousProducers = value;
+    }
+
+    /**
      * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
      *
      * @return this factories pool of ConnectionPool instances.

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java
index 817a1f1..7f54b99 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.jms.pool;
 
 import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -34,10 +35,12 @@ public class PooledProducer implements MessageProducer {
     private boolean disableMessageTimestamp;
     private int priority;
     private long timeToLive;
+    private boolean anonymous = true;
 
     public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException {
         this.messageProducer = messageProducer;
         this.destination = destination;
+        this.anonymous = messageProducer.getDestination() == null;
 
         this.deliveryMode = messageProducer.getDeliveryMode();
         this.disableMessageID = messageProducer.getDisableMessageID();
@@ -48,6 +51,9 @@ public class PooledProducer implements MessageProducer {
 
     @Override
     public void close() throws JMSException {
+        if (!anonymous) {
+            this.messageProducer.close();
+        }
     }
 
     @Override
@@ -67,13 +73,25 @@ public class PooledProducer implements MessageProducer {
 
     @Override
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+
         if (destination == null) {
-            destination = this.destination;
+            if (messageProducer.getDestination() == null) {
+                throw new UnsupportedOperationException("A destination must be specified.");
+            }
+            throw new InvalidDestinationException("Don't understand null destinations");
         }
+
         MessageProducer messageProducer = getMessageProducer();
 
         // just in case let only one thread send at once
         synchronized (messageProducer) {
+
+            if (anonymous && !this.destination.equals(destination)) {
+                throw new UnsupportedOperationException("This producer can only send messages to: " + this.destination);
+            }
+
+            // Producer will do it's own Destination validation so always use the destination
+            // based send method otherwise we might violate a JMS rule.
             messageProducer.send(destination, message, deliveryMode, priority, timeToLive);
         }
     }
@@ -139,6 +157,10 @@ public class PooledProducer implements MessageProducer {
         return messageProducer;
     }
 
+    protected boolean isAnonymous() {
+        return anonymous;
+    }
+
     @Override
     public String toString() {
         return "PooledProducer { " + messageProducer + " }";

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
index d0e4a09..1d3fc2f 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
@@ -57,22 +57,24 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
     private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
     private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
-    private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners =
-        new CopyOnWriteArrayList<PooledSessionEventListener>();
+    private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
+
+    private MessageProducer producer;
+    private TopicPublisher publisher;
+    private QueueSender sender;
 
     private Session session;
-    private MessageProducer messageProducer;
-    private QueueSender queueSender;
-    private TopicPublisher topicPublisher;
     private boolean transactional = true;
     private boolean ignoreClose;
     private boolean isXa;
+    private boolean useAnonymousProducers = true;
 
-    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional) {
+    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional, boolean anonymous) {
         this.key = key;
         this.session = session;
         this.sessionPool = sessionPool;
         this.transactional = transactional;
+        this.useAnonymousProducers = anonymous;
     }
 
     public void addSessionEventListener(PooledSessionEventListener listener) {
@@ -268,7 +270,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     @Override
     public XAResource getXAResource() {
         if (session instanceof XASession) {
-            return ((XASession)session).getXAResource();
+            return ((XASession) session).getXAResource();
         }
         return null;
     }
@@ -334,53 +336,39 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
 
     @Override
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
-        return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic));
+        return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic));
     }
 
     @Override
     public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
-        return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic, selector, local));
+        return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local));
     }
 
     @Override
     public QueueReceiver createReceiver(Queue queue) throws JMSException {
-        return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue));
+        return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue));
     }
 
     @Override
     public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
-        return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue, selector));
+        return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector));
     }
 
     // Producer related methods
     // -------------------------------------------------------------------------
     @Override
     public MessageProducer createProducer(Destination destination) throws JMSException {
-        return new PooledProducer(getMessageProducer(), destination);
+        return new PooledProducer(getMessageProducer(destination), destination);
     }
 
     @Override
     public QueueSender createSender(Queue queue) throws JMSException {
-        return new PooledQueueSender(getQueueSender(), queue);
+        return new PooledQueueSender(getQueueSender(queue), queue);
     }
 
     @Override
     public TopicPublisher createPublisher(Topic topic) throws JMSException {
-        return new PooledTopicPublisher(getTopicPublisher(), topic);
-    }
-
-    /**
-     * Callback invoked when the consumer is closed.
-     * <p/>
-     * This is used to keep track of an explicit closed consumer created by this
-     * session, by which we know do not need to keep track of the consumer, as
-     * its already closed.
-     *
-     * @param consumer
-     *            the consumer which is being closed
-     */
-    protected void onConsumerClose(MessageConsumer consumer) {
-        consumers.remove(consumer);
+        return new PooledTopicPublisher(getTopicPublisher(topic), topic);
     }
 
     public Session getInternalSession() throws IllegalStateException {
@@ -391,24 +379,78 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     }
 
     public MessageProducer getMessageProducer() throws JMSException {
-        if (messageProducer == null) {
-            messageProducer = getInternalSession().createProducer(null);
+        return getMessageProducer(null);
+    }
+
+    public MessageProducer getMessageProducer(Destination destination) throws JMSException {
+        MessageProducer result = null;
+
+        if (useAnonymousProducers) {
+            if (producer == null) {
+                // Don't allow for duplicate anonymous producers.
+                synchronized (this) {
+                    if (producer == null) {
+                        producer = getInternalSession().createProducer(null);
+                    }
+                }
+            }
+
+            result = producer;
+        } else {
+            result = getInternalSession().createProducer(destination);
         }
-        return messageProducer;
+
+        return result;
     }
 
     public QueueSender getQueueSender() throws JMSException {
-        if (queueSender == null) {
-            queueSender = ((QueueSession)getInternalSession()).createSender(null);
+        return getQueueSender(null);
+    }
+
+    public QueueSender getQueueSender(Queue destination) throws JMSException {
+        QueueSender result = null;
+
+        if (useAnonymousProducers) {
+            if (sender == null) {
+                // Don't allow for duplicate anonymous producers.
+                synchronized (this) {
+                    if (sender == null) {
+                        sender = ((QueueSession) getInternalSession()).createSender(null);
+                    }
+                }
+            }
+
+            result = sender;
+        } else {
+            result = ((QueueSession) getInternalSession()).createSender(destination);
         }
-        return queueSender;
+
+        return result;
     }
 
     public TopicPublisher getTopicPublisher() throws JMSException {
-        if (topicPublisher == null) {
-            topicPublisher = ((TopicSession)getInternalSession()).createPublisher(null);
+        return getTopicPublisher(null);
+    }
+
+    public TopicPublisher getTopicPublisher(Topic destination) throws JMSException {
+        TopicPublisher result = null;
+
+        if (useAnonymousProducers) {
+            if (publisher == null) {
+                // Don't allow for duplicate anonymous producers.
+                synchronized (this) {
+                    if (publisher == null) {
+                        publisher = ((TopicSession) getInternalSession()).createPublisher(null);
+                    }
+                }
+            }
+
+            result = publisher;
+        } else {
+            result = ((TopicSession) getInternalSession()).createPublisher(destination);
         }
-        return topicPublisher;
+
+        return result;
     }
 
     private QueueBrowser addQueueBrowser(QueueBrowser browser) {
@@ -418,9 +460,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
 
     private MessageConsumer addConsumer(MessageConsumer consumer) {
         consumers.add(consumer);
-        // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is
-        // invoked when the returned consumer is closed, to avoid memory leak in this
-        // session class in case many consumers is created
+        // must wrap in PooledMessageConsumer to ensure the onConsumerClose
+        // method is invoked when the returned consumer is closed, to avoid memory
+        // leak in this session class in case many consumers is created
         return new PooledMessageConsumer(this, consumer);
     }
 
@@ -442,4 +484,18 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     public String toString() {
         return "PooledSession { " + session + " }";
     }
+
+    /**
+     * Callback invoked when the consumer is closed.
+     * <p/>
+     * This is used to keep track of an explicit closed consumer created by this
+     * session, by which we know do not need to keep track of the consumer, as
+     * its already closed.
+     *
+     * @param consumer
+     *            the consumer which is being closed
+     */
+    protected void onConsumerClose(MessageConsumer consumer) {
+        consumers.remove(consumer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java
new file mode 100644
index 0000000..6671376
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionNoPublisherCachingTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.fail;
+
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PooledSessionNoPublisherCachingTest {
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+    private PooledConnectionFactory pooledFactory;
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+        connectionUri = connector.getPublishableConnectString();
+        factory = new ActiveMQConnectionFactory(connectionUri);
+        pooledFactory = new PooledConnectionFactory();
+        pooledFactory.setConnectionFactory(factory);
+        pooledFactory.setMaxConnections(1);
+        pooledFactory.setBlockIfSessionPoolIsFull(false);
+        pooledFactory.setUseAnonymousProducers(false);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = null;
+    }
+
+    @Test
+    public void testMessageProducersAreUnique() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue queue1 = session.createTemporaryQueue();
+        Queue queue2 = session.createTemporaryQueue();
+
+        PooledProducer producer1 = (PooledProducer) session.createProducer(queue1);
+        PooledProducer producer2 = (PooledProducer) session.createProducer(queue2);
+
+        assertNotSame(producer1.getMessageProducer(), producer2.getMessageProducer());
+    }
+
+    @Test
+    public void testThrowsWhenDestinationGiven() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue queue1 = session.createTemporaryQueue();
+        Queue queue2 = session.createTemporaryQueue();
+
+        PooledProducer producer = (PooledProducer) session.createProducer(queue1);
+
+        try {
+            producer.send(queue2, session.createTextMessage());
+            fail("Should only be able to send to queue 1");
+        } catch (Exception ex) {
+        }
+
+        try {
+            producer.send(null, session.createTextMessage());
+            fail("Should only be able to send to queue 1");
+        } catch (Exception ex) {
+        }
+    }
+
+    @Test
+    public void testCreateTopicPublisher() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Topic topic1 = session.createTopic("Topic-1");
+        Topic topic2 = session.createTopic("Topic-2");
+
+        PooledTopicPublisher publisher1 = (PooledTopicPublisher) session.createPublisher(topic1);
+        PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2);
+
+        assertNotSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
+    }
+
+    @Test
+    public void testQueueSender() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue queue1 = session.createTemporaryQueue();
+        Queue queue2 = session.createTemporaryQueue();
+
+        PooledQueueSender sender1 = (PooledQueueSender) session.createSender(queue1);
+        PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2);
+
+        assertNotSame(sender1.getMessageProducer(), sender2.getMessageProducer());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7c3499/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
index a60d053..7483e6b 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
@@ -17,8 +17,14 @@
 package org.apache.activemq.jms.pool;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
 
+import javax.jms.Queue;
+import javax.jms.QueueSession;
 import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -68,4 +74,69 @@ public class PooledSessionTest {
         assertEquals(1, connection.getNumtIdleSessions());
         assertEquals(1, connection.getNumSessions());
     }
+
+    @Test
+    public void testMessageProducersAreAllTheSame() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue queue1 = session.createTemporaryQueue();
+        Queue queue2 = session.createTemporaryQueue();
+
+        PooledProducer producer1 = (PooledProducer) session.createProducer(queue1);
+        PooledProducer producer2 = (PooledProducer) session.createProducer(queue2);
+
+        assertSame(producer1.getMessageProducer(), producer2.getMessageProducer());
+    }
+
+    @Test
+    public void testThrowsWhenDifferentDestinationGiven() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue queue1 = session.createTemporaryQueue();
+        Queue queue2 = session.createTemporaryQueue();
+
+        PooledProducer producer = (PooledProducer) session.createProducer(queue1);
+
+        try {
+            producer.send(queue2, session.createTextMessage());
+            fail("Should only be able to send to queue 1");
+        } catch (Exception ex) {
+        }
+
+        try {
+            producer.send(null, session.createTextMessage());
+            fail("Should only be able to send to queue 1");
+        } catch (Exception ex) {
+        }
+    }
+
+    @Test
+    public void testCreateTopicPublisher() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Topic topic1 = session.createTopic("Topic-1");
+        Topic topic2 = session.createTopic("Topic-2");
+
+        PooledTopicPublisher publisher1 = (PooledTopicPublisher) session.createPublisher(topic1);
+        PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2);
+
+        assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
+    }
+
+    @Test
+    public void testQueueSender() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue queue1 = session.createTemporaryQueue();
+        Queue queue2 = session.createTemporaryQueue();
+
+        PooledQueueSender sender1 = (PooledQueueSender) session.createSender(queue1);
+        PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2);
+
+        assertSame(sender1.getMessageProducer(), sender2.getMessageProducer());
+    }
 }