You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/07/12 14:19:35 UTC

svn commit: r1360651 - in /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server: AMQChannel.java protocol/AMQProtocolEngine.java protocol/AMQProtocolSession.java

Author: kwall
Date: Thu Jul 12 12:19:34 2012
New Revision: 1360651

URL: http://svn.apache.org/viewvc?rev=1360651&view=rev
Log:
QPID-4131: On 0-8...0-9-1 code path broker now closes the connection when the housekeeping thread times out a transaction. AMQChannel now uses AMQProtocolEngine's _receivedLock so that this connection-closing is thread-safe. This gives better compatibility with older clients that do not handle session closes correctly. 0-10 behaviour unaffected by this change.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1360651&r1=1360650&r2=1360651&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Jul 12 12:19:34 2012
@@ -36,6 +36,8 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
@@ -1552,18 +1554,37 @@ public class AMQChannel implements Sessi
                 _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
             }
 
-            // Close session for idle or open transactions that have timed out
+            // Close _connection_ for idle or open transactions that have timed out (this is different
+            // than the 0-10 code path which closes the session).
             if (idleClose > 0L && idleTime > idleClose)
             {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+                closeConnection("Idle transaction timed out");
             }
             else if (openClose > 0L && openTime > openClose)
             {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+                closeConnection("Open transaction timed out");
             }
         }
     }
 
+    /**
+     * Typically called from the HouseKeepingThread instead of the main receiver thread,
+     * therefore uses a lock to close the connection in a thread-safe manner.
+     */
+    private void closeConnection(String reason) throws AMQException
+    {
+        Lock receivedLock = _session.getReceivedLock();
+        receivedLock.lock();
+        try
+        {
+            _session.close(AMQConstant.RESOURCE_ERROR, reason);
+        }
+        finally
+        {
+            receivedLock.unlock();
+        }
+    }
+
     public void deadLetter(long deliveryTag) throws AMQException
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1360651&r1=1360650&r2=1360651&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Thu Jul 12 12:19:34 2012
@@ -1323,25 +1323,17 @@ public class AMQProtocolEngine implement
 
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
     {
-        _receivedLock.lock();
-        try
-        {
-            int channelId = ((AMQChannel)session).getChannelId();
-            closeChannel(channelId);
+        int channelId = ((AMQChannel)session).getChannelId();
+        closeChannel(channelId);
 
-            MethodRegistry methodRegistry = getMethodRegistry();
-            ChannelCloseBody responseBody =
-                    methodRegistry.createChannelCloseBody(
-                            cause.getCode(),
-                            new AMQShortString(message),
-                            0,0);
+        MethodRegistry methodRegistry = getMethodRegistry();
+        ChannelCloseBody responseBody =
+                methodRegistry.createChannelCloseBody(
+                        cause.getCode(),
+                        new AMQShortString(message),
+                        0,0);
 
-            writeFrame(responseBody.generateFrame(channelId));
-        }
-        finally
-        {
-            _receivedLock.unlock();
-        }
+        writeFrame(responseBody.generateFrame(channelId));
     }
 
     public void close(AMQConstant cause, String message) throws AMQException
@@ -1497,4 +1489,9 @@ public class AMQProtocolEngine implement
     {
         return _reference;
     }
+
+    public Lock getReceivedLock()
+    {
+        return _receivedLock;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1360651&r1=1360650&r2=1360651&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Jul 12 12:19:34 2012
@@ -23,11 +23,11 @@ package org.apache.qpid.server.protocol;
 import java.net.SocketAddress;
 import java.security.Principal;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
 
 import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 
-import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -217,4 +217,5 @@ public interface AMQProtocolSession exte
 
     public Principal getPeerPrincipal();
 
+    Lock getReceivedLock();
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org