You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/09 20:07:30 UTC
svn commit: r1665326 - in /qpid/branches/0.32/qpid: ./ java/
java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/
Author: rgodfrey
Date: Mon Mar 9 19:07:29 2015
New Revision: 1665326
URL: http://svn.apache.org/r1665326
Log:
QPID-6437 : merged r1664714,1664731,1664839 from trunk to 0.32 branch
Modified:
qpid/branches/0.32/qpid/ (props changed)
qpid/branches/0.32/qpid/java/ (props changed)
qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
Propchange: qpid/branches/0.32/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Mar 9 19:07:29 2015
@@ -6,4 +6,4 @@
/qpid/branches/mcpierce-QPID-4719/qpid:1477004-1477093
/qpid/branches/qpid-2935/qpid:1061302-1072333
/qpid/branches/qpid-3346/qpid:1144319-1179855
-/qpid/trunk/qpid:1661079,1661142,1661162,1661165-1661166,1661207,1661212,1661364,1661368,1661373,1661530-1661531,1661693,1661741,1661929,1662051,1662473,1662489,1662683,1664160,1664334
+/qpid/trunk/qpid:1661079,1661142,1661162,1661165-1661166,1661207,1661212,1661364,1661368,1661373,1661530-1661531,1661693,1661741,1661929,1662051,1662473,1662489,1662683,1664160,1664334,1664714,1664731,1664839
Propchange: qpid/branches/0.32/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Mar 9 19:07:29 2015
@@ -9,4 +9,4 @@
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
/qpid/trunk/qpid:796646-796653
-/qpid/trunk/qpid/java:1661079,1661142,1661162,1661165-1661166,1661207,1661212,1661364,1661368,1661373,1661530-1661531,1661693,1661741,1661929,1662051,1662473,1662489,1662683,1664114,1664160,1664309,1664334
+/qpid/trunk/qpid/java:1661079,1661142,1661162,1661165-1661166,1661207,1661212,1661364,1661368,1661373,1661530-1661531,1661693,1661741,1661929,1662051,1662473,1662489,1662683,1664114,1664160,1664309,1664334,1664714,1664731,1664839
Modified: qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1665326&r1=1665325&r2=1665326&view=diff
==============================================================================
--- qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Mon Mar 9 19:07:29 2015
@@ -33,6 +33,7 @@ import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -148,6 +149,7 @@ public class ConnectionEndpoint implemen
private String _localHostname;
private boolean _secure;
private Principal _externalPrincipal;
+ private List<Runnable> _postLockActions = new ArrayList<>();
public ConnectionEndpoint(Container container, SaslServerProvider cbs)
{
@@ -792,22 +794,37 @@ public class ConnectionEndpoint implemen
_logger = logger;
}
- public synchronized void receive(final short channel, final Object frame)
+ public void receive(final short channel, final Object frame)
{
- if (_logger.isEnabled())
+ List<Runnable> postLockActions;
+ synchronized(this)
{
- _logger.received(_remoteAddress, channel, frame);
- }
- if (frame instanceof FrameBody)
- {
- ((FrameBody) frame).invoke(channel, this);
+ if (_logger.isEnabled())
+ {
+ _logger.received(_remoteAddress, channel, frame);
+ }
+ if (frame instanceof FrameBody)
+ {
+ ((FrameBody) frame).invoke(channel, this);
+ }
+ else if (frame instanceof SaslFrameBody)
+ {
+ ((SaslFrameBody) frame).invoke(this);
+ }
+ postLockActions = _postLockActions;
+ _postLockActions = new ArrayList<>();
}
- else if (frame instanceof SaslFrameBody)
+ for(Runnable action : postLockActions)
{
- ((SaslFrameBody) frame).invoke(this);
+ action.run();
}
}
+ synchronized void addPostLockAction(Runnable action)
+ {
+ _postLockActions.add(action);
+ }
+
public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
{
return _describedTypeRegistry;
Modified: qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java?rev=1665326&r1=1665325&r2=1665326&view=diff
==============================================================================
--- qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java (original)
+++ qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java Mon Mar 9 19:07:29 2015
@@ -166,7 +166,14 @@ public class SendingLinkEndpoint extends
setLinkCredit(limit.subtract(getDeliveryCount()));
}
}
-
+ getSession().getConnection().addPostLockAction(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ flowStateChanged();
+ }
+ });
}
@Override
Modified: qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java?rev=1665326&r1=1665325&r2=1665326&view=diff
==============================================================================
--- qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java (original)
+++ qpid/branches/0.32/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java Mon Mar 9 19:07:29 2015
@@ -456,33 +456,40 @@ public class SessionEndpoint
public void receiveFlow(final Flow flow)
{
- Collection<LinkEndpoint> endpoints = new ArrayList<>();
- synchronized(getLock())
+ synchronized (getLock())
{
UnsignedInteger handle = flow.getHandle();
- LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
+ final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
- final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
+ final UnsignedInteger nextOutgoingId =
+ flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
_outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
- if(endpoint != null)
+ if (endpoint != null)
{
- endpoint.receiveFlow( flow );
- endpoints.add(endpoint);
+ endpoint.receiveFlow(flow);
}
else
{
- endpoints.addAll(_remoteLinkEndpoints.values());
+ final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
+ getConnection().addPostLockAction(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
+ for(LinkEndpoint le : allLinkEndpoints)
+ {
+ le.flowStateChanged();
+ }
+ }
+ });
}
getLock().notifyAll();
}
- for(LinkEndpoint le : endpoints)
- {
- le.flowStateChanged();
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org