You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/09/28 12:18:30 UTC
svn commit: r1391385 - in
/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl:
TransportImpl.java TransportLink.java TransportSession.java
Author: rgodfrey
Date: Fri Sep 28 10:18:29 2012
New Revision: 1391385
URL: http://svn.apache.org/viewvc?rev=1391385&view=rev
Log:
PROTON-37 : Fix failing python tests
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1391385&r1=1391384&r2=1391385&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Fri Sep 28 10:18:29 2012
@@ -91,6 +91,7 @@ public class TransportImpl extends Endpo
private final ByteBuffer _overflowBuffer = ByteBuffer.wrap(new byte[_maxFrameSize]);
private static final byte AMQP_FRAME_TYPE = 0;
+ private boolean _closeReceived;
{
@@ -158,14 +159,10 @@ public class TransportImpl extends Endpo
_overflowBuffer.flip();
}
- if(_overflowBuffer.position() == 0)
- {
- clearInterestList();
- //clearTransportWorkList();
- }
return written;
- }catch (RuntimeException e)
+ }
+ catch (RuntimeException e)
{
e.printStackTrace();
throw e;
@@ -192,27 +189,33 @@ public class TransportImpl extends Endpo
if(endpoint instanceof LinkImpl)
{
-
LinkImpl link = (LinkImpl) endpoint;
TransportLink transportLink = getTransportState(link);
+ SessionImpl session = link.getSession();
+ TransportSession transportSession = getTransportState(session);
+
if(link.getLocalState() == EndpointState.CLOSED
&& transportLink.isLocalHandleSet())
{
+ if(!(link instanceof SenderImpl)
+ || link.getQueued() == 0
+ || transportLink.detachReceived()
+ || transportSession.endReceived()
+ || _closeReceived)
+ {
+ UnsignedInteger localHandle = transportLink.getLocalHandle();
+ transportLink.clearLocalHandle();
+ transportSession.freeLocalHandle(localHandle);
- SessionImpl session = link.getSession();
- TransportSession transportSession = getTransportState(session);
- UnsignedInteger localHandle = transportLink.getLocalHandle();
- transportLink.clearLocalHandle();
- transportSession.freeLocalHandle(localHandle);
-
-
- Detach detach = new Detach();
- detach.setHandle(localHandle);
+ Detach detach = new Detach();
+ detach.setHandle(localHandle);
- int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), detach, null);
- written += frameBytes;
+ int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), detach, null);
+ written += frameBytes;
+ endpoint.clearModified();
+ }
}
}
@@ -251,6 +254,7 @@ public class TransportImpl extends Endpo
flow.setNextOutgoingId(transportSession.getNextOutgoingId());
int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
written += frameBytes;
+ endpoint.clearModified();
}
}
@@ -358,8 +362,6 @@ public class TransportImpl extends Endpo
transportLink.setLinkCredit(transportLink.getLinkCredit().subtract(UnsignedInteger.ONE));
}
- delivery.getLink().decrementQueued();
-
delivery = delivery.clearTransportWork();
@@ -448,7 +450,12 @@ public class TransportImpl extends Endpo
flow.setNextOutgoingId(transportSession.getNextOutgoingId());
int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
written += frameBytes;
+ if(receiver.getLocalState() == EndpointState.ACTIVE)
+ {
+ endpoint.clearModified();
+ }
}
+
}
}
endpoint = endpoint.transportNext();
@@ -487,15 +494,15 @@ public class TransportImpl extends Endpo
{
EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
int written = 0;
+
while(endpoint != null && buffer.remaining() >= _maxFrameSize)
{
-
if(endpoint instanceof LinkImpl)
{
LinkImpl link = (LinkImpl) endpoint;
TransportLink transportLink = getTransportState(link);
- if(link.getLocalState() == EndpointState.ACTIVE)
+ if(link.getLocalState() != EndpointState.UNINITIALIZED && !transportLink.attachSent())
{
if( (link.getRemoteState() == EndpointState.ACTIVE
@@ -539,10 +546,16 @@ public class TransportImpl extends Endpo
int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), attach, null);
written += frameBytes;
+ transportLink.sentAttach();
+ if(link.getLocalState() == EndpointState.ACTIVE && (link instanceof SenderImpl || !link.hasCredit()))
+ {
+ endpoint.clearModified();
+ }
}
}
}
+
endpoint = endpoint.transportNext();
}
return written;
@@ -598,7 +611,7 @@ public class TransportImpl extends Endpo
{
SessionImpl session = (SessionImpl) endpoint;
TransportSession transportSession = getTransportState(session);
- if(session.getLocalState() == EndpointState.ACTIVE)
+ if(session.getLocalState() != EndpointState.UNINITIALIZED && !transportSession.beginSent())
{
int channelId = allocateLocalChannel();
transportSession.setLocalChannel(channelId);
@@ -617,6 +630,11 @@ public class TransportImpl extends Endpo
begin.setNextOutgoingId(transportSession.getNextOutgoingId());
written += writeFrame(buffer, channelId, begin, null);
+ transportSession.sentBegin();
+ if(session.getLocalState() == EndpointState.ACTIVE)
+ {
+ endpoint.clearModified();
+ }
}
}
endpoint = endpoint.transportNext();
@@ -658,52 +676,76 @@ public class TransportImpl extends Endpo
int written = 0;
while(endpoint != null && buffer.remaining() >= _maxFrameSize)
{
+ SessionImpl session;
+ TransportSession transportSession;
- if(endpoint instanceof SessionImpl)
+ if((endpoint instanceof SessionImpl)
+ && (session = (SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED
+ && (transportSession = session.getTransportSession()).isLocalChannelSet()
+ && !hasSendableMessages(session))
{
+ int channel = transportSession.getLocalChannel();
+ transportSession.freeLocalChannel();
+ _localSessions[channel] = null;
- SessionImpl session = (SessionImpl) endpoint;
- TransportSession transportSession = session.getTransportSession();
- if(session.getLocalState() == EndpointState.CLOSED
- && transportSession.isLocalChannelSet())
- {
-
- int channel = transportSession.getLocalChannel();
- transportSession.freeLocalChannel();
- _localSessions[channel] = null;
+ End end = new End();
- End end = new End();
-
- int frameBytes = writeFrame(buffer, channel, end, null);
- written += frameBytes;
- }
+ int frameBytes = writeFrame(buffer, channel, end, null);
+ written += frameBytes;
+ endpoint.clearModified();
}
+
endpoint = endpoint.transportNext();
+
}
return written;
}
+ private boolean hasSendableMessages(SessionImpl session)
+ {
+
+ if(!_closeReceived && (session == null || !session.getTransportSession().endReceived()))
+ {
+ EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+ while(endpoint != null)
+ {
+ if(endpoint instanceof SenderImpl)
+ {
+ SenderImpl sender = (SenderImpl) endpoint;
+ if((session == null || sender.getSession() == session)
+ && sender.getQueued() != 0
+ && !getTransportState(sender).detachReceived())
+ {
+ return true;
+ }
+ }
+ endpoint = endpoint.transportNext();
+ }
+ }
+ return false;
+ }
+
private int processClose(WritableBuffer buffer)
{
if(_connectionEndpoint.getLocalState() == EndpointState.CLOSED && !_isCloseSent)
{
- Close close = new Close();
-
- // TODO - populate;
+ if(!hasSendableMessages(null))
+ {
+ Close close = new Close();
- _isCloseSent = true;
+ // TODO - populate;
- return writeFrame(buffer, 0, close, null);
+ _isCloseSent = true;
+ return writeFrame(buffer, 0, close, null);
+ }
}
return 0;
}
-
-
private int writeFrame(WritableBuffer buffer, int channel, DescribedType frameBody, ByteBuffer payload)
{
int oldPosition = buffer.position();
@@ -895,6 +937,7 @@ public class TransportImpl extends Endpo
if(transportLink != null)
{
LinkImpl link = transportLink.getLink();
+ transportLink.receivedDetach();
transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
link.setRemoteState(EndpointState.CLOSED);
@@ -916,7 +959,7 @@ public class TransportImpl extends Endpo
else
{
_remoteSessions[channel] = null;
-
+ transportSession.receivedEnd();
transportSession.getSession().setRemoteState(EndpointState.CLOSED);
}
@@ -924,6 +967,7 @@ public class TransportImpl extends Endpo
public void handleClose(Close close, Binary payload, Integer channel)
{
+ _closeReceived = true;
_connectionEndpoint.setRemoteState(EndpointState.CLOSED);
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1391385&r1=1391384&r2=1391385&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java Fri Sep 28 10:18:29 2012
@@ -34,6 +34,8 @@ class TransportLink<T extends LinkImpl>
private T _link;
private UnsignedInteger _remoteDeliveryCount;
private UnsignedInteger _remoteLinkCredit;
+ private boolean _detachReceived;
+ private boolean _attachSent;
protected TransportLink(T link)
{
@@ -156,4 +158,24 @@ class TransportLink<T extends LinkImpl>
{
_deliveryCount = _deliveryCount.add(UnsignedInteger.ONE);
}
+
+ public void receivedDetach()
+ {
+ _detachReceived = true;
+ }
+
+ public boolean detachReceived()
+ {
+ return _detachReceived;
+ }
+
+ public boolean attachSent()
+ {
+ return _attachSent;
+ }
+
+ public void sentAttach()
+ {
+ _attachSent = true;
+ }
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1391385&r1=1391384&r2=1391385&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java Fri Sep 28 10:18:29 2012
@@ -58,6 +58,8 @@ class TransportSession
private int _unsettledIncomingSize;
private boolean _incomingWindowSizeChange;
private boolean _outgoingWindowSizeChange;
+ private boolean _endReceived;
+ private boolean _beginSent;
public TransportSession(SessionImpl session)
{
@@ -399,4 +401,24 @@ class TransportSession
{
_nextIncomingId = _nextIncomingId.add(UnsignedInteger.ONE);
}
+
+ public boolean endReceived()
+ {
+ return _endReceived;
+ }
+
+ public void receivedEnd()
+ {
+ _endReceived = true;
+ }
+
+ public boolean beginSent()
+ {
+ return _beginSent;
+ }
+
+ public void sentBegin()
+ {
+ _beginSent = true;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org