You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/06 02:53:18 UTC
svn commit: r440548 - in
/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker:
AbstractConnection.java ConnectionContext.java region/RegionBroker.java
Author: chirino
Date: Tue Sep 5 17:53:18 2006
New Revision: 440548
URL: http://svn.apache.org/viewvc?view=rev&rev=440548
Log:
Fix for http://issues.apache.org/activemq/browse/AMQ-911
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?view=diff&rev=440548&r1=440547&r2=440548
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Tue Sep 5 17:53:18 2006
@@ -23,9 +23,11 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.ConnectionStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@@ -88,7 +90,8 @@
private boolean inServiceException=false;
private boolean manageable;
- protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
+ protected final ConcurrentHashMap localConnectionStates = new ConcurrentHashMap();
+ protected final Map brokerConnectionStates;
private WireFormatInfo wireFormatInfo;
protected boolean disposed=false;
@@ -117,6 +120,10 @@
this.connector = connector;
this.broker = broker;
+
+ RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
+ brokerConnectionStates = rb.getConnectionStates();
+
if (connector != null) {
this.statistics.setParent(connector.getStatistics());
}
@@ -153,7 +160,7 @@
// Remove all logical connection associated with this connection
// from the broker.
if(!broker.isStopped()){
- ArrayList l=new ArrayList(connectionStates.keySet());
+ ArrayList l=new ArrayList(localConnectionStates.keySet());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionId connectionId=(ConnectionId) iter.next();
try{
@@ -245,25 +252,25 @@
}
protected ConnectionState lookupConnectionState(ConsumerId id) {
- ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId().getParentId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
if( cs== null )
throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "+id.getParentId().getParentId());
return cs;
}
protected ConnectionState lookupConnectionState(ProducerId id) {
- ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId().getParentId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
if( cs== null )
throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "+id.getParentId().getParentId());
return cs;
}
protected ConnectionState lookupConnectionState(SessionId id) {
- ConnectionState cs = (ConnectionState) connectionStates.get(id.getParentId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId());
if( cs== null )
throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "+id.getParentId());
return cs;
}
protected ConnectionState lookupConnectionState(ConnectionId connectionId) {
- ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(connectionId);
if( cs== null )
throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
return cs;
@@ -293,7 +300,7 @@
}
public Response processBeginTransaction(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@@ -310,7 +317,7 @@
}
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@@ -321,7 +328,7 @@
}
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@@ -331,7 +338,7 @@
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@@ -341,7 +348,7 @@
}
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@@ -351,7 +358,7 @@
}
public Response processForgetTransaction(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@@ -361,7 +368,7 @@
}
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+ ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
context = cs.getContext();
@@ -428,12 +435,16 @@
SessionState ss = cs.getSessionState(sessionId);
if( ss == null )
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId);
- broker.addProducer(cs.getContext(), info);
- try {
- ss.addProducer(info);
- } catch (IllegalStateException e) {
- broker.removeProducer(cs.getContext(), info);
- }
+
+ // Avoid replaying dup commands
+ if( !ss.getProducerIds().contains(info.getProducerId()) ) {
+ broker.addProducer(cs.getContext(), info);
+ try {
+ ss.addProducer(info);
+ } catch (IllegalStateException e) {
+ broker.removeProducer(cs.getContext(), info);
+ }
+ }
return null;
}
@@ -462,12 +473,15 @@
if( ss == null )
throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId);
- broker.addConsumer(cs.getContext(), info);
- try {
- ss.addConsumer(info);
- } catch (IllegalStateException e) {
- broker.removeConsumer(cs.getContext(), info);
- }
+ // Avoid replaying dup commands
+ if( !ss.getConsumerIds().contains(info.getConsumerId()) ) {
+ broker.addConsumer(cs.getContext(), info);
+ try {
+ ss.addConsumer(info);
+ } catch (IllegalStateException e) {
+ broker.removeConsumer(cs.getContext(), info);
+ }
+ }
return null;
}
@@ -491,14 +505,17 @@
public Response processAddSession(SessionInfo info) throws Exception {
ConnectionId connectionId = info.getSessionId().getParentId();
-
ConnectionState cs = lookupConnectionState(connectionId);
- broker.addSession(cs.getContext(), info);
- try {
- cs.addSession(info);
- } catch (IllegalStateException e) {
- broker.removeSession(cs.getContext(), info);
- }
+
+ // Avoid replaying dup commands
+ if( !cs.getSessionIds().contains(info.getSessionId()) ) {
+ broker.addSession(cs.getContext(), info);
+ try {
+ cs.addSession(info);
+ } catch (IllegalStateException e) {
+ broker.removeSession(cs.getContext(), info);
+ }
+ }
return null;
}
@@ -540,28 +557,40 @@
}
public Response processAddConnection(ConnectionInfo info) throws Exception {
- // Setup the context.
- String clientId = info.getClientId();
- ConnectionContext context = new ConnectionContext();
- context.setConnection(this);
- context.setBroker(broker);
- context.setConnector(connector);
- context.setTransactions(new ConcurrentHashMap());
- context.setClientId(clientId);
- context.setUserName(info.getUserName());
- context.setConnectionId(info.getConnectionId());
- context.setWireFormatInfo(wireFormatInfo);
- this.manageable = info.isManageable();
- connectionStates.put(info.getConnectionId(), new ConnectionState(info, context));
-
-
- broker.addConnection(context, info);
- if (info.isManageable() && broker.isFaultTolerantConfiguration()){
- //send ConnectionCommand
- ConnectionControl command = new ConnectionControl();
- command.setFaultTolerant(broker.isFaultTolerantConfiguration());
- dispatchAsync(command);
- }
+
+ ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
+ if( state == null ) {
+
+ // Setup the context.
+ String clientId = info.getClientId();
+ ConnectionContext context = new ConnectionContext();
+ context.setConnection(this);
+ context.setBroker(broker);
+ context.setConnector(connector);
+ context.setTransactions(new ConcurrentHashMap());
+ context.setClientId(clientId);
+ context.setUserName(info.getUserName());
+ context.setConnectionId(info.getConnectionId());
+ context.setWireFormatInfo(wireFormatInfo);
+ context.incrementReference();
+ this.manageable = info.isManageable();
+
+ state = new ConnectionState(info, context);
+ brokerConnectionStates.put(info.getConnectionId(), state);
+ localConnectionStates.put(info.getConnectionId(), state);
+
+ broker.addConnection(context, info);
+ if (info.isManageable() && broker.isFaultTolerantConfiguration()){
+ //send ConnectionCommand
+ ConnectionControl command = new ConnectionControl();
+ command.setFaultTolerant(broker.isFaultTolerantConfiguration());
+ dispatchAsync(command);
+ }
+ } else {
+ // We are a concurrent connection... it must be client reconnect.
+ localConnectionStates.put(info.getConnectionId(), state);
+ state.getContext().incrementReference();
+ }
return null;
}
@@ -599,8 +628,14 @@
}catch(Throwable e){
serviceLog.warn("Failed to remove connection " + cs.getInfo(),e);
}
- connectionStates.remove(id);
-
+ ConnectionState state = (ConnectionState) localConnectionStates.remove(id);
+ if( state != null ) {
+ // If we are the last reference, we should remove the state
+ // from the broker.
+ if( state.getContext().decrementReference() == 0 ){
+ brokerConnectionStates.remove(id);
+ }
+ }
return null;
}
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=440548&r1=440547&r2=440548
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Tue Sep 5 17:53:18 2006
@@ -18,6 +18,7 @@
package org.apache.activemq.broker;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.ConnectionId;
@@ -51,6 +52,7 @@
private Object longTermStoreContext;
private boolean producerFlowControl=true;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
+ private AtomicInteger referenceCounter = new AtomicInteger();
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
@@ -220,5 +222,13 @@
}
return true;
}
+
+ public int incrementReference() {
+ return referenceCounter.incrementAndGet();
+ }
+
+ public int decrementReference() {
+ return referenceCounter.decrementAndGet();
+ }
}
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=440548&r1=440547&r2=440548
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Sep 5 17:53:18 2006
@@ -17,8 +17,15 @@
*/
package org.apache.activemq.broker.region;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
@@ -47,15 +54,9 @@
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
-import javax.jms.InvalidClientIDException;
-import javax.jms.JMSException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
/**
* Routes Broker operations to the correct messaging regions for processing.
@@ -85,6 +86,8 @@
private String brokerName;
private Map clientIdSet = new HashMap(); // we will synchronize access
protected PersistenceAdapter adaptor;
+
+ protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
@@ -514,7 +517,9 @@
this.keepDurableSubsActive = keepDurableSubsActive;
}
-
+ public Map getConnectionStates() {
+ return connectionStates;
+ }
}