You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/02 15:43:20 UTC

svn commit: r418592 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/AbstractConnection.java state/ConnectionState.java state/SessionState.java

Author: chirino
Date: Sun Jul  2 06:43:19 2006
New Revision: 418592

URL: http://svn.apache.org/viewvc?rev=418592&view=rev
Log:
Fixing http://issues.apache.org/activemq/browse/AMQ-724: an async exception could close a connection while a new consumer was being added, which resulted in the consumer not being removed from the
broker when the connection was shut down.

Danielius Jurna, thanks for the great bug report and problem determination!


Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=418592&r1=418591&r2=418592&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Sun Jul  2 06:43:19 2006
@@ -421,7 +421,11 @@
         if( ss == null )
             throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId);
         broker.addProducer(cs.getContext(), info);
-        ss.addProducer(info);
+        try {
+            ss.addProducer(info);
+		} catch (IllegalStateException e) {
+			broker.removeProducer(cs.getContext(), info);
+		}
         return null;
     }
     
@@ -451,7 +455,12 @@
             throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId);
 
         broker.addConsumer(cs.getContext(), info);
-        ss.addConsumer(info);
+        try {
+			ss.addConsumer(info);
+		} catch (IllegalStateException e) {
+			broker.removeConsumer(cs.getContext(), info);
+		}
+        
         return null;
     }
     
@@ -476,8 +485,12 @@
         ConnectionId connectionId = info.getSessionId().getParentId();
         
         ConnectionState cs = lookupConnectionState(connectionId);
-        broker.addSession(cs.getContext(), info);
-        cs.addSession(info);
+    	broker.addSession(cs.getContext(), info);
+        try {
+            cs.addSession(info);
+		} catch (IllegalStateException e) {
+			broker.removeSession(cs.getContext(), info);
+		}
         return null;
     }
     
@@ -487,6 +500,10 @@
         
         ConnectionState cs = lookupConnectionState(connectionId);
         SessionState session = cs.getSessionState(id);
+        
+        // Don't let new consumers or producers get added while we are closing this down.
+        session.shutdown();
+        
         if( session == null )
             throw new IllegalStateException("Cannot remove session that had not been registered: "+id);
         
@@ -543,6 +560,9 @@
     public Response processRemoveConnection(ConnectionId id)  {
         
         ConnectionState cs = lookupConnectionState(id);
+        
+        // Don't allow things to be added to the connection state while we are shutting down.
+        cs.shutdown();
         
         // Cascade the connection stop to the sessions.
         for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?rev=418592&r1=418591&r2=418592&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Sun Jul  2 06:43:19 2006
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
@@ -37,6 +38,7 @@
     final ConnectionInfo info;
     private final ConcurrentHashMap sessions = new ConcurrentHashMap();
     private final List tempDestinations = Collections.synchronizedList(new ArrayList());
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
     
     public ConnectionState(ConnectionInfo info) {
         this.info = info;
@@ -49,10 +51,11 @@
     }
 
     public void addTempDestination(DestinationInfo info) {
+    	checkShutdown();
         tempDestinations.add(info);
     }
 
-    public void removeTempDestination(ActiveMQDestination destination) {
+	public void removeTempDestination(ActiveMQDestination destination) {
         for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) {
             DestinationInfo di = (DestinationInfo) iter.next();
             if( di.getDestination().equals(destination) ) {
@@ -62,6 +65,7 @@
     }
 
     public void addSession(SessionInfo info) {
+    	checkShutdown();
         sessions.put(info.getSessionId(), new SessionState(info));            
     }        
     public SessionState removeSession(SessionId id) {
@@ -85,5 +89,19 @@
 
     public Collection getSessionStates() {
         return sessions.values();
-    }        
+    }
+    
+    private void checkShutdown() { // guard invoked by addTempDestination() and addSession() before they store anything
+		if( shutdown.get() ) // true once shutdown() has flipped the flag
+			throw new IllegalStateException("Disposed");
+	}
+    
+    public void shutdown() { // marks this connection state as shut down and cascades to its sessions
+    	if( shutdown.compareAndSet(false, true) ) { // only the first caller gets past this; repeat calls skip the cascade
+    		for (Iterator iter = sessions.values().iterator(); iter.hasNext();) {
+				SessionState ss = (SessionState) iter.next();
+				ss.shutdown(); // propagate the shutdown to every session currently registered
+			}
+    	}
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?rev=418592&r1=418591&r2=418592&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java Sun Jul  2 06:43:19 2006
@@ -19,6 +19,7 @@
 
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -33,6 +34,7 @@
     
     public final ConcurrentHashMap producers = new ConcurrentHashMap();
     public final ConcurrentHashMap consumers = new ConcurrentHashMap();
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
     
     public SessionState(SessionInfo info) {
         this.info = info;
@@ -42,6 +44,7 @@
     }
     
     public void addProducer(ProducerInfo info) {
+    	checkShutdown();
         producers.put(info.getProducerId(), new ProducerState(info));            
     }        
     public ProducerState removeProducer(ProducerId id) {
@@ -49,6 +52,7 @@
     }
     
     public void addConsumer(ConsumerInfo info) {
+    	checkShutdown();
         consumers.put(info.getConsumerId(), new ConsumerState(info));            
     }        
     public ConsumerState removeConsumer(ConsumerId id) {
@@ -72,5 +76,15 @@
     
     public Collection getConsumerStates() {
         return consumers.values();
-    }        
+    }
+    
+    private void checkShutdown() { // guard invoked by addProducer() and addConsumer() before they store anything
+		if( shutdown.get() ) // reads the flag that shutdown() writes
+			throw new IllegalStateException("Disposed");
+	}
+    
+    public void shutdown() { // marks this session as disposed so that later addProducer()/addConsumer() calls throw IllegalStateException
+    	shutdown.set(true); // was set(false), which left the flag unset and made checkShutdown() a no-op
+    }
+
 }