You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by James Strachan <ja...@gmail.com> on 2006/03/17 12:23:35 UTC
Re: svn commit: r386608 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Whoops, wrong commit message - I got my patches to AbstractConnection and
ActiveMQConnection mixed up :)
This patch was actually 2 different things.
* a fix for AMQ-623 to ensure that we don't try to send further commands
after an EOFException has been caught
* an implementation of AMQ-642 to allow folks to add a TransportListener to an
ActiveMQConnection to listen for connection events (start/stop, suspend/resume, etc.)
On 3/17/06, jstrachan@apache.org <js...@apache.org> wrote:
>
> Author: jstrachan
> Date: Fri Mar 17 03:14:11 2006
> New Revision: 386608
>
> URL: http://svn.apache.org/viewcvs?rev=386608&view=rev
> Log:
> patch for AMQ-600 to catch IOException caused by attempts to dispatch
> synchronously to a connection on a dead socket and treat them as a transport
> exception (rather than service exception), disposing the connection so that
> clientID's can be reused
>
> Modified:
>
> incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
>
> Modified:
> incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
> URL:
> http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=386608&r1=386607&r2=386608&view=diff
>
> ==============================================================================
> ---
> incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
> (original)
> +++
> incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
> Fri Mar 17 03:14:11 2006
> @@ -72,6 +72,7 @@
> import org.apache.activemq.thread.TaskRunnerFactory;
> import org.apache.activemq.transport.DefaultTransportListener;
> import org.apache.activemq.transport.Transport;
> +import org.apache.activemq.transport.TransportListener;
> import org.apache.activemq.util.IdGenerator;
> import org.apache.activemq.util.IntrospectionSupport;
> import org.apache.activemq.util.JMSExceptionSupport;
> @@ -85,7 +86,7 @@
> import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean
> ;
>
>
> -public class ActiveMQConnection extends DefaultTransportListener
> implements Connection, TopicConnection, QueueConnection, StatsCapable,
> Closeable, StreamConnection {
> +public class ActiveMQConnection implements Connection, TopicConnection,
> QueueConnection, StatsCapable, Closeable, StreamConnection,
> TransportListener {
>
> public static final TaskRunnerFactory SESSION_TASK_RUNNER = new
> TaskRunnerFactory("session Task",ThreadPriorities.INBOUND_CLIENT_SESSION
> ,true,1000);
>
> @@ -130,6 +131,7 @@
> private final CopyOnWriteArrayList connectionConsumers = new
> CopyOnWriteArrayList();
> private final CopyOnWriteArrayList inputStreams = new
> CopyOnWriteArrayList();
> private final CopyOnWriteArrayList outputStreams = new
> CopyOnWriteArrayList();
> + private final CopyOnWriteArrayList transportListeners = new
> CopyOnWriteArrayList();
>
> // Maps ConsumerIds to ActiveMQConsumer objects
> private final ConcurrentHashMap dispatchers = new
> ConcurrentHashMap();
> @@ -147,7 +149,6 @@
> private IOException firstFailureError;
>
>
> -
> /**
> * Construct an <code>ActiveMQConnection</code>
> * @param transport
> @@ -790,6 +791,17 @@
> this.useRetroactiveConsumer = useRetroactiveConsumer;
> }
>
> + /**
> + * Adds a transport listener so that a client can be notified of
> events in the underlying
> + * transport
> + */
> + public void addTransportListener(TransportListener transportListener)
> {
> + transportListeners.add(transportListener);
> + }
> +
> + public void removeTransportListener(TransportListener
> transportListener) {
> + transportListeners.remove(transportListener);
> + }
>
> // Implementation methods
> //
> -------------------------------------------------------------------------
> @@ -1175,7 +1187,7 @@
> */
> protected void ensureConnectionInfoSent() throws JMSException {
> // Can we skip sending the ConnectionInfo packet??
> - if (isConnectionInfoSentToBroker) {
> + if (isConnectionInfoSentToBroker || closed.get()) {
> return;
> }
>
> @@ -1241,7 +1253,7 @@
> }
>
> if(isConnectionInfoSentToBroker){
> - if(!transportFailed.get()){
> + if(!transportFailed.get() && !closing.get()){
> asyncSendPacket(info.createRemoveCommand());
> }
> isConnectionInfoSentToBroker=false;
> @@ -1368,6 +1380,10 @@
>
> onAsyncException(((ConnectionError)command).getException());
> }
> }
> + for (Iterator iter = transportListeners.iterator(); iter.hasNext();)
> {
> + TransportListener listener = (TransportListener) iter.next();
> + listener.onCommand(command);
> + }
> }
>
> /**
> @@ -1386,14 +1402,33 @@
> }
> }
> }
> -
>
> public void onException(IOException error) {
> onAsyncException(error);
> transportFailed(error);
> ServiceSupport.dispose(this.transport);
> brokerInfoReceived.countDown();
> +
> + for (Iterator iter = transportListeners.iterator(); iter.hasNext();)
> {
> + TransportListener listener = (TransportListener) iter.next();
> + listener.onException(error);
> + }
> + }
> +
> + public void transportInterupted() {
> + for (Iterator iter = transportListeners.iterator(); iter.hasNext();)
> {
> + TransportListener listener = (TransportListener) iter.next();
> + listener.transportInterupted();
> + }
> }
> +
> + public void transportResumed() {
> + for (Iterator iter = transportListeners.iterator(); iter.hasNext();)
> {
> + TransportListener listener = (TransportListener) iter.next();
> + listener.transportResumed();
> + }
> + }
> +
>
> /**
> * Create the DestinationInfo object for the temporary destination.
>
>
>
--
James
-------
http://radio.weblogs.com/0112098/