You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/12/08 19:56:12 UTC

svn commit: r602529 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java broker/TransportConnection.java

Author: chirino
Date: Sat Dec  8 10:56:11 2007
New Revision: 602529

URL: http://svn.apache.org/viewvc?rev=602529&view=rev
Log:
If you have a multi-threaded client whose threads share one connection to the broker, you may have seen errors like:

2007-12-05 14:14:52,606 [VMTransport] ERROR - org.apache.activemq.broker.TransportConnection.Service - Async error occurred: java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:bubba-38184-1196882086290-2:1
java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:bubba-38184-1196882086290-2:1
at org.apache.activemq.broker.SingleTransportConnectionStateRegister.lookupConnectionState(SingleTransportConnectionStateRegister.java:113)
at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1313)
at org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:538)
at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:64)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202)
at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:118)
at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:42)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:619)


The root cause is that the client side has multiple threads calling close() on the consumer/session objects and on the connection object. If the timing of those concurrent calls was just right, the client could send the connection close message followed by a session or consumer close message, which is invalid and results in the IllegalStateException reported by the broker.

- Simplified the ActiveMQConnection and TransportConnection shutdown so that this does not happen.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=602529&r1=602528&r2=602529&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Sat Dec  8 10:56:11 2007
@@ -580,8 +580,8 @@
                         // If we announced ourselfs to the broker.. Try to let
                         // the broker
                         // know that the connection is being shutdown.
-                        syncSendPacket(info.createRemoveCommand(), closeTimeout);
-                        asyncSendPacket(new ShutdownInfo());
+                        doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
+                        doAsyncSendPacket(new ShutdownInfo());
                     }
 
                     ServiceSupport.dispose(this.transport);
@@ -1144,18 +1144,21 @@
      * @throws JMSException
      */
     public void asyncSendPacket(Command command) throws JMSException {
-        if (isClosed()) {
+        if (isClosed() || closing.get()) {
             throw new ConnectionClosedException();
         } else {
-
-            try {
-                this.transport.oneway(command);
-            } catch (IOException e) {
-                throw JMSExceptionSupport.create(e);
-            }
+            doAsyncSendPacket(command);
         }
     }
 
+	private void doAsyncSendPacket(Command command) throws JMSException {
+		try {
+		    this.transport.oneway(command);
+		} catch (IOException e) {
+		    throw JMSExceptionSupport.create(e);
+		}
+	}
+
     /**
      * Send a packet through a Connection - for internal use only
      * 
@@ -1193,26 +1196,30 @@
      * @throws JMSException
      */
     public Response syncSendPacket(Command command, int timeout) throws JMSException {
-        if (isClosed()) {
+        if (isClosed() || closing.get()) {
             throw new ConnectionClosedException();
         } else {
-
-            try {
-                Response response = (Response)this.transport.request(command, timeout);
-                if (response != null && response.isException()) {
-                    ExceptionResponse er = (ExceptionResponse)response;
-                    if (er.getException() instanceof JMSException) {
-                        throw (JMSException)er.getException();
-                    } else {
-                        throw JMSExceptionSupport.create(er.getException());
-                    }
-                }
-                return response;
-            } catch (IOException e) {
-                throw JMSExceptionSupport.create(e);
-            }
+            return doSyncSendPacket(command, timeout);
         }
     }
+
+	private Response doSyncSendPacket(Command command, int timeout)
+			throws JMSException {
+		try {
+		    Response response = (Response)this.transport.request(command, timeout);
+		    if (response != null && response.isException()) {
+		        ExceptionResponse er = (ExceptionResponse)response;
+		        if (er.getException() instanceof JMSException) {
+		            throw (JMSException)er.getException();
+		        } else {
+		            throw JMSExceptionSupport.create(er.getException());
+		        }
+		    }
+		    return response;
+		} catch (IOException e) {
+		    throw JMSExceptionSupport.create(e);
+		}
+	}
 
     /**
      * @return statistics for this Connection

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=602529&r1=602528&r2=602529&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Sat Dec  8 10:56:11 2007
@@ -139,7 +139,6 @@
     private long timeStamp;
     private final AtomicBoolean stopped = new AtomicBoolean(false);
     private final AtomicBoolean transportDisposed = new AtomicBoolean();
-    private final AtomicBoolean disposed = new AtomicBoolean(false);
     private CountDownLatch stopLatch = new CountDownLatch(1);
     private final AtomicBoolean asyncException = new AtomicBoolean(false);
     private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
@@ -198,7 +197,7 @@
     }
 
     public void serviceTransportException(IOException e) {
-        if (!disposed.get()) {
+        if (!stopped.get()) {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
                 TRANSPORTLOG.debug("Transport failed: " + e, e);
@@ -240,7 +239,7 @@
             // Handle the case where the broker is stopped
             // But the client is still connected.
 
-            if (!disposed.get()) {
+            if (!stopped.get()) {
                 if (SERVICELOG.isDebugEnabled()) {
                     SERVICELOG
                         .debug("Broker has been stopped.  Notifying client and closing his connection.");
@@ -260,7 +259,7 @@
                 // notification gets to him.
                 ServiceSupport.dispose(this);
             }
-        } else if (!disposed.get() && !inServiceException) {
+        } else if (!stopped.get() && !inServiceException) {
             inServiceException = true;
             try {
                 SERVICELOG.error("Async error occurred: " + e, e);
@@ -669,11 +668,12 @@
         return null;
     }
 
-    public Response processRemoveConnection(ConnectionId id) {
+    public Response processRemoveConnection(ConnectionId id) throws InterruptedException {
         TransportConnectionState cs = lookupConnectionState(id);
         // Don't allow things to be added to the connection state while we are
         // shutting down.
         cs.shutdown();
+        
         // Cascade the connection stop to the sessions.
         for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
             SessionId sessionId = (SessionId)iter.next();
@@ -731,7 +731,7 @@
     }
 
     public void dispatchAsync(Command message) {
-        if (!disposed.get()) {
+        if (!stopped.get()) {
             getStatistics().getEnqueues().increment();
             if (taskRunner == null) {
                 dispatchSync(message);
@@ -759,7 +759,7 @@
         final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch()
             ? command : null);
         try {
-            if (!disposed.get()) {
+            if (!stopped.get()) {
                 if (messageDispatch != null) {
                     broker.preProcessDispatch(messageDispatch);
                 }
@@ -779,7 +779,7 @@
 
     public boolean iterate() {
         try {
-            if (disposed.get()) {
+            if (stopped.get()) {
                 if (dispatchStopped.compareAndSet(false, true)) {
                     if (transportException.get() == null) {
                         try {
@@ -901,64 +901,67 @@
         } catch (Exception ignore) {
             LOG.trace("Exception caught stopping", ignore);
         }
-        if (disposed.compareAndSet(false, true)) {
 
-            // Let all the connection contexts know we are shutting down
-            // so that in progress operations can notice and unblock.
-            List<TransportConnectionState> connectionStates = listConnectionStates();
-            for (TransportConnectionState cs : connectionStates) {
-                cs.getContext().getStopping().set(true);
-            }
+        // Let all the connection contexts know we are shutting down
+        // so that in progress operations can notice and unblock.
+        List<TransportConnectionState> connectionStates = listConnectionStates();
+        for (TransportConnectionState cs : connectionStates) {
+            cs.getContext().getStopping().set(true);
+        }
 
-            if (taskRunner != null) {
-                taskRunner.wakeup();
-                // Give it a change to stop gracefully.
-                dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
-                disposeTransport();
-                taskRunner.shutdown();
-            } else {
-                disposeTransport();
-            }
+        if (taskRunner != null) {
+            taskRunner.wakeup();
+            // Give it a change to stop gracefully.
+            dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
+        }
+        
+        try {
+            transport.stop();
+            LOG.debug("Stopped connection: " + transport.getRemoteAddress());
+        } catch (Exception e) {
+            LOG.debug("Could not stop transport: " + e, e);
+        }
 
-            if (taskRunner != null) {
-                taskRunner.shutdown();
-            }
+        if (taskRunner != null) {
+            taskRunner.shutdown();
+        }
 
-            // Run the MessageDispatch callbacks so that message references get
-            // cleaned up.
-            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
-                Command command = iter.next();
-                if (command.isMessageDispatch()) {
-                    MessageDispatch md = (MessageDispatch)command;
-                    Runnable sub = md.getTransmitCallback();
-                    broker.postProcessDispatch(md);
-                    if (sub != null) {
-                        sub.run();
-                    }
+        active = false;
+
+        // Run the MessageDispatch callbacks so that message references get
+        // cleaned up.
+        for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
+            Command command = iter.next();
+            if (command.isMessageDispatch()) {
+                MessageDispatch md = (MessageDispatch)command;
+                Runnable sub = md.getTransmitCallback();
+                broker.postProcessDispatch(md);
+                if (sub != null) {
+                    sub.run();
                 }
             }
-            //
-            // Remove all logical connection associated with this connection
-            // from the broker.
-
-            if (!broker.isStopped()) {
-                connectionStates = listConnectionStates();
-                for (TransportConnectionState cs : connectionStates) {
-                    cs.getContext().getStopping().set(true);
-                    try {
-                        LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
-                        processRemoveConnection(cs.getInfo().getConnectionId());
-                    } catch (Throwable ignore) {
-                        ignore.printStackTrace();
-                    }
-                }
+        }
+        //
+        // Remove all logical connection associated with this connection
+        // from the broker.
 
-                if (brokerInfo != null) {
-                    broker.removeBroker(this, brokerInfo);
+        if (!broker.isStopped()) {
+            connectionStates = listConnectionStates();
+            for (TransportConnectionState cs : connectionStates) {
+                cs.getContext().getStopping().set(true);
+                try {
+                    LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
+                    processRemoveConnection(cs.getInfo().getConnectionId());
+                } catch (Throwable ignore) {
+                    ignore.printStackTrace();
                 }
             }
-            LOG.debug("Connection Stopped: " + getRemoteAddress());
+
+            if (brokerInfo != null) {
+                broker.removeBroker(this, brokerInfo);
+            }
         }
+        LOG.debug("Connection Stopped: " + getRemoteAddress());
     }
 
     /**
@@ -1230,18 +1233,6 @@
     private void removeConsumerBrokerExchange(ConsumerId id) {
         synchronized (consumerExchanges) {
             consumerExchanges.remove(id);
-        }
-    }
-
-    protected void disposeTransport() {
-        if (transportDisposed.compareAndSet(false, true)) {
-            try {
-                transport.stop();
-                active = false;
-                LOG.debug("Stopped connection: " + transport.getRemoteAddress());
-            } catch (Exception e) {
-                LOG.debug("Could not stop transport: " + e, e);
-            }
         }
     }