You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/06 23:06:26 UTC

svn commit: r1664731 - in /qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport: ConnectionEndpoint.java SessionEndpoint.java

Author: rgodfrey
Date: Fri Mar  6 22:06:25 2015
New Revision: 1664731

URL: http://svn.apache.org/r1664731
Log:
QPID-6347 : ensure link enpoints are notified outside of holding the connection lock

Modified:
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1664731&r1=1664730&r2=1664731&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Fri Mar  6 22:06:25 2015
@@ -33,6 +33,7 @@ import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -148,6 +149,7 @@ public class ConnectionEndpoint implemen
     private String _localHostname;
     private boolean _secure;
     private Principal _externalPrincipal;
+    private List<Runnable> _postLockActions = new ArrayList<>();
 
     public ConnectionEndpoint(Container container, SaslServerProvider cbs)
     {
@@ -663,14 +665,9 @@ public class ConnectionEndpoint implemen
         }
     }
 
-    public void receiveFlow(short channel, Flow flow)
+    public synchronized void receiveFlow(short channel, Flow flow)
     {
-        SessionEndpoint endPoint;
-        synchronized (this)
-        {
-            endPoint = getSession(channel);
-        }
-
+        SessionEndpoint endPoint = getSession(channel);
         if (endPoint != null)
         {
             endPoint.receiveFlow(flow);
@@ -797,22 +794,37 @@ public class ConnectionEndpoint implemen
         _logger = logger;
     }
 
-    public synchronized void receive(final short channel, final Object frame)
+    public void receive(final short channel, final Object frame)
     {
-        if (_logger.isEnabled())
+        List<Runnable> postLockActions;
+        synchronized(this)
         {
-            _logger.received(_remoteAddress, channel, frame);
-        }
-        if (frame instanceof FrameBody)
-        {
-            ((FrameBody) frame).invoke(channel, this);
+            if (_logger.isEnabled())
+            {
+                _logger.received(_remoteAddress, channel, frame);
+            }
+            if (frame instanceof FrameBody)
+            {
+                ((FrameBody) frame).invoke(channel, this);
+            }
+            else if (frame instanceof SaslFrameBody)
+            {
+                ((SaslFrameBody) frame).invoke(this);
+            }
+            postLockActions = _postLockActions;
+            _postLockActions = new ArrayList<>();
         }
-        else if (frame instanceof SaslFrameBody)
+        for(Runnable action : postLockActions)
         {
-            ((SaslFrameBody) frame).invoke(this);
+            action.run();
         }
     }
 
+    synchronized void addPostLockAction(Runnable action)
+    {
+        _postLockActions.add(action);
+    }
+
     public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
     {
         return _describedTypeRegistry;

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1664731&r1=1664730&r2=1664731&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Fri Mar  6 22:06:25 2015
@@ -456,32 +456,48 @@ public class SessionEndpoint
 
     public void receiveFlow(final Flow flow)
     {
-        Collection<LinkEndpoint> endpoints = new ArrayList<>();
-        synchronized(getLock())
+        synchronized (getLock())
         {
-            UnsignedInteger handle = flow.getHandle();
-            LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
-
-            final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
-            int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
-            _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
-
-            if(endpoint != null)
-            {
-                endpoint.receiveFlow( flow );
-                endpoints.add(endpoint);
-            }
-            else
+            synchronized (getLock())
             {
-                endpoints.addAll(_remoteLinkEndpoints.values());
-            }
+                UnsignedInteger handle = flow.getHandle();
+                final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
 
-            getLock().notifyAll();
-        }
+                final UnsignedInteger nextOutgoingId =
+                        flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
+                int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
+                _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
+
+                if (endpoint != null)
+                {
+                    getConnection().addPostLockAction(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            endpoint.receiveFlow(flow);
+                        }
+                    });
+                }
+                else
+                {
+                    final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
+                    getConnection().addPostLockAction(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+
+                            for(LinkEndpoint le : allLinkEndpoints)
+                            {
+                                le.flowStateChanged();
+                            }
+                        }
+                    });
+                }
 
-        for(LinkEndpoint le : endpoints)
-        {
-            le.flowStateChanged();
+                getLock().notifyAll();
+            }
         }
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org