You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/06 02:53:18 UTC

svn commit: r440548 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker: AbstractConnection.java ConnectionContext.java region/RegionBroker.java

Author: chirino
Date: Tue Sep  5 17:53:18 2006
New Revision: 440548

URL: http://svn.apache.org/viewvc?view=rev&rev=440548
Log:
Fix for http://issues.apache.org/activemq/browse/AMQ-911

Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?view=diff&rev=440548&r1=440547&r2=440548
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Tue Sep  5 17:53:18 2006
@@ -23,9 +23,11 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.region.ConnectionStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
@@ -88,7 +90,8 @@
     private boolean inServiceException=false;
     private boolean manageable;
 
-    protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
+    protected final ConcurrentHashMap localConnectionStates = new ConcurrentHashMap();
+	protected final Map brokerConnectionStates;
     
     private WireFormatInfo wireFormatInfo;    
     protected boolean disposed=false;
@@ -117,6 +120,10 @@
         
         this.connector = connector;
         this.broker = broker;
+        
+        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
+        brokerConnectionStates = rb.getConnectionStates();
+        
         if (connector != null) {
             this.statistics.setParent(connector.getStatistics());
         }
@@ -153,7 +160,7 @@
         // Remove all logical connection associated with this connection
         // from the broker.
         if(!broker.isStopped()){
-            ArrayList l=new ArrayList(connectionStates.keySet());
+            ArrayList l=new ArrayList(localConnectionStates.keySet());
             for(Iterator iter=l.iterator();iter.hasNext();){
                 ConnectionId connectionId=(ConnectionId) iter.next();
                 try{
@@ -245,25 +252,25 @@
     }
     
     protected ConnectionState lookupConnectionState(ConsumerId id) {
-        ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId().getParentId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
         if( cs== null )
             throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "+id.getParentId().getParentId());
         return cs;
     }
     protected ConnectionState lookupConnectionState(ProducerId id) {
-        ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId().getParentId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
         if( cs== null )
             throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "+id.getParentId().getParentId());        
         return cs;
     }
     protected ConnectionState lookupConnectionState(SessionId id) {
-        ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId());
         if( cs== null )
             throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "+id.getParentId());        
         return cs;
     }
     protected ConnectionState lookupConnectionState(ConnectionId connectionId) {
-        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(connectionId);
         if( cs== null )
             throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
         return cs;
@@ -293,7 +300,7 @@
     }
 
     public Response processBeginTransaction(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
            context = cs.getContext();
@@ -310,7 +317,7 @@
     }
     
     public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
            context = cs.getContext();
@@ -321,7 +328,7 @@
     }
 
     public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
            context = cs.getContext();
@@ -331,7 +338,7 @@
     }
 
     public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
            context = cs.getContext();
@@ -341,7 +348,7 @@
     }
 
     public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
            context = cs.getContext();
@@ -351,7 +358,7 @@
     }
     
     public Response processForgetTransaction(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
            context = cs.getContext();
@@ -361,7 +368,7 @@
     }
     
     public Response processRecoverTransactions(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
         if( cs!=null ) {
            context = cs.getContext();
@@ -428,12 +435,16 @@
         SessionState ss = cs.getSessionState(sessionId);
         if( ss == null )
             throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId);
-        broker.addProducer(cs.getContext(), info);
-        try {
-            ss.addProducer(info);
-		} catch (IllegalStateException e) {
-			broker.removeProducer(cs.getContext(), info);
-		}
+
+        // Avoid replaying dup commands
+        if( !ss.getProducerIds().contains(info.getProducerId()) ) {
+	        broker.addProducer(cs.getContext(), info);
+	        try {
+	            ss.addProducer(info);
+			} catch (IllegalStateException e) {
+				broker.removeProducer(cs.getContext(), info);
+			}
+        }
         return null;
     }
     
@@ -462,12 +473,15 @@
         if( ss == null )
             throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId);
 
-        broker.addConsumer(cs.getContext(), info);
-        try {
-			ss.addConsumer(info);
-		} catch (IllegalStateException e) {
-			broker.removeConsumer(cs.getContext(), info);
-		}
+        // Avoid replaying dup commands
+        if( !ss.getConsumerIds().contains(info.getConsumerId()) ) {
+	        broker.addConsumer(cs.getContext(), info);
+	        try {
+				ss.addConsumer(info);
+			} catch (IllegalStateException e) {
+				broker.removeConsumer(cs.getContext(), info);
+			}
+        }
         
         return null;
     }
@@ -491,14 +505,17 @@
     
     public Response processAddSession(SessionInfo info) throws Exception {
         ConnectionId connectionId = info.getSessionId().getParentId();
-        
         ConnectionState cs = lookupConnectionState(connectionId);
-    	broker.addSession(cs.getContext(), info);
-        try {
-            cs.addSession(info);
-		} catch (IllegalStateException e) {
-			broker.removeSession(cs.getContext(), info);
-		}
+        
+        // Avoid replaying dup commands
+        if( !cs.getSessionIds().contains(info.getSessionId()) ) {
+	    	broker.addSession(cs.getContext(), info);
+	        try {
+	            cs.addSession(info);
+			} catch (IllegalStateException e) {
+				broker.removeSession(cs.getContext(), info);
+			}
+        }
         return null;
     }
     
@@ -540,28 +557,40 @@
     }
     
     public Response processAddConnection(ConnectionInfo info) throws Exception {
-        // Setup the context.
-        String clientId = info.getClientId();
-        ConnectionContext context = new ConnectionContext();
-        context.setConnection(this);
-        context.setBroker(broker);
-        context.setConnector(connector);
-        context.setTransactions(new ConcurrentHashMap());
-        context.setClientId(clientId);
-        context.setUserName(info.getUserName());
-        context.setConnectionId(info.getConnectionId());
-        context.setWireFormatInfo(wireFormatInfo);
-        this.manageable = info.isManageable();
-        connectionStates.put(info.getConnectionId(), new ConnectionState(info, context));
-       
-        
-        broker.addConnection(context, info);
-        if (info.isManageable() && broker.isFaultTolerantConfiguration()){
-            //send ConnectionCommand
-            ConnectionControl command = new ConnectionControl();
-            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
-            dispatchAsync(command);
-        }
+    	
+    	ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
+    	if( state == null ) {
+    		
+	        // Setup the context.
+	        String clientId = info.getClientId();
+	        ConnectionContext context = new ConnectionContext();
+	        context.setConnection(this);
+	        context.setBroker(broker);
+	        context.setConnector(connector);
+	        context.setTransactions(new ConcurrentHashMap());
+	        context.setClientId(clientId);
+	        context.setUserName(info.getUserName());
+	        context.setConnectionId(info.getConnectionId());
+	        context.setWireFormatInfo(wireFormatInfo);
+    		context.incrementReference();
+	        this.manageable = info.isManageable();
+	        
+	        state = new ConnectionState(info, context);
+	        brokerConnectionStates.put(info.getConnectionId(), state);
+	        localConnectionStates.put(info.getConnectionId(), state);	        
+	        
+	        broker.addConnection(context, info);
+	        if (info.isManageable() && broker.isFaultTolerantConfiguration()){
+	            //send ConnectionCommand
+	            ConnectionControl command = new ConnectionControl();
+	            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
+	            dispatchAsync(command);
+	        }
+    	} else {
+    		// We are a concurrent connection... it must be client reconnect.
+    		localConnectionStates.put(info.getConnectionId(), state);
+    		state.getContext().incrementReference();
+    	}
         return null;
     }
     
@@ -599,8 +628,14 @@
         }catch(Throwable e){
             serviceLog.warn("Failed to remove connection " +  cs.getInfo(),e);
         }
-        connectionStates.remove(id);
-        
+        ConnectionState state = (ConnectionState) localConnectionStates.remove(id);
+        if( state != null ) {
+        	// If we are the last reference, we should remove the state
+        	// from the broker.
+        	if( state.getContext().decrementReference() == 0 ){ 
+        		brokerConnectionStates.remove(id);
+        	}
+        }
         return null;
     }
 

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=440548&r1=440547&r2=440548
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Tue Sep  5 17:53:18 2006
@@ -18,6 +18,7 @@
 package org.apache.activemq.broker;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.ConnectionId;
@@ -51,6 +52,7 @@
     private Object longTermStoreContext;
     private boolean producerFlowControl=true;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
+    private AtomicInteger referenceCounter = new AtomicInteger();
     
     private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
     
@@ -220,5 +222,13 @@
         }
         return true;
     }
+
+	public int incrementReference() {
+		return referenceCounter.incrementAndGet();
+	}
+	
+	public int decrementReference() {
+		return referenceCounter.decrementAndGet();
+	}
 
 }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=440548&r1=440547&r2=440548
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Sep  5 17:53:18 2006
@@ -17,8 +17,15 @@
  */
 package org.apache.activemq.broker.region;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
@@ -47,15 +54,9 @@
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.ServiceStopper;
 
-import javax.jms.InvalidClientIDException;
-import javax.jms.JMSException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
 
 /**
  * Routes Broker operations to the correct messaging regions for processing.
@@ -85,6 +86,8 @@
     private String brokerName;
     private Map clientIdSet = new HashMap(); // we will synchronize access
     protected  PersistenceAdapter adaptor;
+    
+    protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
 
         
     public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
@@ -514,7 +517,9 @@
         this.keepDurableSubsActive = keepDurableSubsActive;
     }
 
-    
+	public Map getConnectionStates() {
+		return connectionStates;
+	}
 
 
 }