You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/12/08 19:56:12 UTC
svn commit: r602529 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
ActiveMQConnection.java broker/TransportConnection.java
Author: chirino
Date: Sat Dec 8 10:56:11 2007
New Revision: 602529
URL: http://svn.apache.org/viewvc?rev=602529&view=rev
Log:
If you have a multi-threaded client whose threads share one connection to the broker, you may have seen errors like:
2007-12-05 14:14:52,606 [VMTransport] ERROR - org.apache.activemq.broker.TransportConnection.Service - Async error occurred: java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:bubba-38184-1196882086290-2:1
java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:bubba-38184-1196882086290-2:1
at org.apache.activemq.broker.SingleTransportConnectionStateRegister.lookupConnectionState(SingleTransportConnectionStateRegister.java:113)
at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1313)
at org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:538)
at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:64)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202)
at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:118)
at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:42)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:619)
The root cause is that the client side has multiple threads calling close() on the consumer/session and connection objects. If the timing of those concurrent calls is just right, the client can send the connection close message followed by a session or consumer close message, which is invalid and results in an IllegalStateException being reported by the broker.
- Simplified the ActiveMQConnection and TransportConnection shutdown so that this does not happen.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=602529&r1=602528&r2=602529&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Sat Dec 8 10:56:11 2007
@@ -580,8 +580,8 @@
// If we announced ourselfs to the broker.. Try to let
// the broker
// know that the connection is being shutdown.
- syncSendPacket(info.createRemoveCommand(), closeTimeout);
- asyncSendPacket(new ShutdownInfo());
+ doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
+ doAsyncSendPacket(new ShutdownInfo());
}
ServiceSupport.dispose(this.transport);
@@ -1144,18 +1144,21 @@
* @throws JMSException
*/
public void asyncSendPacket(Command command) throws JMSException {
- if (isClosed()) {
+ if (isClosed() || closing.get()) {
throw new ConnectionClosedException();
} else {
-
- try {
- this.transport.oneway(command);
- } catch (IOException e) {
- throw JMSExceptionSupport.create(e);
- }
+ doAsyncSendPacket(command);
}
}
+ private void doAsyncSendPacket(Command command) throws JMSException {
+ try {
+ this.transport.oneway(command);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+
/**
* Send a packet through a Connection - for internal use only
*
@@ -1193,26 +1196,30 @@
* @throws JMSException
*/
public Response syncSendPacket(Command command, int timeout) throws JMSException {
- if (isClosed()) {
+ if (isClosed() || closing.get()) {
throw new ConnectionClosedException();
} else {
-
- try {
- Response response = (Response)this.transport.request(command, timeout);
- if (response != null && response.isException()) {
- ExceptionResponse er = (ExceptionResponse)response;
- if (er.getException() instanceof JMSException) {
- throw (JMSException)er.getException();
- } else {
- throw JMSExceptionSupport.create(er.getException());
- }
- }
- return response;
- } catch (IOException e) {
- throw JMSExceptionSupport.create(e);
- }
+ return doSyncSendPacket(command, timeout);
}
}
+
+ private Response doSyncSendPacket(Command command, int timeout)
+ throws JMSException {
+ try {
+ Response response = (Response)this.transport.request(command, timeout);
+ if (response != null && response.isException()) {
+ ExceptionResponse er = (ExceptionResponse)response;
+ if (er.getException() instanceof JMSException) {
+ throw (JMSException)er.getException();
+ } else {
+ throw JMSExceptionSupport.create(er.getException());
+ }
+ }
+ return response;
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
/**
* @return statistics for this Connection
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=602529&r1=602528&r2=602529&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Sat Dec 8 10:56:11 2007
@@ -139,7 +139,6 @@
private long timeStamp;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean transportDisposed = new AtomicBoolean();
- private final AtomicBoolean disposed = new AtomicBoolean(false);
private CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicBoolean asyncException = new AtomicBoolean(false);
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
@@ -198,7 +197,7 @@
}
public void serviceTransportException(IOException e) {
- if (!disposed.get()) {
+ if (!stopped.get()) {
transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug("Transport failed: " + e, e);
@@ -240,7 +239,7 @@
// Handle the case where the broker is stopped
// But the client is still connected.
- if (!disposed.get()) {
+ if (!stopped.get()) {
if (SERVICELOG.isDebugEnabled()) {
SERVICELOG
.debug("Broker has been stopped. Notifying client and closing his connection.");
@@ -260,7 +259,7 @@
// notification gets to him.
ServiceSupport.dispose(this);
}
- } else if (!disposed.get() && !inServiceException) {
+ } else if (!stopped.get() && !inServiceException) {
inServiceException = true;
try {
SERVICELOG.error("Async error occurred: " + e, e);
@@ -669,11 +668,12 @@
return null;
}
- public Response processRemoveConnection(ConnectionId id) {
+ public Response processRemoveConnection(ConnectionId id) throws InterruptedException {
TransportConnectionState cs = lookupConnectionState(id);
// Don't allow things to be added to the connection state while we are
// shutting down.
cs.shutdown();
+
// Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
SessionId sessionId = (SessionId)iter.next();
@@ -731,7 +731,7 @@
}
public void dispatchAsync(Command message) {
- if (!disposed.get()) {
+ if (!stopped.get()) {
getStatistics().getEnqueues().increment();
if (taskRunner == null) {
dispatchSync(message);
@@ -759,7 +759,7 @@
final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch()
? command : null);
try {
- if (!disposed.get()) {
+ if (!stopped.get()) {
if (messageDispatch != null) {
broker.preProcessDispatch(messageDispatch);
}
@@ -779,7 +779,7 @@
public boolean iterate() {
try {
- if (disposed.get()) {
+ if (stopped.get()) {
if (dispatchStopped.compareAndSet(false, true)) {
if (transportException.get() == null) {
try {
@@ -901,64 +901,67 @@
} catch (Exception ignore) {
LOG.trace("Exception caught stopping", ignore);
}
- if (disposed.compareAndSet(false, true)) {
- // Let all the connection contexts know we are shutting down
- // so that in progress operations can notice and unblock.
- List<TransportConnectionState> connectionStates = listConnectionStates();
- for (TransportConnectionState cs : connectionStates) {
- cs.getContext().getStopping().set(true);
- }
+ // Let all the connection contexts know we are shutting down
+ // so that in progress operations can notice and unblock.
+ List<TransportConnectionState> connectionStates = listConnectionStates();
+ for (TransportConnectionState cs : connectionStates) {
+ cs.getContext().getStopping().set(true);
+ }
- if (taskRunner != null) {
- taskRunner.wakeup();
- // Give it a change to stop gracefully.
- dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
- disposeTransport();
- taskRunner.shutdown();
- } else {
- disposeTransport();
- }
+ if (taskRunner != null) {
+ taskRunner.wakeup();
+ // Give it a change to stop gracefully.
+ dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
+ }
+
+ try {
+ transport.stop();
+ LOG.debug("Stopped connection: " + transport.getRemoteAddress());
+ } catch (Exception e) {
+ LOG.debug("Could not stop transport: " + e, e);
+ }
- if (taskRunner != null) {
- taskRunner.shutdown();
- }
+ if (taskRunner != null) {
+ taskRunner.shutdown();
+ }
- // Run the MessageDispatch callbacks so that message references get
- // cleaned up.
- for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
- Command command = iter.next();
- if (command.isMessageDispatch()) {
- MessageDispatch md = (MessageDispatch)command;
- Runnable sub = md.getTransmitCallback();
- broker.postProcessDispatch(md);
- if (sub != null) {
- sub.run();
- }
+ active = false;
+
+ // Run the MessageDispatch callbacks so that message references get
+ // cleaned up.
+ for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
+ Command command = iter.next();
+ if (command.isMessageDispatch()) {
+ MessageDispatch md = (MessageDispatch)command;
+ Runnable sub = md.getTransmitCallback();
+ broker.postProcessDispatch(md);
+ if (sub != null) {
+ sub.run();
}
}
- //
- // Remove all logical connection associated with this connection
- // from the broker.
-
- if (!broker.isStopped()) {
- connectionStates = listConnectionStates();
- for (TransportConnectionState cs : connectionStates) {
- cs.getContext().getStopping().set(true);
- try {
- LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
- processRemoveConnection(cs.getInfo().getConnectionId());
- } catch (Throwable ignore) {
- ignore.printStackTrace();
- }
- }
+ }
+ //
+ // Remove all logical connection associated with this connection
+ // from the broker.
- if (brokerInfo != null) {
- broker.removeBroker(this, brokerInfo);
+ if (!broker.isStopped()) {
+ connectionStates = listConnectionStates();
+ for (TransportConnectionState cs : connectionStates) {
+ cs.getContext().getStopping().set(true);
+ try {
+ LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
+ processRemoveConnection(cs.getInfo().getConnectionId());
+ } catch (Throwable ignore) {
+ ignore.printStackTrace();
}
}
- LOG.debug("Connection Stopped: " + getRemoteAddress());
+
+ if (brokerInfo != null) {
+ broker.removeBroker(this, brokerInfo);
+ }
}
+ LOG.debug("Connection Stopped: " + getRemoteAddress());
}
/**
@@ -1230,18 +1233,6 @@
private void removeConsumerBrokerExchange(ConsumerId id) {
synchronized (consumerExchanges) {
consumerExchanges.remove(id);
- }
- }
-
- protected void disposeTransport() {
- if (transportDisposed.compareAndSet(false, true)) {
- try {
- transport.stop();
- active = false;
- LOG.debug("Stopped connection: " + transport.getRemoteAddress());
- } catch (Exception e) {
- LOG.debug("Could not stop transport: " + e, e);
- }
}
}