You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/07 13:40:26 UTC

svn commit: r1664839 - in /qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport: SendingLinkEndpoint.java SessionEndpoint.java

Author: rgodfrey
Date: Sat Mar  7 12:40:26 2015
New Revision: 1664839

URL: http://svn.apache.org/r1664839
Log:
QPID-6437 : ensure session/link flow notifications occur

Modified:
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java?rev=1664839&r1=1664838&r2=1664839&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java Sat Mar  7 12:40:26 2015
@@ -166,7 +166,14 @@ public class SendingLinkEndpoint extends
                 setLinkCredit(limit.subtract(getDeliveryCount()));
             }
         }
-
+        getSession().getConnection().addPostLockAction(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                flowStateChanged();
+            }
+        });
     }
 
     @Override

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1664839&r1=1664838&r2=1664839&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Sat Mar  7 12:40:26 2015
@@ -458,49 +458,40 @@ public class SessionEndpoint
     {
         synchronized (getLock())
         {
-            synchronized (getLock())
-            {
-                UnsignedInteger handle = flow.getHandle();
-                final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
+            UnsignedInteger handle = flow.getHandle();
+            final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
 
-                final UnsignedInteger nextOutgoingId =
-                        flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
-                int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
-                _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
+            final UnsignedInteger nextOutgoingId =
+                    flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
+            int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
+            _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
 
-                if (endpoint != null)
-                {
-                    getConnection().addPostLockAction(new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            endpoint.receiveFlow(flow);
-                        }
-                    });
-                }
-                else
+            if (endpoint != null)
+            {
+                endpoint.receiveFlow(flow);
+            }
+            else
+            {
+                final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
+                getConnection().addPostLockAction(new Runnable()
                 {
-                    final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
-                    getConnection().addPostLockAction(new Runnable()
+                    @Override
+                    public void run()
                     {
-                        @Override
-                        public void run()
-                        {
 
-                            for(LinkEndpoint le : allLinkEndpoints)
-                            {
-                                le.flowStateChanged();
-                            }
+                        for(LinkEndpoint le : allLinkEndpoints)
+                        {
+                            le.flowStateChanged();
                         }
-                    });
-                }
-
-                getLock().notifyAll();
+                    }
+                });
             }
+
+            getLock().notifyAll();
         }
 
 
+
     }
 
     public void receiveDisposition(final Disposition disposition)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org