You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/07/30 07:34:39 UTC

svn commit: r560872 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/state/ main/java/org/apache/activemq/...

Author: chirino
Date: Sun Jul 29 22:34:37 2007
New Revision: 560872

URL: http://svn.apache.org/viewvc?view=rev&rev=560872
Log:
Serveral changes needed to Fix https://issues.apache.org/activemq/browse/AMQ-1349
 - The vm:// transport was delivering events to the listener before start() was called.  Also clean it up a little by consolidating
   the use of the prePeerSetQueue and messageQueue field.
 - the tcp:// .stop() method now blocks until the thread that calls out to the listener is shutdown.
 - TransportConnection was not doing a good job synchronizing when multiple concurrent conenctions to the same connection Id was established.
   IllegalStateExceptions were common when a failover connection reconnected.  Now we make sure that only 1 connection with a given connectionId
   is activley operating in the broker.  Also removed 1 un-needed hash lookup by replacing the brokerConnectionStates Map with the 
   connectionState variable.
    
Also added a pause in the JmsTempDestinationTest to avoid intermitent failures.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Sun Jul 29 22:34:37 2007
@@ -54,7 +54,6 @@
     private Object longTermStoreContext;
     private boolean producerFlowControl=true;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
-    private AtomicInteger referenceCounter = new AtomicInteger();
     private boolean networkConnection;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
@@ -241,15 +240,6 @@
         }
         return true;
     }
-
-	public int incrementReference() {
-		return referenceCounter.incrementAndGet();
-	}
-	
-	public int decrementReference() {
-		return referenceCounter.decrementAndGet();
-	}
-
 	public synchronized boolean isNetworkConnection() {
 		return networkConnection;
 	}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Sun Jul 29 22:34:37 2007
@@ -72,6 +72,7 @@
 import org.apache.activemq.network.NetworkBridgeFactory;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.state.ConsumerState;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
@@ -105,8 +106,8 @@
     private final Transport transport;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
     // Keeps track of the state of the connections.
-    protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap();
-    protected final Map brokerConnectionStates;
+//    protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap();
+    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
     // The broker and wireformat info that was exchanged.
     protected BrokerInfo brokerInfo;
     private WireFormatInfo wireFormatInfo;
@@ -140,16 +141,18 @@
     private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
     private DemandForwardingBridge duplexBridge = null;
 	final private TaskRunnerFactory taskRunnerFactory;
+	private TransportConnectionState connectionState;
     
-    static class ConnectionState extends org.apache.activemq.state.ConnectionState{
+    static class TransportConnectionState extends org.apache.activemq.state.ConnectionState{
 
-        private final ConnectionContext context;
-        TransportConnection connection;
+        private ConnectionContext context;
+        private TransportConnection connection;
+        private final Object connectMutex = new Object();
+        private AtomicInteger referenceCounter = new AtomicInteger();
 
-        public ConnectionState(ConnectionInfo info,ConnectionContext context,TransportConnection connection){
+        public TransportConnectionState(ConnectionInfo info, TransportConnection transportConnection){
             super(info);
-            this.context=context;
-            this.connection=connection;
+            connection=transportConnection;
         }
 
         public ConnectionContext getContext(){
@@ -159,6 +162,23 @@
         public TransportConnection getConnection(){
             return connection;
         }
+
+		public void setContext(ConnectionContext context) {
+			this.context = context;
+		}
+
+		public void setConnection(TransportConnection connection) {
+			this.connection = connection;
+		}
+		
+		public int incrementReference() {
+			return referenceCounter.incrementAndGet();
+		}
+		
+		public int decrementReference() {
+			return referenceCounter.decrementAndGet();
+		}
+
     }
 
     /**
@@ -307,36 +327,6 @@
         return response;
     }
 
-    protected ConnectionState lookupConnectionState(ConsumerId id){
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
-        if(cs==null)
-            throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "
-                    +id.getParentId().getParentId());
-        return cs;
-    }
-
-    protected ConnectionState lookupConnectionState(ProducerId id){
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
-        if(cs==null)
-            throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "
-                    +id.getParentId().getParentId());
-        return cs;
-    }
-
-    protected ConnectionState lookupConnectionState(SessionId id){
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId());
-        if(cs==null)
-            throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "
-                    +id.getParentId());
-        return cs;
-    }
-
-    protected ConnectionState lookupConnectionState(ConnectionId connectionId){
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(connectionId);
-        if(cs==null)
-            throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
-        return cs;
-    }
 
     public Response processKeepAlive(KeepAliveInfo info) throws Exception{
         return null;
@@ -354,7 +344,15 @@
     }
 
     public Response processShutdown(ShutdownInfo info) throws Exception{
-        stop();
+        new Thread("Async Exception Handler"){
+            public void run(){
+                try {
+					TransportConnection.this.stop();
+				} catch (Exception e) {
+					serviceException(e);
+				}
+            }
+        }.start();
         return null;
     }
 
@@ -363,7 +361,7 @@
     }
 
     synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception{
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
+        TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
         context=null;
         if(cs!=null){
             context=cs.getContext();
@@ -387,7 +385,7 @@
     }
 
     synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception{
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
+        TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
         context=null;
         if(cs!=null){
             context=cs.getContext();
@@ -413,63 +411,39 @@
     }
 
     synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception{
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        context=null;
-        if(cs!=null){
-            context=cs.getContext();
-        }
-        if (cs == null) {
-            throw new NullPointerException("Context is null");
-        }
+        TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+        context=cs.getContext();
         cs.removeTransactionState(info.getTransactionId());
         broker.commitTransaction(context,info.getTransactionId(),true);
         return null;
     }
 
     synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception{
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        context=null;
-        if(cs!=null){
-            context=cs.getContext();
-        }
-        if (cs == null) {
-            throw new NullPointerException("Context is null");
-        }
+        TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+        context=cs.getContext();
         cs.removeTransactionState(info.getTransactionId());
         broker.commitTransaction(context,info.getTransactionId(),false);
         return null;
     }
 
     synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception{
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        context=null;
-        if(cs!=null){
-            context=cs.getContext();
-        }
-        if (cs == null) {
-            throw new NullPointerException("Context is null");
-        }
+        TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+        context=cs.getContext();
         cs.removeTransactionState(info.getTransactionId());
         broker.rollbackTransaction(context,info.getTransactionId());
         return null;
     }
 
     synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception{
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        context=null;
-        if(cs!=null){
-            context=cs.getContext();
-        }
+        TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+        context=cs.getContext();
         broker.forgetTransaction(context,info.getTransactionId());
         return null;
     }
 
     synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception{
-        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
-        context=null;
-        if(cs!=null){
-            context=cs.getContext();
-        }
+        TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+        context=cs.getContext();
         TransactionId[] preparedTransactions=broker.getPreparedTransactions(context);
         return new DataArrayResponse(preparedTransactions);
     }
@@ -497,7 +471,7 @@
     }
 
     synchronized public Response processAddDestination(DestinationInfo info) throws Exception{
-        ConnectionState cs=lookupConnectionState(info.getConnectionId());
+        TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
         broker.addDestinationInfo(cs.getContext(),info);
         if(info.getDestination().isTemporary()){
             cs.addTempDestination(info);
@@ -506,7 +480,7 @@
     }
 
     synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception{
-        ConnectionState cs=lookupConnectionState(info.getConnectionId());
+        TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
         broker.removeDestinationInfo(cs.getContext(),info);
         if(info.getDestination().isTemporary()){
             cs.removeTempDestination(info.getDestination());
@@ -517,7 +491,7 @@
     synchronized public Response processAddProducer(ProducerInfo info) throws Exception{
         SessionId sessionId=info.getProducerId().getParentId();
         ConnectionId connectionId=sessionId.getParentId();
-        ConnectionState cs=lookupConnectionState(connectionId);
+        TransportConnectionState cs=lookupConnectionState(connectionId);
         SessionState ss=cs.getSessionState(sessionId);
         if(ss==null)
             throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
@@ -537,7 +511,7 @@
     synchronized public Response processRemoveProducer(ProducerId id) throws Exception{
         SessionId sessionId=id.getParentId();
         ConnectionId connectionId=sessionId.getParentId();
-        ConnectionState cs=lookupConnectionState(connectionId);
+        TransportConnectionState cs=lookupConnectionState(connectionId);
         SessionState ss=cs.getSessionState(sessionId);
         if(ss==null)
             throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
@@ -553,7 +527,7 @@
     synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception{
         SessionId sessionId=info.getConsumerId().getParentId();
         ConnectionId connectionId=sessionId.getParentId();
-        ConnectionState cs=lookupConnectionState(connectionId);
+        TransportConnectionState cs=lookupConnectionState(connectionId);
         SessionState ss=cs.getSessionState(sessionId);
         if(ss==null)
             throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "
@@ -573,7 +547,7 @@
     synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception{
         SessionId sessionId=id.getParentId();
         ConnectionId connectionId=sessionId.getParentId();
-        ConnectionState cs=lookupConnectionState(connectionId);
+        TransportConnectionState cs=lookupConnectionState(connectionId);
         SessionState ss=cs.getSessionState(sessionId);
         if(ss==null)
             throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
@@ -588,7 +562,7 @@
 
     synchronized public Response processAddSession(SessionInfo info) throws Exception{
         ConnectionId connectionId=info.getSessionId().getParentId();
-        ConnectionState cs=lookupConnectionState(connectionId);
+        TransportConnectionState cs=lookupConnectionState(connectionId);
         // Avoid replaying dup commands
         if(!cs.getSessionIds().contains(info.getSessionId())){
             broker.addSession(cs.getContext(),info);
@@ -603,7 +577,7 @@
 
     synchronized public Response processRemoveSession(SessionId id) throws Exception{
         ConnectionId connectionId=id.getParentId();
-        ConnectionState cs=lookupConnectionState(connectionId);
+        TransportConnectionState cs=lookupConnectionState(connectionId);
         SessionState session=cs.getSessionState(id);
         if(session==null)
             throw new IllegalStateException("Cannot remove session that had not been registered: "+id);
@@ -631,21 +605,36 @@
         return null;
     }
 
-    synchronized public Response processAddConnection(ConnectionInfo info) throws Exception{
-        ConnectionState state=(ConnectionState)brokerConnectionStates.get(info.getConnectionId());
-        if(state!=null){
-            // ConnectionInfo replay?? Chances are that it's a client reconnecting,
-            // and we have not detected that that old connection died.. Kill the old connection
-            // to make sure our state is in sync with the client.
-            if(this!=state.getConnection()){
-                log.debug("Killing previous stale connection: "+state.getConnection());
-                state.getConnection().stop();
-                if(!state.getConnection().stopLatch.await(15,TimeUnit.SECONDS)){
-                    throw new Exception("Previous connection could not be clean up.");
-                }
-            }
+    public Response processAddConnection(ConnectionInfo info) throws Exception {
+    	
+        TransportConnectionState state;
+        
+        // Make sure 2 concurrent connections by the same ID only generate 1 TransportConnectionState object.
+        synchronized(brokerConnectionStates) {
+	        state=(TransportConnectionState)brokerConnectionStates.get(info.getConnectionId());
+	        if( state==null ) {
+	            state=new TransportConnectionState(info,this);
+	            brokerConnectionStates.put(info.getConnectionId(),state);
+	        }
+	        state.incrementReference();
         }
-        log.debug("Setting up new connection: "+this);
+        
+        	
+        // If there are 2 concurrent connections for the same connection id, then last one in wins, we need to sync here 
+        // to figure out the winner.
+        synchronized(state.connectMutex) {
+	        if( state.getConnection()!=this ) {
+	            log.debug("Killing previous stale connection: "+state.getConnection().getRemoteAddress());
+	            state.getConnection().stop(); 
+	            log.debug("Connection "+getRemoteAddress()+" taking over previous connection: "+state.getConnection().getRemoteAddress());
+		        state.setConnection(this);
+		        state.reset(info);
+	        }
+        }
+        
+        registerConnectionState(info.getConnectionId(),state);
+        	
+        log.debug("Setting up new connection: "+getRemoteAddress());
         // Setup the context.
         String clientId=info.getClientId();
         context=new ConnectionContext();
@@ -659,11 +648,10 @@
         context.setClientMaster(info.isClientMaster());
         context.setWireFormatInfo(wireFormatInfo);
         context.setNetworkConnection(networkConnection);
-        context.incrementReference();
         this.manageable=info.isManageable();
-        state=new ConnectionState(info,context,this);
-        brokerConnectionStates.put(info.getConnectionId(),state);
-        localConnectionStates.put(info.getConnectionId(),state);
+        state.setContext(context);
+        state.setConnection(this);            
+
         broker.addConnection(context,info);
         if(info.isManageable()&&broker.isFaultTolerantConfiguration()){
             // send ConnectionCommand
@@ -674,8 +662,9 @@
         return null;
     }
 
-    synchronized public Response processRemoveConnection(ConnectionId id){
-        ConnectionState cs=lookupConnectionState(id);
+
+	synchronized public Response processRemoveConnection(ConnectionId id){
+        TransportConnectionState cs=lookupConnectionState(id);
         // Don't allow things to be added to the connection state while we are shutting down.
         cs.shutdown();
         // Cascade the connection stop to the sessions.
@@ -702,12 +691,15 @@
         }catch(Throwable e){
             serviceLog.warn("Failed to remove connection "+cs.getInfo(),e);
         }
-        ConnectionState state=(ConnectionState)localConnectionStates.remove(id);
-        if(state!=null){
-            // If we are the last reference, we should remove the state
-            // from the broker.
-            if(state.getContext().decrementReference()==0){
-                brokerConnectionStates.remove(id);
+        
+        TransportConnectionState state=unregisterConnectionState(id);
+        if(state!=null) {
+            synchronized(brokerConnectionStates) {
+                // If we are the last reference, we should remove the state
+                // from the broker.
+                if(state.decrementReference()==0){
+                    brokerConnectionStates.remove(id);
+                }
             }
         }
         return null;
@@ -869,97 +861,103 @@
                 return;
             }
         }
-        if(stopped.compareAndSet(false,true)){
-            log.debug("Stopping connection: "+transport.getRemoteAddress());
-            connector.onStopped(this);
-            try{
-                synchronized(this){
-                    if(masterBroker!=null){
-                        masterBroker.stop();
-                    }
-                    if(duplexBridge!=null){
-                        duplexBridge.stop();
-                    }
-                    // If the transport has not failed yet,
-                    // notify the peer that we are doing a normal shutdown.
-                    if(transportException==null){
-                        transport.oneway(new ShutdownInfo());
-                    }
-                }
-                
-            }catch(Exception ignore){
-                log.trace("Exception caught stopping",ignore);
-            }
-            transport.stop();
-            active=false;
-            if(disposed.compareAndSet(false,true)){
-
-                // Let all the connection contexts know we are shutting down
-                // so that in progress operations can notice and unblock.
-                 ArrayList l=new ArrayList(localConnectionStates.values());
-                 for(Iterator iter=l.iterator();iter.hasNext();){
-                     ConnectionState cs=(ConnectionState) iter.next();
-                     cs.getContext().getStopping().set(true);
-                 }            	
-            	
- 		        if( taskRunner!=null ) {
-                    taskRunner.wakeup();
-                    // Give it a change to stop gracefully.
-                    dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
-                    disposeTransport();
-		            taskRunner.shutdown();
-                } else {
-                    disposeTransport();
-                }
-
-		        if( taskRunner!=null )
-		            taskRunner.shutdown();
-		        
-                // Run the MessageDispatch callbacks so that message references get cleaned up.
-                for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
-                    Command command = (Command) iter.next();
-                    if(command.isMessageDispatch()) {
-                        MessageDispatch md=(MessageDispatch) command;
-                        Runnable sub=md.getTransmitCallback();
-                        broker.processDispatch(md);
-                        if(sub!=null){
-                            sub.run();
-                        }
-                    }
-                } 
-                //
-                // Remove all logical connection associated with this connection
-                // from the broker.
-                if(!broker.isStopped()){
-                	l=new ArrayList(localConnectionStates.keySet());
-                    for(Iterator iter=l.iterator();iter.hasNext();){
-                        ConnectionId connectionId=(ConnectionId)iter.next();
-                        try{
-                            log.debug("Cleaning up connection resources.");
-                            processRemoveConnection(connectionId);
-                        }catch(Throwable ignore){
-                            ignore.printStackTrace();
-                        }
-                    }
-                    if(brokerInfo!=null){
-                        broker.removeBroker(this,brokerInfo);
-                    }
-                }
-                stopLatch.countDown();
-            }
+        if(stopped.compareAndSet(false,true)) {
+            doStop();
+			stopLatch.countDown();
+        } else {
+        	stopLatch.await();
         }
     }
 
+	protected void doStop() throws Exception, InterruptedException {
+		log.debug("Stopping connection: "+transport.getRemoteAddress());
+		connector.onStopped(this);
+		try{
+		    synchronized(this){
+		        if(masterBroker!=null){
+		            masterBroker.stop();
+		        }
+		        if(duplexBridge!=null){
+		            duplexBridge.stop();
+		        }
+		        // If the transport has not failed yet,
+		        // notify the peer that we are doing a normal shutdown.
+		        if(transportException==null){
+		            transport.oneway(new ShutdownInfo());
+		        }
+		    }
+		    
+		}catch(Exception ignore){
+		    log.trace("Exception caught stopping",ignore);
+		}
+		if(disposed.compareAndSet(false,true)){
+
+		    // Let all the connection contexts know we are shutting down
+		    // so that in progress operations can notice and unblock.
+			 List<TransportConnectionState> connectionStates=listConnectionStates();
+		     for (TransportConnectionState cs : connectionStates) {
+		         cs.getContext().getStopping().set(true);
+		     }            	
+			
+		    if( taskRunner!=null ) {
+		        taskRunner.wakeup();
+		        // Give it a change to stop gracefully.
+		        dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
+		        disposeTransport();
+		        taskRunner.shutdown();
+		    } else {
+		        disposeTransport();
+		    }
+
+		    if( taskRunner!=null )
+		        taskRunner.shutdown();
+		    
+		    // Run the MessageDispatch callbacks so that message references get cleaned up.
+		    for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
+		        Command command = (Command) iter.next();
+		        if(command.isMessageDispatch()) {
+		            MessageDispatch md=(MessageDispatch) command;
+		            Runnable sub=md.getTransmitCallback();
+		            broker.processDispatch(md);
+		            if(sub!=null){
+		                sub.run();
+		            }
+		        }
+		    } 
+		    //
+		    // Remove all logical connection associated with this connection
+		    // from the broker.
+		    
+		    if (!broker.isStopped()) {
+				for (TransportConnectionState cs : connectionStates) {
+					cs.getContext().getStopping().set(true);
+					try {
+						log.debug("Cleaning up connection resources: " + getRemoteAddress());
+						processRemoveConnection(cs.getInfo().getConnectionId());
+					} catch (Throwable ignore) {
+						ignore.printStackTrace();
+					}
+				}
+
+				if (brokerInfo != null) {
+					broker.removeBroker(this, brokerInfo);
+				}
+			}
+			log.debug("Connection Stopped: " + getRemoteAddress());
+		}
+	}
+
     /**
-     * @return Returns the blockedCandidate.
-     */
+	 * @return Returns the blockedCandidate.
+	 */
     public boolean isBlockedCandidate(){
         return blockedCandidate;
     }
 
     /**
-     * @param blockedCandidate The blockedCandidate to set.
-     */
+	 * @param blockedCandidate
+	 *            The blockedCandidate to set.
+	 */
     public void setBlockedCandidate(boolean blockedCandidate){
         this.blockedCandidate=blockedCandidate;
     }
@@ -1115,15 +1113,16 @@
         if(this.brokerInfo!=null){
             log.warn("Unexpected extra broker info command received: "+info);
         }
-        this.brokerInfo=info;
-        broker.addBroker(this,info);
-        networkConnection = true;
-        for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) {
-            ConnectionState cs = (ConnectionState) iter.next();
-            cs.getContext().setNetworkConnection(true);
-        }   
-        
-        return null;
+        this.brokerInfo = info;
+		broker.addBroker(this, info);
+		networkConnection = true;
+
+		List<TransportConnectionState> connectionStates = listConnectionStates();
+		for (TransportConnectionState cs : connectionStates) {
+			cs.getContext().setNetworkConnection(true);
+		}
+
+		return null;
     }
 
     protected void dispatch(Command command) throws IOException{
@@ -1140,14 +1139,13 @@
     }
     
     public String getConnectionId() {
-        Iterator iterator = localConnectionStates.values().iterator();
-        ConnectionState object = (ConnectionState) iterator.next();
-        if( object == null ) {
-            return null;
-        }
-        if( object.getInfo().getClientId() !=null )
-            return object.getInfo().getClientId();
-        return object.getInfo().getConnectionId().toString();
+		List<TransportConnectionState> connectionStates = listConnectionStates();
+		for (TransportConnectionState cs : connectionStates) {
+	        if( cs.getInfo().getClientId() !=null )
+	            return cs.getInfo().getClientId();
+	        return cs.getInfo().getConnectionId().toString();
+		}
+		return null;
     }    
     
     private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
@@ -1155,7 +1153,7 @@
         if(result==null){
             synchronized(producerExchanges){
                 result=new ProducerBrokerExchange();
-                ConnectionState state=lookupConnectionState(id);
+                TransportConnectionState state=lookupConnectionState(id);
                 context=state.getContext();
                 result.setConnectionContext(context);
                 SessionState ss=state.getSessionState(id.getParentId());
@@ -1186,7 +1184,7 @@
         if(result==null){
             synchronized(consumerExchanges){
                 result=new ConsumerBrokerExchange();
-                ConnectionState state=lookupConnectionState(id);
+                TransportConnectionState state=lookupConnectionState(id);
                 context=state.getContext();
                 result.setConnectionContext(context);
                 SessionState ss=state.getSessionState(id.getParentId());
@@ -1252,5 +1250,72 @@
 	public Response processConsumerControl(ConsumerControl control) throws Exception {
 		return null;
 	}
+
+	///////////////////////////////////////////////////////////////////
+	//
+	// The following methods handle the logical connection state.  It is possible
+	// multiple logical connections multiplexed over a single physical connection.
+	// But have not yet exploited the feature from the clients, so for performance
+	// reasons (to avoid a hash lookup) this class only keeps track of 1 
+	// logical connection state.
+	//
+	// A sub class could override these methods to a full multiple logical connection
+	// support.
+	//
+	///////////////////////////////////////////////////////////////////
+	
+	protected TransportConnectionState registerConnectionState(ConnectionId connectionId, TransportConnectionState state) {
+		TransportConnectionState rc = connectionState;
+		connectionState = state;
+		return rc;
+	}
+	
+	protected TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
+		TransportConnectionState rc = connectionState;
+		connectionState = null;
+		return rc;
+	}
+	protected List<TransportConnectionState> listConnectionStates() {
+		ArrayList<TransportConnectionState> rc = new ArrayList<TransportConnectionState>();
+		if( connectionState!=null ) {
+			rc.add(connectionState);
+		}
+		return rc;
+	}
+
+	protected TransportConnectionState lookupConnectionState(String connectionId){
+        TransportConnectionState cs=connectionState;
+        if(cs==null)
+            throw new IllegalStateException("Cannot lookup a connectionId for a connection that had not been registered: "
+                    +connectionId);
+        return cs;
+    }
+    protected TransportConnectionState lookupConnectionState(ConsumerId id){
+        TransportConnectionState cs=connectionState;
+        if(cs==null)
+            throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "
+                    +id.getParentId().getParentId());
+        return cs;
+    }
+    protected TransportConnectionState lookupConnectionState(ProducerId id){
+        TransportConnectionState cs=connectionState;
+        if(cs==null)
+            throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "
+                    +id.getParentId().getParentId());
+        return cs;
+    }
+    protected TransportConnectionState lookupConnectionState(SessionId id){
+        TransportConnectionState cs=connectionState;
+        if(cs==null)
+            throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "
+                    +id.getParentId());
+        return cs;
+    }
+    protected TransportConnectionState lookupConnectionState(ConnectionId connectionId){
+        TransportConnectionState cs=connectionState;
+        if(cs==null)
+            throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
+        return cs;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Sun Jul 29 22:34:37 2007
@@ -60,7 +60,7 @@
         registerMBean(byAddressName);
     }
 
-    public synchronized void stop() throws Exception {
+    public void doStop() throws Exception {
         if (isStarting()) {
             setPendingStop(true);
             return;
@@ -71,7 +71,7 @@
 	        byClientIdName=null;
 	        byAddressName=null;
     	}
-        super.stop();
+        super.doStop();
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sun Jul 29 22:34:37 2007
@@ -57,6 +57,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -98,7 +99,7 @@
     private final DestinationInterceptor destinationInterceptor;
     private ConnectionContext adminConnectionContext;
     protected DestinationFactory destinationFactory;
-    protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
+    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
    
     
         
@@ -605,7 +606,7 @@
         this.adminConnectionContext = adminConnectionContext;
     }
     
-	public Map getConnectionStates() {
+	public Map<ConnectionId, ConnectionState> getConnectionStates() {
 		return connectionStates;
 	}
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Sun Jul 29 22:34:37 2007
@@ -24,6 +24,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
@@ -32,12 +34,9 @@
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class ConnectionState {
     
-    final ConnectionInfo info;
+    ConnectionInfo info;
     private final ConcurrentHashMap transactions = new ConcurrentHashMap();
     private final ConcurrentHashMap sessions = new ConcurrentHashMap();
     private final List tempDestinations = Collections.synchronizedList(new ArrayList());
@@ -52,6 +51,15 @@
     public String toString() {
         return info.toString();
     }
+    
+	public void reset(ConnectionInfo info) {
+		this.info=info;
+		transactions.clear();
+		sessions.clear();
+		tempDestinations.clear();
+		shutdown.set(false);
+	}
+
 
     public void addTempDestination(DestinationInfo info) {
     	checkShutdown();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Sun Jul 29 22:34:37 2007
@@ -17,16 +17,6 @@
  */
 package org.apache.activemq.transport.tcp;
 
-import org.apache.activemq.Service;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportThreadSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.net.SocketFactory;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -40,6 +30,19 @@
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportThreadSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
@@ -64,6 +67,7 @@
     protected boolean useLocalHost = true;
     protected int minmumWireFormatVersion;
     protected SocketFactory socketFactory;
+    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
 
     private Map socketOptions;
     private Boolean keepAlive;
@@ -131,24 +135,22 @@
      */
     public void run() {
         log.trace("TCP consumer thread starting");
-        while (!isStopped()) {
-            try {
-                Object command = readCommand();
-                doConsume(command);
-            }
-            catch (SocketTimeoutException e) {
-            }
-            catch (InterruptedIOException e) {
-            }
-            catch (IOException e) {
-                try {
-                    stop();
-                }
-                catch (Exception e2) {
-                    log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
-                }
-                onException(e);
-            }
+        try {
+	        while (!isStopped()) {
+	            try {
+	                Object command = readCommand();
+	                doConsume(command);
+	            }
+	            catch (SocketTimeoutException e) {
+	            }
+	            catch (InterruptedIOException e) {
+	            }
+	        }
+        } catch (IOException e) {
+        	stoppedLatch.get().countDown();
+            onException(e);
+        } finally {
+        	stoppedLatch.get().countDown();
         }
     }
 
@@ -301,6 +303,7 @@
 
     protected void doStart() throws Exception {
         connect();
+        stoppedLatch.set(new CountDownLatch(1));
         super.doStart();
     }
 
@@ -355,6 +358,7 @@
         initializeStreams();
     }
 
+    
     protected void doStop(ServiceStopper stopper) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("Stopping transport " + this);
@@ -366,6 +370,19 @@
         if (socket != null) {
             socket.close();
         }
+    }
+    
+    
+    /**
+     * Override so that stop() blocks until the run thread is no longer running.
+     */
+    @Override
+    public void stop() throws Exception {
+    	super.stop();
+    	CountDownLatch countDownLatch = stoppedLatch.get();
+    	if( countDownLatch!=null ) {
+    		countDownLatch.await();
+    	}
     }
 
     protected void initializeStreams() throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Sun Jul 29 22:34:37 2007
@@ -32,6 +32,7 @@
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -53,10 +54,8 @@
     protected boolean network;
     protected boolean async=true;
     protected int asyncQueueDepth=2000;
-    protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
     protected LinkedBlockingQueue messageQueue=null;
     protected boolean started;
-    protected final Object startMutex = new Object();
     protected final URI location;
     protected final long id;
     private TaskRunner taskRunner;
@@ -85,45 +84,37 @@
         }
         if(peer==null)
             throw new IOException("Peer not connected.");
-        if(!peer.disposed){
-            if(async){
-                asyncOneWay(command);
-            }else{
-                syncOneWay(command);
-            }
-        }else{
-            throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
-        }
-    }
 
-    protected void syncOneWay(Object command){
     	TransportListener tl=null;
-    	synchronized(peer.startMutex){
+    	synchronized(peer.mutex) {
+    		if( peer.disposed ) {
+    			throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
+    		}
         	if( peer.started ) {
-                tl = peer.transportListener;
-        	} else if(!peer.disposed) {
-                peer.prePeerSetQueue.add(command);
+                if(peer.async){
+                    peer.enqueue(command);
+        		    peer.wakeup();
+                } else {
+                    tl = peer.transportListener;                   
+                }
+        	} else {
+        		peer.enqueue(command);
         	}
     	}
+    	
     	if( tl!=null ) {
-            tl.onCommand(command);
-        }
+    		tl.onCommand(command);
+    	}
+        
     }
 
-    protected void asyncOneWay(Object command) throws IOException{
-        try{
-            synchronized(mutex){
-                if(messageQueue==null){
-                    messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
-                }
-            }
-            messageQueue.put(command);
-            wakeup();
-        }catch(final InterruptedException e){
-            log.error("messageQueue interupted",e);
-            throw new IOException(e.getMessage());
-        }
-    }
+	private void enqueue(Object command) throws IOException {
+		try{
+			getMessageQueue().put(command);
+		}catch(final InterruptedException e){
+		    throw IOExceptionSupport.create(e);
+		}
+	}
 
     public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{
         throw new AssertionError("Unsupported Method");
@@ -146,32 +137,38 @@
     public void setTransportListener(TransportListener commandListener){
         synchronized(mutex){
             this.transportListener=commandListener;
+            wakeup();
         }
-        wakeup();
-        peer.wakeup();
     }
 
+    private LinkedBlockingQueue getMessageQueue() {
+    	synchronized(mutex) {
+	        if( messageQueue==null ) {
+	            messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+	        }
+	        return messageQueue;
+    	}
+    }
+    
+    
     public void start() throws Exception{
         if(transportListener==null)
             throw new IOException("TransportListener not set.");
-        synchronized(startMutex) {
-	        if( !prePeerSetQueue.isEmpty() ) {
-	            for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
-	                Command command=(Command)iter.next();
-	                transportListener.onCommand(command);
-	            }
-	            prePeerSetQueue.clear();
-	        } 
+        
+        synchronized(mutex) {
+        	if( messageQueue!=null ) {
+	           Object command;
+	           while( (command = messageQueue.poll()) !=null ) {
+	        	   transportListener.onCommand(command);
+	           }
+        	}
 	        started = true;
-	        if( isAsync() ) {
-	            peer.wakeup();
-	            wakeup();
-	        }
+            wakeup();
         }
     }
 
     public void stop() throws Exception{
-    	synchronized(startMutex) {
+    	synchronized(mutex) {
             if(!disposed){
     	        started=false;
                 disposed=true;
@@ -221,18 +218,21 @@
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate(){
-        final TransportListener tl=peer.transportListener;
-        Command command=null;
+        final TransportListener tl;
         synchronized(mutex){
-            if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null&&!messageQueue.isEmpty()){
-                command=(Command)messageQueue.poll();
-            }
-        }
-        if(tl!=null&&command!=null){
+        	tl = transportListener;
+        	if( !started || disposed || tl==null )
+        		return false;
+        }
+        
+        LinkedBlockingQueue mq = getMessageQueue();
+        final Command command = (Command)mq.poll();                
+        if( command!=null ) {
             tl.onCommand(command);
-        }
-        boolean result=messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
-        return result;
+            return !mq.isEmpty();
+        } else {
+        	return false;
+    	}        
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java Sun Jul 29 22:34:37 2007
@@ -184,8 +184,9 @@
      * Make sure you cannot publish to a temp destination that does not exist anymore.
      * 
      * @throws JMSException
+     * @throws InterruptedException 
      */
-    public void testPublishFailsForClosedConnection() throws JMSException {
+    public void testPublishFailsForClosedConnection() throws JMSException, InterruptedException {
         
         Connection tempConnection = factory.createConnection();
         Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
@@ -202,6 +203,7 @@
 
         // Closing the connection should destroy the temp queue that was created.
         tempConnection.close();
+        Thread.sleep(1000); // Wait a little bit to let the delete take effect.
         
         // This message delivery NOT should work since the temp connection is now closed.
         try {