You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/11/14 13:50:12 UTC
svn commit: r474771 -
/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Author: chirino
Date: Tue Nov 14 04:50:11 2006
New Revision: 474771
URL: http://svn.apache.org/viewvc?view=rev&rev=474771
Log:
Merged in rev 474769
Modified:
incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=474771&r1=474770&r2=474771
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Nov 14 04:50:11 2006
@@ -123,6 +123,7 @@
private AtomicBoolean stopped = new AtomicBoolean(false);
protected final AtomicBoolean disposed=new AtomicBoolean(false);
private CountDownLatch stopLatch = new CountDownLatch(1);
+ protected final AtomicBoolean asyncException = new AtomicBoolean(false);
static class ConnectionState extends org.apache.activemq.state.ConnectionState {
private final ConnectionContext context;
@@ -217,11 +218,13 @@
* @param e
*/
public void serviceExceptionAsync(final IOException e) {
- new Thread("Async Exception Handler") {
- public void run() {
- serviceException(e);
- }
- }.start();
+ if( asyncException.compareAndSet(false, true) ) {
+ new Thread("Async Exception Handler") {
+ public void run() {
+ serviceException(e);
+ }
+ }.start();
+ }
}
/**
@@ -349,7 +352,7 @@
return null;
}
- public Response processBeginTransaction(TransactionInfo info) throws Exception {
+ synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
@@ -364,14 +367,14 @@
return null;
}
- public Response processEndTransaction(TransactionInfo info) throws Exception {
+ synchronized public Response processEndTransaction(TransactionInfo info) throws Exception {
// No need to do anything. This packet is just sent by the client
// make sure he is synced with the server as commit command could
// come from a different connection.
return null;
}
- public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+ synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
@@ -395,7 +398,7 @@
}
}
- public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+ synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
@@ -409,7 +412,7 @@
}
- public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+ synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
@@ -421,7 +424,7 @@
return null;
}
- public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+ synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
@@ -433,7 +436,7 @@
return null;
}
- public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
@@ -443,7 +446,7 @@
return null;
}
- public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+ synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
if( cs!=null ) {
@@ -498,7 +501,7 @@
return null;
}
- public Response processAddDestination(DestinationInfo info) throws Exception {
+ synchronized public Response processAddDestination(DestinationInfo info) throws Exception {
ConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.addDestinationInfo(cs.getContext(), info);
if( info.getDestination().isTemporary() ) {
@@ -507,7 +510,7 @@
return null;
}
- public Response processRemoveDestination(DestinationInfo info) throws Exception {
+ synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception {
ConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.removeDestinationInfo(cs.getContext(), info);
if( info.getDestination().isTemporary() ) {
@@ -517,7 +520,7 @@
}
- public Response processAddProducer(ProducerInfo info) throws Exception {
+ synchronized public Response processAddProducer(ProducerInfo info) throws Exception {
SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
@@ -538,7 +541,7 @@
return null;
}
- public Response processRemoveProducer(ProducerId id) throws Exception {
+ synchronized public Response processRemoveProducer(ProducerId id) throws Exception {
SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId();
@@ -554,7 +557,7 @@
return null;
}
- public Response processAddConsumer(ConsumerInfo info) throws Exception {
+ synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception {
SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
@@ -576,7 +579,7 @@
return null;
}
- public Response processRemoveConsumer(ConsumerId id) throws Exception {
+ synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception {
SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId();
@@ -593,7 +596,7 @@
return null;
}
- public Response processAddSession(SessionInfo info) throws Exception {
+ synchronized public Response processAddSession(SessionInfo info) throws Exception {
ConnectionId connectionId = info.getSessionId().getParentId();
ConnectionState cs = lookupConnectionState(connectionId);
@@ -609,7 +612,7 @@
return null;
}
- public Response processRemoveSession(SessionId id) throws Exception {
+ synchronized public Response processRemoveSession(SessionId id) throws Exception {
ConnectionId connectionId = id.getParentId();
@@ -646,7 +649,7 @@
return null;
}
- public Response processAddConnection(ConnectionInfo info) throws Exception {
+ synchronized public Response processAddConnection(ConnectionInfo info) throws Exception {
ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
@@ -695,7 +698,7 @@
return null;
}
- public Response processRemoveConnection(ConnectionId id) {
+ synchronized public Response processRemoveConnection(ConnectionId id) {
ConnectionState cs = lookupConnectionState(id);