You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by James Strachan <ja...@gmail.com> on 2006/03/17 12:23:35 UTC

Re: svn commit: r386608 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java

Whoops; wrong commit message - got my patches to AbstractConnection and
ActiveMQConnection mixed up :)

This patch was actually 2 different things.

* a fix for AMQ-623 to ensure that we don't try to send further commands
after an EOFException has been caught
* implement AMQ-642 to allow folks to add a TransportListener to an
ActiveMQConnection to listen to the connection start/stop suspend/resume etc

On 3/17/06, jstrachan@apache.org <js...@apache.org> wrote:
>
> Author: jstrachan
> Date: Fri Mar 17 03:14:11 2006
> New Revision: 386608
>
> URL: http://svn.apache.org/viewcvs?rev=386608&view=rev
> Log:
> patch for AMQ-600 to catch IOException caused by attempts to dispatch
> synchronously to a connection on a dead socket and treat them as a transport
> exception (rather than service exception), disposing the connection so that
> clientID's can be reused
>
> Modified:
>
>     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
>
> Modified:
> incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
> URL:
> http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=386608&r1=386607&r2=386608&view=diff
>
> ==============================================================================
> ---
> incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
> (original)
> +++
> incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
> Fri Mar 17 03:14:11 2006
> @@ -72,6 +72,7 @@
> import org.apache.activemq.thread.TaskRunnerFactory;
> import org.apache.activemq.transport.DefaultTransportListener;
> import org.apache.activemq.transport.Transport;
> +import org.apache.activemq.transport.TransportListener;
> import org.apache.activemq.util.IdGenerator;
> import org.apache.activemq.util.IntrospectionSupport;
> import org.apache.activemq.util.JMSExceptionSupport;
> @@ -85,7 +86,7 @@
> import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean
> ;
>
>
> -public class ActiveMQConnection extends DefaultTransportListener
> implements Connection, TopicConnection, QueueConnection, StatsCapable,
> Closeable,  StreamConnection {
> +public class ActiveMQConnection implements Connection, TopicConnection,
> QueueConnection, StatsCapable, Closeable,  StreamConnection,
> TransportListener {
>
>      public static final TaskRunnerFactory SESSION_TASK_RUNNER = new
> TaskRunnerFactory("session Task",ThreadPriorities.INBOUND_CLIENT_SESSION
> ,true,1000);
>
> @@ -130,6 +131,7 @@
>      private final CopyOnWriteArrayList connectionConsumers = new
> CopyOnWriteArrayList();
>      private final CopyOnWriteArrayList inputStreams = new
> CopyOnWriteArrayList();
>      private final CopyOnWriteArrayList outputStreams = new
> CopyOnWriteArrayList();
> +    private final CopyOnWriteArrayList transportListeners = new
> CopyOnWriteArrayList();
>
>      // Maps ConsumerIds to ActiveMQConsumer objects
>      private final ConcurrentHashMap dispatchers = new
> ConcurrentHashMap();
> @@ -147,7 +149,6 @@
>      private IOException firstFailureError;
>
>
> -
>      /**
>       * Construct an <code>ActiveMQConnection</code>
>       * @param transport
> @@ -790,6 +791,17 @@
>          this.useRetroactiveConsumer = useRetroactiveConsumer;
>      }
>
> +    /**
> +     * Adds a transport listener so that a client can be notified of
> events in the underlying
> +     * transport
> +     */
> +    public void addTransportListener(TransportListener transportListener)
> {
> +        transportListeners.add(transportListener);
> +    }
> +
> +    public void removeTransportListener(TransportListener
> transportListener) {
> +        transportListeners.remove(transportListener);
> +    }
>
>      // Implementation methods
>      //
> -------------------------------------------------------------------------
> @@ -1175,7 +1187,7 @@
>       */
>      protected void ensureConnectionInfoSent() throws JMSException {
>          // Can we skip sending the ConnectionInfo packet??
> -        if (isConnectionInfoSentToBroker) {
> +        if (isConnectionInfoSentToBroker || closed.get()) {
>              return;
>          }
>
> @@ -1241,7 +1253,7 @@
>          }
>
>          if(isConnectionInfoSentToBroker){
> -            if(!transportFailed.get()){
> +            if(!transportFailed.get() && !closing.get()){
>                  asyncSendPacket(info.createRemoveCommand());
>              }
>              isConnectionInfoSentToBroker=false;
> @@ -1368,6 +1380,10 @@
>
> onAsyncException(((ConnectionError)command).getException());
>              }
>          }
> +        for (Iterator iter = transportListeners.iterator(); iter.hasNext();)
> {
> +            TransportListener listener = (TransportListener) iter.next();
> +            listener.onCommand(command);
> +        }
>      }
>
>      /**
> @@ -1386,14 +1402,33 @@
>              }
>          }
>      }
> -
>
>      public void onException(IOException error) {
>          onAsyncException(error);
>          transportFailed(error);
>          ServiceSupport.dispose(this.transport);
>          brokerInfoReceived.countDown();
> +
> +        for (Iterator iter = transportListeners.iterator(); iter.hasNext();)
> {
> +            TransportListener listener = (TransportListener) iter.next();
> +            listener.onException(error);
> +        }
> +    }
> +
> +    public void transportInterupted() {
> +        for (Iterator iter = transportListeners.iterator(); iter.hasNext();)
> {
> +            TransportListener listener = (TransportListener) iter.next();
> +            listener.transportInterupted();
> +        }
>      }
> +
> +    public void transportResumed() {
> +        for (Iterator iter = transportListeners.iterator(); iter.hasNext();)
> {
> +            TransportListener listener = (TransportListener) iter.next();
> +            listener.transportResumed();
> +        }
> +    }
> +
>
>      /**
>       * Create the DestinationInfo object for the temporary destination.
>
>
>


--

James
-------
http://radio.weblogs.com/0112098/