You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/07/30 07:34:39 UTC
svn commit: r560872 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/state/ main/java/org/apache/activemq/...
Author: chirino
Date: Sun Jul 29 22:34:37 2007
New Revision: 560872
URL: http://svn.apache.org/viewvc?view=rev&rev=560872
Log:
Serveral changes needed to Fix https://issues.apache.org/activemq/browse/AMQ-1349
- The vm:// transport was delivering events to the listener before start() was called. Also clean it up a little by consolidating
the use of the prePeerSetQueue and messageQueue field.
- the tcp:// .stop() method now blocks until the thread that calls out to the listener is shutdown.
- TransportConnection was not doing a good job synchronizing when multiple concurrent conenctions to the same connection Id was established.
IllegalStateExceptions were common when a failover connection reconnected. Now we make sure that only 1 connection with a given connectionId
is activley operating in the broker. Also removed 1 un-needed hash lookup by replacing the brokerConnectionStates Map with the
connectionState variable.
Also added a pause in the JmsTempDestinationTest to avoid intermitent failures.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Sun Jul 29 22:34:37 2007
@@ -54,7 +54,6 @@
private Object longTermStoreContext;
private boolean producerFlowControl=true;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
- private AtomicInteger referenceCounter = new AtomicInteger();
private boolean networkConnection;
private final AtomicBoolean stopping = new AtomicBoolean();
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
@@ -241,15 +240,6 @@
}
return true;
}
-
- public int incrementReference() {
- return referenceCounter.incrementAndGet();
- }
-
- public int decrementReference() {
- return referenceCounter.decrementAndGet();
- }
-
public synchronized boolean isNetworkConnection() {
return networkConnection;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Sun Jul 29 22:34:37 2007
@@ -72,6 +72,7 @@
import org.apache.activemq.network.NetworkBridgeFactory;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
@@ -105,8 +106,8 @@
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
// Keeps track of the state of the connections.
- protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap();
- protected final Map brokerConnectionStates;
+// protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap();
+ protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
// The broker and wireformat info that was exchanged.
protected BrokerInfo brokerInfo;
private WireFormatInfo wireFormatInfo;
@@ -140,16 +141,18 @@
private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge = null;
final private TaskRunnerFactory taskRunnerFactory;
+ private TransportConnectionState connectionState;
- static class ConnectionState extends org.apache.activemq.state.ConnectionState{
+ static class TransportConnectionState extends org.apache.activemq.state.ConnectionState{
- private final ConnectionContext context;
- TransportConnection connection;
+ private ConnectionContext context;
+ private TransportConnection connection;
+ private final Object connectMutex = new Object();
+ private AtomicInteger referenceCounter = new AtomicInteger();
- public ConnectionState(ConnectionInfo info,ConnectionContext context,TransportConnection connection){
+ public TransportConnectionState(ConnectionInfo info, TransportConnection transportConnection){
super(info);
- this.context=context;
- this.connection=connection;
+ connection=transportConnection;
}
public ConnectionContext getContext(){
@@ -159,6 +162,23 @@
public TransportConnection getConnection(){
return connection;
}
+
+ public void setContext(ConnectionContext context) {
+ this.context = context;
+ }
+
+ public void setConnection(TransportConnection connection) {
+ this.connection = connection;
+ }
+
+ public int incrementReference() {
+ return referenceCounter.incrementAndGet();
+ }
+
+ public int decrementReference() {
+ return referenceCounter.decrementAndGet();
+ }
+
}
/**
@@ -307,36 +327,6 @@
return response;
}
- protected ConnectionState lookupConnectionState(ConsumerId id){
- ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
- if(cs==null)
- throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "
- +id.getParentId().getParentId());
- return cs;
- }
-
- protected ConnectionState lookupConnectionState(ProducerId id){
- ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
- if(cs==null)
- throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "
- +id.getParentId().getParentId());
- return cs;
- }
-
- protected ConnectionState lookupConnectionState(SessionId id){
- ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId());
- if(cs==null)
- throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "
- +id.getParentId());
- return cs;
- }
-
- protected ConnectionState lookupConnectionState(ConnectionId connectionId){
- ConnectionState cs=(ConnectionState)localConnectionStates.get(connectionId);
- if(cs==null)
- throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
- return cs;
- }
public Response processKeepAlive(KeepAliveInfo info) throws Exception{
return null;
@@ -354,7 +344,15 @@
}
public Response processShutdown(ShutdownInfo info) throws Exception{
- stop();
+ new Thread("Async Exception Handler"){
+ public void run(){
+ try {
+ TransportConnection.this.stop();
+ } catch (Exception e) {
+ serviceException(e);
+ }
+ }
+ }.start();
return null;
}
@@ -363,7 +361,7 @@
}
synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception{
- ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
+ TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
context=null;
if(cs!=null){
context=cs.getContext();
@@ -387,7 +385,7 @@
}
synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception{
- ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
+ TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
context=null;
if(cs!=null){
context=cs.getContext();
@@ -413,63 +411,39 @@
}
synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception{
- ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- context=null;
- if(cs!=null){
- context=cs.getContext();
- }
- if (cs == null) {
- throw new NullPointerException("Context is null");
- }
+ TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+ context=cs.getContext();
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context,info.getTransactionId(),true);
return null;
}
synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception{
- ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- context=null;
- if(cs!=null){
- context=cs.getContext();
- }
- if (cs == null) {
- throw new NullPointerException("Context is null");
- }
+ TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+ context=cs.getContext();
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context,info.getTransactionId(),false);
return null;
}
synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception{
- ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- context=null;
- if(cs!=null){
- context=cs.getContext();
- }
- if (cs == null) {
- throw new NullPointerException("Context is null");
- }
+ TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+ context=cs.getContext();
cs.removeTransactionState(info.getTransactionId());
broker.rollbackTransaction(context,info.getTransactionId());
return null;
}
synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception{
- ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- context=null;
- if(cs!=null){
- context=cs.getContext();
- }
+ TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+ context=cs.getContext();
broker.forgetTransaction(context,info.getTransactionId());
return null;
}
synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception{
- ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
- context=null;
- if(cs!=null){
- context=cs.getContext();
- }
+ TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
+ context=cs.getContext();
TransactionId[] preparedTransactions=broker.getPreparedTransactions(context);
return new DataArrayResponse(preparedTransactions);
}
@@ -497,7 +471,7 @@
}
synchronized public Response processAddDestination(DestinationInfo info) throws Exception{
- ConnectionState cs=lookupConnectionState(info.getConnectionId());
+ TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
broker.addDestinationInfo(cs.getContext(),info);
if(info.getDestination().isTemporary()){
cs.addTempDestination(info);
@@ -506,7 +480,7 @@
}
synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception{
- ConnectionState cs=lookupConnectionState(info.getConnectionId());
+ TransportConnectionState cs=lookupConnectionState(info.getConnectionId());
broker.removeDestinationInfo(cs.getContext(),info);
if(info.getDestination().isTemporary()){
cs.removeTempDestination(info.getDestination());
@@ -517,7 +491,7 @@
synchronized public Response processAddProducer(ProducerInfo info) throws Exception{
SessionId sessionId=info.getProducerId().getParentId();
ConnectionId connectionId=sessionId.getParentId();
- ConnectionState cs=lookupConnectionState(connectionId);
+ TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState ss=cs.getSessionState(sessionId);
if(ss==null)
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
@@ -537,7 +511,7 @@
synchronized public Response processRemoveProducer(ProducerId id) throws Exception{
SessionId sessionId=id.getParentId();
ConnectionId connectionId=sessionId.getParentId();
- ConnectionState cs=lookupConnectionState(connectionId);
+ TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState ss=cs.getSessionState(sessionId);
if(ss==null)
throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
@@ -553,7 +527,7 @@
synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception{
SessionId sessionId=info.getConsumerId().getParentId();
ConnectionId connectionId=sessionId.getParentId();
- ConnectionState cs=lookupConnectionState(connectionId);
+ TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState ss=cs.getSessionState(sessionId);
if(ss==null)
throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "
@@ -573,7 +547,7 @@
synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception{
SessionId sessionId=id.getParentId();
ConnectionId connectionId=sessionId.getParentId();
- ConnectionState cs=lookupConnectionState(connectionId);
+ TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState ss=cs.getSessionState(sessionId);
if(ss==null)
throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
@@ -588,7 +562,7 @@
synchronized public Response processAddSession(SessionInfo info) throws Exception{
ConnectionId connectionId=info.getSessionId().getParentId();
- ConnectionState cs=lookupConnectionState(connectionId);
+ TransportConnectionState cs=lookupConnectionState(connectionId);
// Avoid replaying dup commands
if(!cs.getSessionIds().contains(info.getSessionId())){
broker.addSession(cs.getContext(),info);
@@ -603,7 +577,7 @@
synchronized public Response processRemoveSession(SessionId id) throws Exception{
ConnectionId connectionId=id.getParentId();
- ConnectionState cs=lookupConnectionState(connectionId);
+ TransportConnectionState cs=lookupConnectionState(connectionId);
SessionState session=cs.getSessionState(id);
if(session==null)
throw new IllegalStateException("Cannot remove session that had not been registered: "+id);
@@ -631,21 +605,36 @@
return null;
}
- synchronized public Response processAddConnection(ConnectionInfo info) throws Exception{
- ConnectionState state=(ConnectionState)brokerConnectionStates.get(info.getConnectionId());
- if(state!=null){
- // ConnectionInfo replay?? Chances are that it's a client reconnecting,
- // and we have not detected that that old connection died.. Kill the old connection
- // to make sure our state is in sync with the client.
- if(this!=state.getConnection()){
- log.debug("Killing previous stale connection: "+state.getConnection());
- state.getConnection().stop();
- if(!state.getConnection().stopLatch.await(15,TimeUnit.SECONDS)){
- throw new Exception("Previous connection could not be clean up.");
- }
- }
+ public Response processAddConnection(ConnectionInfo info) throws Exception {
+
+ TransportConnectionState state;
+
+ // Make sure 2 concurrent connections by the same ID only generate 1 TransportConnectionState object.
+ synchronized(brokerConnectionStates) {
+ state=(TransportConnectionState)brokerConnectionStates.get(info.getConnectionId());
+ if( state==null ) {
+ state=new TransportConnectionState(info,this);
+ brokerConnectionStates.put(info.getConnectionId(),state);
+ }
+ state.incrementReference();
}
- log.debug("Setting up new connection: "+this);
+
+
+ // If there are 2 concurrent connections for the same connection id, then last one in wins, we need to sync here
+ // to figure out the winner.
+ synchronized(state.connectMutex) {
+ if( state.getConnection()!=this ) {
+ log.debug("Killing previous stale connection: "+state.getConnection().getRemoteAddress());
+ state.getConnection().stop();
+ log.debug("Connection "+getRemoteAddress()+" taking over previous connection: "+state.getConnection().getRemoteAddress());
+ state.setConnection(this);
+ state.reset(info);
+ }
+ }
+
+ registerConnectionState(info.getConnectionId(),state);
+
+ log.debug("Setting up new connection: "+getRemoteAddress());
// Setup the context.
String clientId=info.getClientId();
context=new ConnectionContext();
@@ -659,11 +648,10 @@
context.setClientMaster(info.isClientMaster());
context.setWireFormatInfo(wireFormatInfo);
context.setNetworkConnection(networkConnection);
- context.incrementReference();
this.manageable=info.isManageable();
- state=new ConnectionState(info,context,this);
- brokerConnectionStates.put(info.getConnectionId(),state);
- localConnectionStates.put(info.getConnectionId(),state);
+ state.setContext(context);
+ state.setConnection(this);
+
broker.addConnection(context,info);
if(info.isManageable()&&broker.isFaultTolerantConfiguration()){
// send ConnectionCommand
@@ -674,8 +662,9 @@
return null;
}
- synchronized public Response processRemoveConnection(ConnectionId id){
- ConnectionState cs=lookupConnectionState(id);
+
+ synchronized public Response processRemoveConnection(ConnectionId id){
+ TransportConnectionState cs=lookupConnectionState(id);
// Don't allow things to be added to the connection state while we are shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions.
@@ -702,12 +691,15 @@
}catch(Throwable e){
serviceLog.warn("Failed to remove connection "+cs.getInfo(),e);
}
- ConnectionState state=(ConnectionState)localConnectionStates.remove(id);
- if(state!=null){
- // If we are the last reference, we should remove the state
- // from the broker.
- if(state.getContext().decrementReference()==0){
- brokerConnectionStates.remove(id);
+
+ TransportConnectionState state=unregisterConnectionState(id);
+ if(state!=null) {
+ synchronized(brokerConnectionStates) {
+ // If we are the last reference, we should remove the state
+ // from the broker.
+ if(state.decrementReference()==0){
+ brokerConnectionStates.remove(id);
+ }
}
}
return null;
@@ -869,97 +861,103 @@
return;
}
}
- if(stopped.compareAndSet(false,true)){
- log.debug("Stopping connection: "+transport.getRemoteAddress());
- connector.onStopped(this);
- try{
- synchronized(this){
- if(masterBroker!=null){
- masterBroker.stop();
- }
- if(duplexBridge!=null){
- duplexBridge.stop();
- }
- // If the transport has not failed yet,
- // notify the peer that we are doing a normal shutdown.
- if(transportException==null){
- transport.oneway(new ShutdownInfo());
- }
- }
-
- }catch(Exception ignore){
- log.trace("Exception caught stopping",ignore);
- }
- transport.stop();
- active=false;
- if(disposed.compareAndSet(false,true)){
-
- // Let all the connection contexts know we are shutting down
- // so that in progress operations can notice and unblock.
- ArrayList l=new ArrayList(localConnectionStates.values());
- for(Iterator iter=l.iterator();iter.hasNext();){
- ConnectionState cs=(ConnectionState) iter.next();
- cs.getContext().getStopping().set(true);
- }
-
- if( taskRunner!=null ) {
- taskRunner.wakeup();
- // Give it a change to stop gracefully.
- dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
- disposeTransport();
- taskRunner.shutdown();
- } else {
- disposeTransport();
- }
-
- if( taskRunner!=null )
- taskRunner.shutdown();
-
- // Run the MessageDispatch callbacks so that message references get cleaned up.
- for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
- Command command = (Command) iter.next();
- if(command.isMessageDispatch()) {
- MessageDispatch md=(MessageDispatch) command;
- Runnable sub=md.getTransmitCallback();
- broker.processDispatch(md);
- if(sub!=null){
- sub.run();
- }
- }
- }
- //
- // Remove all logical connection associated with this connection
- // from the broker.
- if(!broker.isStopped()){
- l=new ArrayList(localConnectionStates.keySet());
- for(Iterator iter=l.iterator();iter.hasNext();){
- ConnectionId connectionId=(ConnectionId)iter.next();
- try{
- log.debug("Cleaning up connection resources.");
- processRemoveConnection(connectionId);
- }catch(Throwable ignore){
- ignore.printStackTrace();
- }
- }
- if(brokerInfo!=null){
- broker.removeBroker(this,brokerInfo);
- }
- }
- stopLatch.countDown();
- }
+ if(stopped.compareAndSet(false,true)) {
+ doStop();
+ stopLatch.countDown();
+ } else {
+ stopLatch.await();
}
}
+ protected void doStop() throws Exception, InterruptedException {
+ log.debug("Stopping connection: "+transport.getRemoteAddress());
+ connector.onStopped(this);
+ try{
+ synchronized(this){
+ if(masterBroker!=null){
+ masterBroker.stop();
+ }
+ if(duplexBridge!=null){
+ duplexBridge.stop();
+ }
+ // If the transport has not failed yet,
+ // notify the peer that we are doing a normal shutdown.
+ if(transportException==null){
+ transport.oneway(new ShutdownInfo());
+ }
+ }
+
+ }catch(Exception ignore){
+ log.trace("Exception caught stopping",ignore);
+ }
+ if(disposed.compareAndSet(false,true)){
+
+ // Let all the connection contexts know we are shutting down
+ // so that in progress operations can notice and unblock.
+ List<TransportConnectionState> connectionStates=listConnectionStates();
+ for (TransportConnectionState cs : connectionStates) {
+ cs.getContext().getStopping().set(true);
+ }
+
+ if( taskRunner!=null ) {
+ taskRunner.wakeup();
+ // Give it a change to stop gracefully.
+ dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
+ disposeTransport();
+ taskRunner.shutdown();
+ } else {
+ disposeTransport();
+ }
+
+ if( taskRunner!=null )
+ taskRunner.shutdown();
+
+ // Run the MessageDispatch callbacks so that message references get cleaned up.
+ for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
+ Command command = (Command) iter.next();
+ if(command.isMessageDispatch()) {
+ MessageDispatch md=(MessageDispatch) command;
+ Runnable sub=md.getTransmitCallback();
+ broker.processDispatch(md);
+ if(sub!=null){
+ sub.run();
+ }
+ }
+ }
+ //
+ // Remove all logical connection associated with this connection
+ // from the broker.
+
+ if (!broker.isStopped()) {
+ for (TransportConnectionState cs : connectionStates) {
+ cs.getContext().getStopping().set(true);
+ try {
+ log.debug("Cleaning up connection resources: " + getRemoteAddress());
+ processRemoveConnection(cs.getInfo().getConnectionId());
+ } catch (Throwable ignore) {
+ ignore.printStackTrace();
+ }
+ }
+
+ if (brokerInfo != null) {
+ broker.removeBroker(this, brokerInfo);
+ }
+ }
+ log.debug("Connection Stopped: " + getRemoteAddress());
+ }
+ }
+
/**
- * @return Returns the blockedCandidate.
- */
+ * @return Returns the blockedCandidate.
+ */
public boolean isBlockedCandidate(){
return blockedCandidate;
}
/**
- * @param blockedCandidate The blockedCandidate to set.
- */
+ * @param blockedCandidate
+ * The blockedCandidate to set.
+ */
public void setBlockedCandidate(boolean blockedCandidate){
this.blockedCandidate=blockedCandidate;
}
@@ -1115,15 +1113,16 @@
if(this.brokerInfo!=null){
log.warn("Unexpected extra broker info command received: "+info);
}
- this.brokerInfo=info;
- broker.addBroker(this,info);
- networkConnection = true;
- for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) {
- ConnectionState cs = (ConnectionState) iter.next();
- cs.getContext().setNetworkConnection(true);
- }
-
- return null;
+ this.brokerInfo = info;
+ broker.addBroker(this, info);
+ networkConnection = true;
+
+ List<TransportConnectionState> connectionStates = listConnectionStates();
+ for (TransportConnectionState cs : connectionStates) {
+ cs.getContext().setNetworkConnection(true);
+ }
+
+ return null;
}
protected void dispatch(Command command) throws IOException{
@@ -1140,14 +1139,13 @@
}
public String getConnectionId() {
- Iterator iterator = localConnectionStates.values().iterator();
- ConnectionState object = (ConnectionState) iterator.next();
- if( object == null ) {
- return null;
- }
- if( object.getInfo().getClientId() !=null )
- return object.getInfo().getClientId();
- return object.getInfo().getConnectionId().toString();
+ List<TransportConnectionState> connectionStates = listConnectionStates();
+ for (TransportConnectionState cs : connectionStates) {
+ if( cs.getInfo().getClientId() !=null )
+ return cs.getInfo().getClientId();
+ return cs.getInfo().getConnectionId().toString();
+ }
+ return null;
}
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
@@ -1155,7 +1153,7 @@
if(result==null){
synchronized(producerExchanges){
result=new ProducerBrokerExchange();
- ConnectionState state=lookupConnectionState(id);
+ TransportConnectionState state=lookupConnectionState(id);
context=state.getContext();
result.setConnectionContext(context);
SessionState ss=state.getSessionState(id.getParentId());
@@ -1186,7 +1184,7 @@
if(result==null){
synchronized(consumerExchanges){
result=new ConsumerBrokerExchange();
- ConnectionState state=lookupConnectionState(id);
+ TransportConnectionState state=lookupConnectionState(id);
context=state.getContext();
result.setConnectionContext(context);
SessionState ss=state.getSessionState(id.getParentId());
@@ -1252,5 +1250,72 @@
public Response processConsumerControl(ConsumerControl control) throws Exception {
return null;
}
+
+ ///////////////////////////////////////////////////////////////////
+ //
+ // The following methods handle the logical connection state. It is possible
+ // multiple logical connections multiplexed over a single physical connection.
+ // But have not yet exploited the feature from the clients, so for performance
+ // reasons (to avoid a hash lookup) this class only keeps track of 1
+ // logical connection state.
+ //
+ // A sub class could override these methods to a full multiple logical connection
+ // support.
+ //
+ ///////////////////////////////////////////////////////////////////
+
+ protected TransportConnectionState registerConnectionState(ConnectionId connectionId, TransportConnectionState state) {
+ TransportConnectionState rc = connectionState;
+ connectionState = state;
+ return rc;
+ }
+
+ protected TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
+ TransportConnectionState rc = connectionState;
+ connectionState = null;
+ return rc;
+ }
+ protected List<TransportConnectionState> listConnectionStates() {
+ ArrayList<TransportConnectionState> rc = new ArrayList<TransportConnectionState>();
+ if( connectionState!=null ) {
+ rc.add(connectionState);
+ }
+ return rc;
+ }
+
+ protected TransportConnectionState lookupConnectionState(String connectionId){
+ TransportConnectionState cs=connectionState;
+ if(cs==null)
+ throw new IllegalStateException("Cannot lookup a connectionId for a connection that had not been registered: "
+ +connectionId);
+ return cs;
+ }
+ protected TransportConnectionState lookupConnectionState(ConsumerId id){
+ TransportConnectionState cs=connectionState;
+ if(cs==null)
+ throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "
+ +id.getParentId().getParentId());
+ return cs;
+ }
+ protected TransportConnectionState lookupConnectionState(ProducerId id){
+ TransportConnectionState cs=connectionState;
+ if(cs==null)
+ throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "
+ +id.getParentId().getParentId());
+ return cs;
+ }
+ protected TransportConnectionState lookupConnectionState(SessionId id){
+ TransportConnectionState cs=connectionState;
+ if(cs==null)
+ throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "
+ +id.getParentId());
+ return cs;
+ }
+ protected TransportConnectionState lookupConnectionState(ConnectionId connectionId){
+ TransportConnectionState cs=connectionState;
+ if(cs==null)
+ throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
+ return cs;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Sun Jul 29 22:34:37 2007
@@ -60,7 +60,7 @@
registerMBean(byAddressName);
}
- public synchronized void stop() throws Exception {
+ public void doStop() throws Exception {
if (isStarting()) {
setPendingStop(true);
return;
@@ -71,7 +71,7 @@
byClientIdName=null;
byAddressName=null;
}
- super.stop();
+ super.doStop();
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sun Jul 29 22:34:37 2007
@@ -57,6 +57,7 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -98,7 +99,7 @@
private final DestinationInterceptor destinationInterceptor;
private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory;
- protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
+ protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
@@ -605,7 +606,7 @@
this.adminConnectionContext = adminConnectionContext;
}
- public Map getConnectionStates() {
+ public Map<ConnectionId, ConnectionState> getConnectionStates() {
return connectionStates;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Sun Jul 29 22:34:37 2007
@@ -24,6 +24,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
@@ -32,12 +34,9 @@
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class ConnectionState {
- final ConnectionInfo info;
+ ConnectionInfo info;
private final ConcurrentHashMap transactions = new ConcurrentHashMap();
private final ConcurrentHashMap sessions = new ConcurrentHashMap();
private final List tempDestinations = Collections.synchronizedList(new ArrayList());
@@ -52,6 +51,15 @@
public String toString() {
return info.toString();
}
+
+ public void reset(ConnectionInfo info) {
+ this.info=info;
+ transactions.clear();
+ sessions.clear();
+ tempDestinations.clear();
+ shutdown.set(false);
+ }
+
public void addTempDestination(DestinationInfo info) {
checkShutdown();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Sun Jul 29 22:34:37 2007
@@ -17,16 +17,6 @@
*/
package org.apache.activemq.transport.tcp;
-import org.apache.activemq.Service;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportThreadSupport;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.net.SocketFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -40,6 +30,19 @@
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportThreadSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
@@ -64,6 +67,7 @@
protected boolean useLocalHost = true;
protected int minmumWireFormatVersion;
protected SocketFactory socketFactory;
+ protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
private Map socketOptions;
private Boolean keepAlive;
@@ -131,24 +135,22 @@
*/
public void run() {
log.trace("TCP consumer thread starting");
- while (!isStopped()) {
- try {
- Object command = readCommand();
- doConsume(command);
- }
- catch (SocketTimeoutException e) {
- }
- catch (InterruptedIOException e) {
- }
- catch (IOException e) {
- try {
- stop();
- }
- catch (Exception e2) {
- log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
- }
- onException(e);
- }
+ try {
+ while (!isStopped()) {
+ try {
+ Object command = readCommand();
+ doConsume(command);
+ }
+ catch (SocketTimeoutException e) {
+ }
+ catch (InterruptedIOException e) {
+ }
+ }
+ } catch (IOException e) {
+ stoppedLatch.get().countDown();
+ onException(e);
+ } finally {
+ stoppedLatch.get().countDown();
}
}
@@ -301,6 +303,7 @@
protected void doStart() throws Exception {
connect();
+ stoppedLatch.set(new CountDownLatch(1));
super.doStart();
}
@@ -355,6 +358,7 @@
initializeStreams();
}
+
protected void doStop(ServiceStopper stopper) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Stopping transport " + this);
@@ -366,6 +370,19 @@
if (socket != null) {
socket.close();
}
+ }
+
+
+ /**
+ * Override so that stop() blocks until the run thread is no longer running.
+ */
+ @Override
+ public void stop() throws Exception {
+ super.stop();
+ CountDownLatch countDownLatch = stoppedLatch.get();
+ if( countDownLatch!=null ) {
+ countDownLatch.await();
+ }
}
protected void initializeStreams() throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Sun Jul 29 22:34:37 2007
@@ -32,6 +32,7 @@
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -53,10 +54,8 @@
protected boolean network;
protected boolean async=true;
protected int asyncQueueDepth=2000;
- protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
protected LinkedBlockingQueue messageQueue=null;
protected boolean started;
- protected final Object startMutex = new Object();
protected final URI location;
protected final long id;
private TaskRunner taskRunner;
@@ -85,45 +84,37 @@
}
if(peer==null)
throw new IOException("Peer not connected.");
- if(!peer.disposed){
- if(async){
- asyncOneWay(command);
- }else{
- syncOneWay(command);
- }
- }else{
- throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
- }
- }
- protected void syncOneWay(Object command){
TransportListener tl=null;
- synchronized(peer.startMutex){
+ synchronized(peer.mutex) {
+ if( peer.disposed ) {
+ throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
+ }
if( peer.started ) {
- tl = peer.transportListener;
- } else if(!peer.disposed) {
- peer.prePeerSetQueue.add(command);
+ if(peer.async){
+ peer.enqueue(command);
+ peer.wakeup();
+ } else {
+ tl = peer.transportListener;
+ }
+ } else {
+ peer.enqueue(command);
}
}
+
if( tl!=null ) {
- tl.onCommand(command);
- }
+ tl.onCommand(command);
+ }
+
}
- protected void asyncOneWay(Object command) throws IOException{
- try{
- synchronized(mutex){
- if(messageQueue==null){
- messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
- }
- }
- messageQueue.put(command);
- wakeup();
- }catch(final InterruptedException e){
- log.error("messageQueue interupted",e);
- throw new IOException(e.getMessage());
- }
- }
+ private void enqueue(Object command) throws IOException {
+ try{
+ getMessageQueue().put(command);
+ }catch(final InterruptedException e){
+ throw IOExceptionSupport.create(e);
+ }
+ }
public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{
throw new AssertionError("Unsupported Method");
@@ -146,32 +137,38 @@
public void setTransportListener(TransportListener commandListener){
synchronized(mutex){
this.transportListener=commandListener;
+ wakeup();
}
- wakeup();
- peer.wakeup();
}
+ private LinkedBlockingQueue getMessageQueue() {
+ synchronized(mutex) {
+ if( messageQueue==null ) {
+ messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+ }
+ return messageQueue;
+ }
+ }
+
+
public void start() throws Exception{
if(transportListener==null)
throw new IOException("TransportListener not set.");
- synchronized(startMutex) {
- if( !prePeerSetQueue.isEmpty() ) {
- for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
- Command command=(Command)iter.next();
- transportListener.onCommand(command);
- }
- prePeerSetQueue.clear();
- }
+
+ synchronized(mutex) {
+ if( messageQueue!=null ) {
+ Object command;
+ while( (command = messageQueue.poll()) !=null ) {
+ transportListener.onCommand(command);
+ }
+ }
started = true;
- if( isAsync() ) {
- peer.wakeup();
- wakeup();
- }
+ wakeup();
}
}
public void stop() throws Exception{
- synchronized(startMutex) {
+ synchronized(mutex) {
if(!disposed){
started=false;
disposed=true;
@@ -221,18 +218,21 @@
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate(){
- final TransportListener tl=peer.transportListener;
- Command command=null;
+ final TransportListener tl;
synchronized(mutex){
- if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null&&!messageQueue.isEmpty()){
- command=(Command)messageQueue.poll();
- }
- }
- if(tl!=null&&command!=null){
+ tl = transportListener;
+ if( !started || disposed || tl==null )
+ return false;
+ }
+
+ LinkedBlockingQueue mq = getMessageQueue();
+ final Command command = (Command)mq.poll();
+ if( command!=null ) {
tl.onCommand(command);
- }
- boolean result=messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
- return result;
+ return !mq.isEmpty();
+ } else {
+ return false;
+ }
}
/**
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java?view=diff&rev=560872&r1=560871&r2=560872
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java Sun Jul 29 22:34:37 2007
@@ -184,8 +184,9 @@
* Make sure you cannot publish to a temp destination that does not exist anymore.
*
* @throws JMSException
+ * @throws InterruptedException
*/
- public void testPublishFailsForClosedConnection() throws JMSException {
+ public void testPublishFailsForClosedConnection() throws JMSException, InterruptedException {
Connection tempConnection = factory.createConnection();
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -202,6 +203,7 @@
// Closing the connection should destroy the temp queue that was created.
tempConnection.close();
+ Thread.sleep(1000); // Wait a little bit to let the delete take effect.
// This message delivery NOT should work since the temp connection is now closed.
try {