You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/11/14 13:44:17 UTC

svn commit: r474769 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Author: chirino
Date: Tue Nov 14 04:44:16 2006
New Revision: 474769

URL: http://svn.apache.org/viewvc?view=rev&rev=474769
Log:
Added synchronization to the methods that set up connection state so that when an async error is detected, a full cleanup is properly performed.  Previously, subscriptions were not properly cleaned up because they could be set up at the same time as they were being cleaned up.

Also, start at most one async exception handler thread, since an async exception leads to connection teardown.  Subsequent failures do not need additional async threads started.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=474769&r1=474768&r2=474769
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Nov 14 04:44:16 2006
@@ -123,6 +123,7 @@
     private AtomicBoolean stopped = new AtomicBoolean(false);
     protected final AtomicBoolean disposed=new AtomicBoolean(false);
     private CountDownLatch stopLatch = new CountDownLatch(1);
+    protected final AtomicBoolean asyncException = new AtomicBoolean(false);
     
     static class ConnectionState extends org.apache.activemq.state.ConnectionState {
         private final ConnectionContext context;
@@ -217,11 +218,13 @@
      * @param e
      */
 	public void serviceExceptionAsync(final IOException e) {
-		new Thread("Async Exception Handler") {
-			public void run() {
-				serviceException(e);
-			}
-		}.start();
+		if( asyncException.compareAndSet(false, true) ) {
+			new Thread("Async Exception Handler") {
+				public void run() {
+					serviceException(e);
+				}
+			}.start();
+		}
 	}
 
 	/**
@@ -349,7 +352,7 @@
         return null;
     }
 
-    public Response processBeginTransaction(TransactionInfo info) throws Exception {
+    synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception {
         ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
@@ -364,14 +367,14 @@
         return null;
     }
     
-    public Response processEndTransaction(TransactionInfo info) throws Exception {
+    synchronized public Response processEndTransaction(TransactionInfo info) throws Exception {
         // No need to do anything.  This packet is just sent by the client
         // make sure he is synced with the server as commit command could
         // come from a different connection.
         return null;
     }
     
-    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+    synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception {
         ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
@@ -395,7 +398,7 @@
         }
     }
 
-    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+    synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
         ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
@@ -409,7 +412,7 @@
         
     }
 
-    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+    synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
         ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
@@ -421,7 +424,7 @@
         return null;
     }
 
-    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+    synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception {
         ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
@@ -433,7 +436,7 @@
         return null;
     }
     
-    public Response processForgetTransaction(TransactionInfo info) throws Exception {
+    synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception {
         ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
@@ -443,7 +446,7 @@
         return null;
     }
     
-    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+    synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception {
         ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
@@ -498,7 +501,7 @@
         return null;
     }
 
-    public Response processAddDestination(DestinationInfo info) throws Exception {
+    synchronized public Response processAddDestination(DestinationInfo info) throws Exception {
         ConnectionState cs = lookupConnectionState(info.getConnectionId());
         broker.addDestinationInfo(cs.getContext(), info);
         if( info.getDestination().isTemporary() ) {
@@ -507,7 +510,7 @@
         return null;
     }
 
-    public Response processRemoveDestination(DestinationInfo info) throws Exception {
+    synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception {
         ConnectionState cs = lookupConnectionState(info.getConnectionId());
         broker.removeDestinationInfo(cs.getContext(), info);
         if( info.getDestination().isTemporary() ) {
@@ -517,7 +520,7 @@
     }
 
 
-    public Response processAddProducer(ProducerInfo info) throws Exception {
+    synchronized public Response processAddProducer(ProducerInfo info) throws Exception {
         SessionId sessionId = info.getProducerId().getParentId();
         ConnectionId connectionId = sessionId.getParentId();
         
@@ -538,7 +541,7 @@
         return null;
     }
     
-    public Response processRemoveProducer(ProducerId id) throws Exception {
+    synchronized public Response processRemoveProducer(ProducerId id) throws Exception {
         SessionId sessionId = id.getParentId();
         ConnectionId connectionId = sessionId.getParentId();
         
@@ -554,7 +557,7 @@
         return null;
     }
 
-    public Response processAddConsumer(ConsumerInfo info) throws Exception {
+    synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception {
         SessionId sessionId = info.getConsumerId().getParentId();
         ConnectionId connectionId = sessionId.getParentId();
         
@@ -576,7 +579,7 @@
         return null;
     }
     
-    public Response processRemoveConsumer(ConsumerId id) throws Exception {
+    synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception {
         
         SessionId sessionId = id.getParentId();
         ConnectionId connectionId = sessionId.getParentId();
@@ -593,7 +596,7 @@
         return null;
     }
     
-    public Response processAddSession(SessionInfo info) throws Exception {
+    synchronized public Response processAddSession(SessionInfo info) throws Exception {
         ConnectionId connectionId = info.getSessionId().getParentId();
         ConnectionState cs = lookupConnectionState(connectionId);
         
@@ -609,7 +612,7 @@
         return null;
     }
     
-    public Response processRemoveSession(SessionId id) throws Exception {
+    synchronized public Response processRemoveSession(SessionId id) throws Exception {
         
         ConnectionId connectionId = id.getParentId();
         
@@ -646,7 +649,7 @@
         return null;
     }
     
-    public Response processAddConnection(ConnectionInfo info) throws Exception {
+    synchronized public Response processAddConnection(ConnectionInfo info) throws Exception {
 
     	ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
     	
@@ -695,7 +698,7 @@
         return null;
     }
     
-    public Response processRemoveConnection(ConnectionId id)  {
+    synchronized public Response processRemoveConnection(ConnectionId id)  {
         
         ConnectionState cs = lookupConnectionState(id);