You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/02 20:59:33 UTC
svn commit: r1142267 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/
activemq-pool/src/main/java/org/apache/activemq/pool/
activemq-pool/src/test/java/org/apache/activemq/pool/
Author: tabish
Date: Sat Jul 2 18:59:33 2011
New Revision: 1142267
URL: http://svn.apache.org/viewvc?rev=1142267&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-2349
fix for: https://issues.apache.org/jira/browse/AMQ-2716
When a PooledConnection is closed, the temporary destinations of the contained Connection should be removed.
Applied the patch from AMQ-2349, with modifications to prevent NullPointerExceptions and some other small cleanups.
Added:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1142267&r1=1142266&r2=1142267&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Sat Jul 2 18:59:33 2011
@@ -196,7 +196,7 @@ public class ActiveMQConnection implemen
/**
* Construct an <code>ActiveMQConnection</code>
- *
+ *
* @param transport
* @param factoryStats
* @throws Exception
@@ -243,7 +243,7 @@ public class ActiveMQConnection implemen
/**
* A static helper method to create a new connection
- *
+ *
* @return an ActiveMQConnection
* @throws JMSException
*/
@@ -254,7 +254,7 @@ public class ActiveMQConnection implemen
/**
* A static helper method to create a new connection
- *
+ *
* @param uri
* @return and ActiveMQConnection
* @throws JMSException
@@ -266,7 +266,7 @@ public class ActiveMQConnection implemen
/**
* A static helper method to create a new connection
- *
+ *
* @param user
* @param password
* @param uri
@@ -287,7 +287,7 @@ public class ActiveMQConnection implemen
/**
* Creates a <CODE>Session</CODE> object.
- *
+ *
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
@@ -334,7 +334,7 @@ public class ActiveMQConnection implemen
* an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
* dynamically by the application by calling the <code>setClientID</code>
* method.
- *
+ *
* @return the unique client identifier
* @throws JMSException if the JMS provider fails to return the client ID
* for this connection due to some internal error.
@@ -372,7 +372,7 @@ public class ActiveMQConnection implemen
* If another connection with the same <code>clientID</code> is already
* running when this method is called, the JMS provider should detect the
* duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
- *
+ *
* @param newClientID the unique client identifier
* @throws JMSException if the JMS provider fails to set the client ID for
* this connection due to some internal error.
@@ -409,7 +409,7 @@ public class ActiveMQConnection implemen
/**
* Gets the metadata for this connection.
- *
+ *
* @return the connection metadata
* @throws JMSException if the JMS provider fails to get the connection
* metadata for this connection.
@@ -424,7 +424,7 @@ public class ActiveMQConnection implemen
* Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
* every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
* associated with it.
- *
+ *
* @return the <CODE>ExceptionListener</CODE> for this connection, or
* null, if no <CODE>ExceptionListener</CODE> is associated with
* this connection.
@@ -455,7 +455,7 @@ public class ActiveMQConnection implemen
* <P>
* A JMS provider should attempt to resolve connection problems itself
* before it notifies the client of them.
- *
+ *
* @param listener the exception listener
* @throws JMSException if the JMS provider fails to set the exception
* listener for this connection.
@@ -469,7 +469,7 @@ public class ActiveMQConnection implemen
* Gets the <code>ClientInternalExceptionListener</code> object for this connection.
* Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
* associated with it.
- *
+ *
* @return the listener or <code>null</code> if no listener is registered with the connection.
*/
public ClientInternalExceptionListener getClientInternalExceptionListener()
@@ -483,19 +483,19 @@ public class ActiveMQConnection implemen
* (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
* It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
* describing the problem.
- *
+ *
* @param listener the exception listener
*/
public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
{
this.clientInternalExceptionListener = listener;
}
-
+
/**
* Starts (or restarts) a connection's delivery of incoming messages. A call
* to <CODE>start</CODE> on a connection that has already been started is
* ignored.
- *
+ *
* @throws JMSException if the JMS provider fails to start message delivery
* due to some internal error.
* @see javax.jms.Connection#stop()
@@ -537,7 +537,7 @@ public class ActiveMQConnection implemen
* <CODE>stop</CODE> call must wait until all of them have returned before
* it may return. While these message listeners are completing, they must
* have the full services of the connection available to them.
- *
+ *
* @throws JMSException if the JMS provider fails to stop message delivery
* due to some internal error.
* @see javax.jms.Connection#start()
@@ -591,7 +591,7 @@ public class ActiveMQConnection implemen
* a closed connection's session must throw an
* <CODE>IllegalStateException</CODE>. Closing a closed connection must
* NOT throw an exception.
- *
+ *
* @throws JMSException if the JMS provider fails to close the connection
* due to some internal error. For example, a failure to
* release resources or to close a socket connection can
@@ -651,7 +651,7 @@ public class ActiveMQConnection implemen
ActiveMQTempDestination c = i.next();
c.delete();
}
-
+
if (isConnectionInfoSentToBroker) {
// If we announced ourselfs to the broker.. Try to let
// the broker
@@ -706,7 +706,7 @@ public class ActiveMQConnection implemen
/**
* Create a durable connection consumer for this connection (optional
* operation). This is an expert facility not used by regular JMS clients.
- *
+ *
* @param topic topic to access
* @param subscriptionName durable subscription name
* @param messageSelector only messages with properties matching the message
@@ -737,7 +737,7 @@ public class ActiveMQConnection implemen
/**
* Create a durable connection consumer for this connection (optional
* operation). This is an expert facility not used by regular JMS clients.
- *
+ *
* @param topic topic to access
* @param subscriptionName durable subscription name
* @param messageSelector only messages with properties matching the message
@@ -788,7 +788,7 @@ public class ActiveMQConnection implemen
/**
* Returns true if this connection has been started
- *
+ *
* @return true if this Connection is started
*/
public boolean isStarted() {
@@ -936,7 +936,7 @@ public class ActiveMQConnection implemen
* Enables or disables whether or not queue consumers should be exclusive or
* not for example to preserve ordering when not using <a
* href="http://activemq.apache.org/message-groups.html">Message Groups</a>
- *
+ *
* @param exclusiveConsumer
*/
public void setExclusiveConsumer(boolean exclusiveConsumer) {
@@ -958,7 +958,7 @@ public class ActiveMQConnection implemen
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
-
+
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
@@ -1023,7 +1023,7 @@ public class ActiveMQConnection implemen
/**
* Used internally for adding Sessions to the Connection
- *
+ *
* @param session
* @throws JMSException
* @throws JMSException
@@ -1037,7 +1037,7 @@ public class ActiveMQConnection implemen
/**
* Used interanlly for removing Sessions from a Connection
- *
+ *
* @param session
*/
protected void removeSession(ActiveMQSession session) {
@@ -1047,7 +1047,7 @@ public class ActiveMQConnection implemen
/**
* Add a ConnectionConsumer
- *
+ *
* @param connectionConsumer
* @throws JMSException
*/
@@ -1057,7 +1057,7 @@ public class ActiveMQConnection implemen
/**
* Remove a ConnectionConsumer
- *
+ *
* @param connectionConsumer
*/
protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
@@ -1067,7 +1067,7 @@ public class ActiveMQConnection implemen
/**
* Creates a <CODE>TopicSession</CODE> object.
- *
+ *
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
@@ -1091,7 +1091,7 @@ public class ActiveMQConnection implemen
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
- *
+ *
* @param topic the topic to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
@@ -1119,7 +1119,7 @@ public class ActiveMQConnection implemen
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
- *
+ *
* @param queue the queue to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
@@ -1147,7 +1147,7 @@ public class ActiveMQConnection implemen
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
- *
+ *
* @param destination the destination to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
@@ -1212,7 +1212,7 @@ public class ActiveMQConnection implemen
/**
* Creates a <CODE>QueueSession</CODE> object.
- *
+ *
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
@@ -1238,7 +1238,7 @@ public class ActiveMQConnection implemen
* If the clientID was not specified this method will throw an exception.
* This method is used to ensure that the clientID + durableSubscriber name
* are used correctly.
- *
+ *
* @throws JMSException
*/
public void checkClientIDWasManuallySpecified() throws JMSException {
@@ -1249,7 +1249,7 @@ public class ActiveMQConnection implemen
/**
* send a Packet through the Connection - for internal use only
- *
+ *
* @param command
* @throws JMSException
*/
@@ -1261,17 +1261,17 @@ public class ActiveMQConnection implemen
}
}
- private void doAsyncSendPacket(Command command) throws JMSException {
- try {
- this.transport.oneway(command);
- } catch (IOException e) {
- throw JMSExceptionSupport.create(e);
- }
- }
+ private void doAsyncSendPacket(Command command) throws JMSException {
+ try {
+ this.transport.oneway(command);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
/**
* Send a packet through a Connection - for internal use only
- *
+ *
* @param command
* @return
* @throws JMSException
@@ -1311,7 +1311,7 @@ public class ActiveMQConnection implemen
/**
* Send a packet through a Connection - for internal use only
- *
+ *
* @param command
* @return
* @throws JMSException
@@ -1324,25 +1324,25 @@ public class ActiveMQConnection implemen
}
}
- private Response doSyncSendPacket(Command command, int timeout)
- throws JMSException {
- try {
- Response response = (Response) (timeout > 0
- ? this.transport.request(command, timeout)
+ private Response doSyncSendPacket(Command command, int timeout)
+ throws JMSException {
+ try {
+ Response response = (Response) (timeout > 0
+ ? this.transport.request(command, timeout)
: this.transport.request(command));
- if (response != null && response.isException()) {
- ExceptionResponse er = (ExceptionResponse)response;
- if (er.getException() instanceof JMSException) {
- throw (JMSException)er.getException();
- } else {
- throw JMSExceptionSupport.create(er.getException());
- }
- }
- return response;
- } catch (IOException e) {
- throw JMSExceptionSupport.create(e);
- }
- }
+ if (response != null && response.isException()) {
+ ExceptionResponse er = (ExceptionResponse)response;
+ if (er.getException() instanceof JMSException) {
+ throw (JMSException)er.getException();
+ } else {
+ throw JMSExceptionSupport.create(er.getException());
+ }
+ }
+ return response;
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
/**
* @return statistics for this Connection
@@ -1354,7 +1354,7 @@ public class ActiveMQConnection implemen
/**
* simply throws an exception if the Connection is already closed or the
* Transport has failed
- *
+ *
* @throws JMSException
*/
protected synchronized void checkClosedOrFailed() throws JMSException {
@@ -1366,7 +1366,7 @@ public class ActiveMQConnection implemen
/**
* simply throws an exception if the Connection is already closed
- *
+ *
* @throws JMSException
*/
protected synchronized void checkClosed() throws JMSException {
@@ -1377,7 +1377,7 @@ public class ActiveMQConnection implemen
/**
* Send the ConnectionInfo to the Broker
- *
+ *
* @throws JMSException
*/
protected void ensureConnectionInfoSent() throws JMSException {
@@ -1391,12 +1391,12 @@ public class ActiveMQConnection implemen
info.setClientId(clientIdGenerator.generateId());
}
syncSendPacket(info.copy());
-
+
this.isConnectionInfoSentToBroker = true;
// Add a temp destination advisory consumer so that
// We know what the valid temporary destinations are on the
// broker without having to do an RPC to the broker.
-
+
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
if (watchTopicAdvisories) {
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
@@ -1439,13 +1439,13 @@ public class ActiveMQConnection implemen
/**
* Set true if always require messages to be sync sent
- *
+ *
* @param alwaysSyncSend
*/
public void setAlwaysSyncSend(boolean alwaysSyncSend) {
this.alwaysSyncSend = alwaysSyncSend;
}
-
+
/**
* @return the messagePrioritySupported
*/
@@ -1509,7 +1509,7 @@ public class ActiveMQConnection implemen
* Changes the associated username/password that is associated with this
* connection. If the connection has been used, you must called cleanup()
* before calling this method.
- *
+ *
* @throws IllegalStateException if the connection is in used.
*/
public void changeUserInfo(String userName, String password) throws JMSException {
@@ -1614,7 +1614,7 @@ public class ActiveMQConnection implemen
/**
* Enables an optimised acknowledgement mode where messages are acknowledged
* in batches rather than individually
- *
+ *
* @param optimizeAcknowledge The optimizeAcknowledge to set.
*/
public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
@@ -1650,7 +1650,7 @@ public class ActiveMQConnection implemen
public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
}
-
+
/**
* @return the sendTimeout
*/
@@ -1664,7 +1664,7 @@ public class ActiveMQConnection implemen
public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}
-
+
/**
* @return the sendAcksAsync
*/
@@ -1824,7 +1824,7 @@ public class ActiveMQConnection implemen
* message that does not affect the connection itself.
* This method notifies the <code>ClientInternalExceptionListener</code> by invoking
* its <code>onException</code> method, if one has been registered with this connection.
- *
+ *
* @param error the exception that the problem
*/
public void onClientInternalException(final Throwable error) {
@@ -1836,14 +1836,14 @@ public class ActiveMQConnection implemen
}
});
} else {
- LOG.debug("Async client internal exception occurred with no exception listener registered: "
+ LOG.debug("Async client internal exception occurred with no exception listener registered: "
+ error, error);
}
}
}
/**
* Used for handling async exceptions
- *
+ *
* @param error
*/
public void onAsyncException(Throwable error) {
@@ -1868,27 +1868,27 @@ public class ActiveMQConnection implemen
}
public void onException(final IOException error) {
- onAsyncException(error);
- if (!closing.get() && !closed.get()) {
- executor.execute(new Runnable() {
- public void run() {
- transportFailed(error);
- ServiceSupport.dispose(ActiveMQConnection.this.transport);
- brokerInfoReceived.countDown();
- try {
- cleanup();
- } catch (JMSException e) {
- LOG.warn("Exception during connection cleanup, " + e, e);
- }
- for (Iterator<TransportListener> iter = transportListeners
- .iterator(); iter.hasNext();) {
- TransportListener listener = iter.next();
- listener.onException(error);
- }
- }
- });
- }
- }
+ onAsyncException(error);
+ if (!closing.get() && !closed.get()) {
+ executor.execute(new Runnable() {
+ public void run() {
+ transportFailed(error);
+ ServiceSupport.dispose(ActiveMQConnection.this.transport);
+ brokerInfoReceived.countDown();
+ try {
+ cleanup();
+ } catch (JMSException e) {
+ LOG.warn("Exception during connection cleanup, " + e, e);
+ }
+ for (Iterator<TransportListener> iter = transportListeners
+ .iterator(); iter.hasNext();) {
+ TransportListener listener = iter.next();
+ listener.onException(error);
+ }
+ }
+ });
+ }
+ }
public void transportInterupted() {
this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
@@ -1901,11 +1901,11 @@ public class ActiveMQConnection implemen
ActiveMQSession s = i.next();
s.clearMessagesInProgress();
}
-
+
for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
- connectionConsumer.clearMessagesInProgress();
+ connectionConsumer.clearMessagesInProgress();
}
-
+
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.transportInterupted();
@@ -1921,7 +1921,7 @@ public class ActiveMQConnection implemen
/**
* Create the DestinationInfo object for the temporary destination.
- *
+ *
* @param topic - if its true topic, else queue.
* @return DestinationInfo
* @throws JMSException
@@ -2029,7 +2029,7 @@ public class ActiveMQConnection implemen
* minimize context switches which boost performance. However sometimes its
* better to go slower to ensure that a single blocked consumer socket does
* not block delivery to other consumers.
- *
+ *
* @param asyncDispatch If true then consumers created on this connection
* will default to having their messages dispatched
* asynchronously. The default value is false.
@@ -2069,7 +2069,7 @@ public class ActiveMQConnection implemen
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
}
-
+
public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
return createInputStream(dest, null, false);
}
@@ -2085,7 +2085,7 @@ public class ActiveMQConnection implemen
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
}
-
+
private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
@@ -2112,7 +2112,7 @@ public class ActiveMQConnection implemen
* Creates an output stream allowing full control over the delivery mode,
* the priority and time to live of the messages and the properties added to
* messages on the stream.
- *
+ *
* @param streamProperties defines a map of key-value pairs where the keys
* are strings and the values are primitive values (numbers
* and strings) which are appended to the messages similarly
@@ -2137,7 +2137,7 @@ public class ActiveMQConnection implemen
* <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
* message is part of a pending transaction or has not been acknowledged in
* the session.
- *
+ *
* @param name the name used to identify this subscription
* @throws JMSException if the session fails to unsubscribe to the durable
* subscription due to some internal error.
@@ -2290,11 +2290,11 @@ public class ActiveMQConnection implemen
public void setAuditDepth(int auditDepth) {
connectionAudit.setAuditDepth(auditDepth);
- }
+ }
public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
- }
+ }
protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
connectionAudit.removeDispatcher(dispatcher);
@@ -2308,13 +2308,13 @@ public class ActiveMQConnection implemen
connectionAudit.rollbackDuplicate(dispatcher, message);
}
- public IOException getFirstFailureError() {
- return firstFailureError;
- }
-
- protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
- if (cdl != null) {
+ public IOException getFirstFailureError() {
+ return firstFailureError;
+ }
+
+ protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
+ CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+ if (cdl != null) {
if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
cdl.await(10, TimeUnit.SECONDS);
@@ -2322,16 +2322,16 @@ public class ActiveMQConnection implemen
signalInterruptionProcessingComplete();
}
}
-
- protected void transportInterruptionProcessingComplete() {
- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
- if (cdl != null) {
- cdl.countDown();
- try {
- signalInterruptionProcessingComplete();
- } catch (InterruptedException ignored) {}
- }
- }
+
+ protected void transportInterruptionProcessingComplete() {
+ CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+ if (cdl != null) {
+ cdl.countDown();
+ try {
+ signalInterruptionProcessingComplete();
+ } catch (InterruptedException ignored) {}
+ }
+ }
private void signalInterruptionProcessingComplete() throws InterruptedException {
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
@@ -2372,15 +2372,15 @@ public class ActiveMQConnection implemen
public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
}
-
+
public long getConsumerFailoverRedeliveryWaitPeriod() {
return consumerFailoverRedeliveryWaitPeriod;
}
-
+
protected Scheduler getScheduler() {
return this.scheduler;
}
-
+
protected ThreadPoolExecutor getExecutor() {
return this.executor;
}
@@ -2399,4 +2399,28 @@ public class ActiveMQConnection implemen
this.checkForDuplicates = checkForDuplicates;
}
+ /**
+ * Removes any TempDestinations that this connection has cached, ignoring
+ * any exceptions generated because the destination is in use as they should
+ * not be removed.
+ */
+ public void cleanUpTempDestinations() {
+
+ if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
+ return;
+ }
+
+ Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
+ = this.activeTempDestinations.entrySet().iterator();
+ while(entries.hasNext()) {
+ ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
+ try {
+ this.deleteTempDestination(entry.getValue());
+ } catch (Exception ex) {
+ // the temp dest is in use so it can not be deleted.
+ // it is ok to leave it to connection tear down phase
+ }
+ }
+ }
+
}
Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java?rev=1142267&r1=1142266&r2=1142267&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java Sat Jul 2 18:59:33 2011
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.pool;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -36,18 +39,19 @@ import org.apache.activemq.ActiveMQSessi
import org.apache.activemq.AlreadyClosedException;
import org.apache.activemq.EnhancedConnection;
import org.apache.activemq.advisory.DestinationSource;
+import org.apache.activemq.command.ActiveMQTempDestination;
/**
* Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
* {@link QueueConnection} which is pooled and on {@link #close()} will return
* itself to the sessionPool.
- *
+ *
* <b>NOTE</b> this implementation is only intended for use when sending
* messages. It does not deal with pooling of consumers; for that look at a
* library like <a href="http://jencks.org/">Jencks</a> such as in <a
* href="http://jencks.org/Message+Driven+POJOs">this example</a>
- *
- *
+ *
+ *
*/
public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
@@ -69,6 +73,9 @@ public class PooledConnection implements
public void close() throws JMSException {
if (this.pool != null) {
this.pool.decrementReferenceCount();
+ if (this.pool.getConnection() != null) {
+ this.pool.getConnection().cleanUpTempDestinations();
+ }
this.pool = null;
}
}
@@ -143,7 +150,7 @@ public class PooledConnection implements
// EnhancedCollection API
// -------------------------------------------------------------------------
-
+
public DestinationSource getDestinationSource() throws JMSException {
return getConnection().getDestinationSource();
}
@@ -169,5 +176,4 @@ public class PooledConnection implements
public String toString() {
return "PooledConnection { " + pool + " }";
}
-
}
Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java?rev=1142267&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java (added)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java Sat Jul 2 18:59:33 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.Connection;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.test.TestSupport;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class PooledConnectionFactoryWithTemporaryDestinationsTest extends TestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionFactoryWithTemporaryDestinationsTest.class);
+
+ private BrokerService broker;
+ private ActiveMQConnectionFactory factory;
+ private PooledConnectionFactory pooledFactory;
+
+ protected void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ TransportConnector connector = broker.addConnector("tcp://localhost:0");
+ broker.start();
+ factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false");
+ pooledFactory = new PooledConnectionFactory(factory);
+ }
+
+ protected void tearDown() throws Exception {
+ broker.stop();
+ }
+
+ public void testTemporaryQueueLeakAfterConnectionClose() throws Exception {
+ Connection pooledConnection = null;
+ Session session = null;
+ Queue tempQueue = null;
+ for (int i = 0; i < 2; i++) {
+ pooledConnection = pooledFactory.createConnection();
+ session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ tempQueue = session.createTemporaryQueue();
+ LOG.info("Created queue named: " + tempQueue.getQueueName());
+ pooledConnection.close();
+ }
+
+ assertEquals(0, countBrokerTemporaryQueues());
+ }
+
+ public void testTemporaryTopicLeakAfterConnectionClose() throws Exception {
+ Connection pooledConnection = null;
+ Session session = null;
+ Topic tempTopic = null;
+ for (int i = 0; i < 2; i++) {
+ pooledConnection = pooledFactory.createConnection();
+ session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ tempTopic = session.createTemporaryTopic();
+ LOG.info("Created topic named: " + tempTopic.getTopicName());
+ pooledConnection.close();
+ }
+
+ assertEquals(0, countBrokerTemporaryTopics());
+ }
+
+ private int countBrokerTemporaryQueues() throws Exception {
+ return ((RegionBroker) broker.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
+ }
+
+ private int countBrokerTemporaryTopics() throws Exception {
+ return ((RegionBroker) broker.getRegionBroker()).getTempTopicRegion().getDestinationMap().size();
+ }
+}
Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
------------------------------------------------------------------------------
svn:eol-style = native