You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/07/10 12:56:33 UTC

svn commit: r1359595 - in /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server: AMQChannel.java protocol/AMQProtocolEngine.java protocol/AMQProtocolSession.java

Author: kwall
Date: Tue Jul 10 10:56:32 2012
New Revision: 1359595

URL: http://svn.apache.org/viewvc?rev=1359595&view=rev
Log:
QPID-4121: AMQProtocolEngine now uses a lock to prevent thread-unsafe use of AMQChannel's transaction, which previously caused a ConcurrentModificationException to be thrown.

Applied patch from Philip Harvey <ph...@philharveyonline.com> and Keith Wall <kw...@apache.org>.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1359595&r1=1359594&r2=1359595&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Jul 10 10:56:32 2012
@@ -1532,7 +1532,7 @@ public class AMQChannel implements Sessi
                 _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
             }
 
-            // Close connection for idle or open transactions that have timed out
+            // Close session for idle or open transactions that have timed out
             if (idleClose > 0L && idleTime > idleClose)
             {
                 getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1359595&r1=1359594&r2=1359595&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Jul 10 10:56:32 2012
@@ -34,6 +34,9 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 import org.apache.log4j.Logger;
@@ -152,8 +155,11 @@ public class AMQProtocolEngine implement
     private long _lastReceivedTime;
     private boolean _blocking;
 
+    private final Lock _receivedLock;
+
     public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
     {
+        _receivedLock = new ReentrantLock();
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _codecFactory = new AMQCodecFactory(true, this);
 
@@ -225,6 +231,8 @@ public class AMQProtocolEngine implement
         final long arrivalTime = System.currentTimeMillis();
         _lastReceivedTime = arrivalTime;
         _lastIoTime = arrivalTime;
+
+        _receivedLock.lock();
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -249,6 +257,10 @@ public class AMQProtocolEngine implement
             _logger.error("Unexpected exception when processing datablock", e);
             closeProtocolSession();
         }
+        finally
+        {
+            _receivedLock.unlock();
+        }
     }
 
     private void receiveComplete()
@@ -815,7 +827,7 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+    private void closeConnection(int channelId, AMQConnectionException e) throws AMQException
     {
         try
         {
@@ -1308,17 +1320,25 @@ public class AMQProtocolEngine implement
 
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
     {
-        int channelId = ((AMQChannel)session).getChannelId();
-        closeChannel(channelId);
+        _receivedLock.lock();
+        try
+        {
+            int channelId = ((AMQChannel)session).getChannelId();
+            closeChannel(channelId);
 
-        MethodRegistry methodRegistry = getMethodRegistry();
-        ChannelCloseBody responseBody =
-                methodRegistry.createChannelCloseBody(
-                        cause.getCode(),
-                        new AMQShortString(message),
-                        0,0);
+            MethodRegistry methodRegistry = getMethodRegistry();
+            ChannelCloseBody responseBody =
+                    methodRegistry.createChannelCloseBody(
+                            cause.getCode(),
+                            new AMQShortString(message),
+                            0,0);
 
-        writeFrame(responseBody.generateFrame(channelId));
+            writeFrame(responseBody.generateFrame(channelId));
+        }
+        finally
+        {
+            _receivedLock.unlock();
+        }
     }
 
     public void close(AMQConstant cause, String message) throws AMQException

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1359595&r1=1359594&r2=1359595&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Jul 10 10:56:32 2012
@@ -154,10 +154,6 @@ public interface AMQProtocolSession exte
 
     void closeProtocolSession();
 
-    /** This must be called to close the session in order to free up any resources managed by the session. */
-    void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
-
-
     /** @return a key that uniquely identifies this session */
     Object getKey();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org