You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/09/28 12:18:30 UTC

svn commit: r1391385 - in /qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl: TransportImpl.java TransportLink.java TransportSession.java

Author: rgodfrey
Date: Fri Sep 28 10:18:29 2012
New Revision: 1391385

URL: http://svn.apache.org/viewvc?rev=1391385&view=rev
Log:
PROTON-37 : Fix failing python tests

Modified:
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1391385&r1=1391384&r2=1391385&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Fri Sep 28 10:18:29 2012
@@ -91,6 +91,7 @@ public class TransportImpl extends Endpo
 
     private final ByteBuffer _overflowBuffer = ByteBuffer.wrap(new byte[_maxFrameSize]);
     private static final byte AMQP_FRAME_TYPE = 0;
+    private boolean _closeReceived;
 
 
     {
@@ -158,14 +159,10 @@ public class TransportImpl extends Endpo
             _overflowBuffer.flip();
         }
 
-        if(_overflowBuffer.position() == 0)
-        {
-            clearInterestList();
-            //clearTransportWorkList();
-        }
 
         return written;
-        }catch (RuntimeException e)
+        }
+        catch (RuntimeException e)
         {
             e.printStackTrace();
             throw e;
@@ -192,27 +189,33 @@ public class TransportImpl extends Endpo
 
             if(endpoint instanceof LinkImpl)
             {
-
                 LinkImpl link = (LinkImpl) endpoint;
                 TransportLink transportLink = getTransportState(link);
+                SessionImpl session = link.getSession();
+                TransportSession transportSession = getTransportState(session);
+
                 if(link.getLocalState() == EndpointState.CLOSED
                    && transportLink.isLocalHandleSet())
                 {
+                    if(!(link instanceof SenderImpl)
+                       || link.getQueued() == 0
+                       || transportLink.detachReceived()
+                       || transportSession.endReceived()
+                       || _closeReceived)
+                    {
+                        UnsignedInteger localHandle = transportLink.getLocalHandle();
+                        transportLink.clearLocalHandle();
+                        transportSession.freeLocalHandle(localHandle);
 
-                    SessionImpl session = link.getSession();
-                    TransportSession transportSession = getTransportState(session);
-                    UnsignedInteger localHandle = transportLink.getLocalHandle();
-                    transportLink.clearLocalHandle();
-                    transportSession.freeLocalHandle(localHandle);
-
-
-                    Detach detach = new Detach();
-                    detach.setHandle(localHandle);
 
+                        Detach detach = new Detach();
+                        detach.setHandle(localHandle);
 
-                    int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), detach, null);
-                    written += frameBytes;
 
+                        int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), detach, null);
+                        written += frameBytes;
+                        endpoint.clearModified();
+                    }
                 }
 
             }
@@ -251,6 +254,7 @@ public class TransportImpl extends Endpo
                     flow.setNextOutgoingId(transportSession.getNextOutgoingId());
                     int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
                     written += frameBytes;
+                    endpoint.clearModified();
                 }
 
             }
@@ -358,8 +362,6 @@ public class TransportImpl extends Endpo
                     transportLink.setLinkCredit(transportLink.getLinkCredit().subtract(UnsignedInteger.ONE));
                 }
 
-                delivery.getLink().decrementQueued();
-
                 delivery = delivery.clearTransportWork();
 
 
@@ -448,7 +450,12 @@ public class TransportImpl extends Endpo
                         flow.setNextOutgoingId(transportSession.getNextOutgoingId());
                         int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
                         written += frameBytes;
+                        if(receiver.getLocalState() == EndpointState.ACTIVE)
+                        {
+                            endpoint.clearModified();
+                        }
                     }
+
                 }
             }
             endpoint = endpoint.transportNext();
@@ -487,15 +494,15 @@ public class TransportImpl extends Endpo
     {
         EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
         int written = 0;
+
         while(endpoint != null && buffer.remaining() >= _maxFrameSize)
         {
-
             if(endpoint instanceof LinkImpl)
             {
 
                 LinkImpl link = (LinkImpl) endpoint;
                 TransportLink transportLink = getTransportState(link);
-                if(link.getLocalState() == EndpointState.ACTIVE)
+                if(link.getLocalState() != EndpointState.UNINITIALIZED && !transportLink.attachSent())
                 {
 
                     if( (link.getRemoteState() == EndpointState.ACTIVE
@@ -539,10 +546,16 @@ public class TransportImpl extends Endpo
 
                         int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), attach, null);
                         written += frameBytes;
+                        transportLink.sentAttach();
+                        if(link.getLocalState() == EndpointState.ACTIVE && (link instanceof SenderImpl || !link.hasCredit()))
+                        {
+                            endpoint.clearModified();
+                        }
                     }
                 }
 
             }
+
             endpoint = endpoint.transportNext();
         }
         return written;
@@ -598,7 +611,7 @@ public class TransportImpl extends Endpo
             {
                 SessionImpl session = (SessionImpl) endpoint;
                 TransportSession transportSession = getTransportState(session);
-                if(session.getLocalState() == EndpointState.ACTIVE)
+                if(session.getLocalState() != EndpointState.UNINITIALIZED && !transportSession.beginSent())
                 {
                     int channelId = allocateLocalChannel();
                     transportSession.setLocalChannel(channelId);
@@ -617,6 +630,11 @@ public class TransportImpl extends Endpo
                     begin.setNextOutgoingId(transportSession.getNextOutgoingId());
 
                     written += writeFrame(buffer, channelId, begin, null);
+                    transportSession.sentBegin();
+                    if(session.getLocalState() == EndpointState.ACTIVE)
+                    {
+                        endpoint.clearModified();
+                    }
                 }
             }
             endpoint = endpoint.transportNext();
@@ -658,52 +676,76 @@ public class TransportImpl extends Endpo
         int written = 0;
         while(endpoint != null && buffer.remaining() >= _maxFrameSize)
         {
+            SessionImpl session;
+            TransportSession transportSession;
 
-            if(endpoint instanceof SessionImpl)
+            if((endpoint instanceof SessionImpl)
+               && (session = (SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED
+               && (transportSession = session.getTransportSession()).isLocalChannelSet()
+               && !hasSendableMessages(session))
             {
+                int channel = transportSession.getLocalChannel();
+                transportSession.freeLocalChannel();
+                _localSessions[channel] = null;
 
-                SessionImpl session = (SessionImpl) endpoint;
-                TransportSession transportSession = session.getTransportSession();
-                if(session.getLocalState() == EndpointState.CLOSED
-                   && transportSession.isLocalChannelSet())
-                {
-
-                    int channel = transportSession.getLocalChannel();
-                    transportSession.freeLocalChannel();
-                    _localSessions[channel] = null;
 
+                End end = new End();
 
-                    End end = new End();
-
-                    int frameBytes = writeFrame(buffer, channel, end, null);
-                    written += frameBytes;
-                }
+                int frameBytes = writeFrame(buffer, channel, end, null);
+                written += frameBytes;
+                endpoint.clearModified();
 
             }
+
             endpoint = endpoint.transportNext();
+
         }
         return written;
     }
 
+    private boolean hasSendableMessages(SessionImpl session)
+    {
+
+        if(!_closeReceived && (session == null || !session.getTransportSession().endReceived()))
+        {
+            EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+            while(endpoint != null)
+            {
+                if(endpoint instanceof SenderImpl)
+                {
+                    SenderImpl sender = (SenderImpl) endpoint;
+                    if((session == null || sender.getSession() == session)
+                       && sender.getQueued() != 0
+                        && !getTransportState(sender).detachReceived())
+                    {
+                        return true;
+                    }
+                }
+                endpoint = endpoint.transportNext();
+            }
+        }
+        return false;
+    }
+
     private int processClose(WritableBuffer buffer)
     {
         if(_connectionEndpoint.getLocalState() == EndpointState.CLOSED && !_isCloseSent)
         {
-            Close close = new Close();
-
-            // TODO - populate;
+            if(!hasSendableMessages(null))
+            {
+                Close close = new Close();
 
-            _isCloseSent = true;
+                // TODO - populate;
 
-            return  writeFrame(buffer, 0, close, null);
+                _isCloseSent = true;
 
+                return  writeFrame(buffer, 0, close, null);
+            }
         }
         return 0;
 
     }
 
-
-
     private int writeFrame(WritableBuffer buffer, int channel, DescribedType frameBody, ByteBuffer payload)
     {
         int oldPosition = buffer.position();
@@ -895,6 +937,7 @@ public class TransportImpl extends Endpo
             if(transportLink != null)
             {
                 LinkImpl link = transportLink.getLink();
+                transportLink.receivedDetach();
                 transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
                 link.setRemoteState(EndpointState.CLOSED);
 
@@ -916,7 +959,7 @@ public class TransportImpl extends Endpo
         else
         {
             _remoteSessions[channel] = null;
-
+            transportSession.receivedEnd();
             transportSession.getSession().setRemoteState(EndpointState.CLOSED);
 
         }
@@ -924,6 +967,7 @@ public class TransportImpl extends Endpo
 
     public void handleClose(Close close, Binary payload, Integer channel)
     {
+        _closeReceived = true;
         _connectionEndpoint.setRemoteState(EndpointState.CLOSED);
     }
 

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1391385&r1=1391384&r2=1391385&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java Fri Sep 28 10:18:29 2012
@@ -34,6 +34,8 @@ class TransportLink<T extends LinkImpl>
     private T _link;
     private UnsignedInteger _remoteDeliveryCount;
     private UnsignedInteger _remoteLinkCredit;
+    private boolean _detachReceived;
+    private boolean _attachSent;
 
     protected TransportLink(T link)
     {
@@ -156,4 +158,24 @@ class TransportLink<T extends LinkImpl>
     {
         _deliveryCount = _deliveryCount.add(UnsignedInteger.ONE);
     }
+
+    public void receivedDetach()
+    {
+        _detachReceived = true;
+    }
+
+    public boolean detachReceived()
+    {
+        return _detachReceived;
+    }
+
+    public boolean attachSent()
+    {
+        return _attachSent;
+    }
+
+    public void sentAttach()
+    {
+        _attachSent = true;
+    }
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1391385&r1=1391384&r2=1391385&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java Fri Sep 28 10:18:29 2012
@@ -58,6 +58,8 @@ class TransportSession
     private int _unsettledIncomingSize;
     private boolean _incomingWindowSizeChange;
     private boolean _outgoingWindowSizeChange;
+    private boolean _endReceived;
+    private boolean _beginSent;
 
     public TransportSession(SessionImpl session)
     {
@@ -399,4 +401,24 @@ class TransportSession
     {
         _nextIncomingId = _nextIncomingId.add(UnsignedInteger.ONE);
     }
+
+    public boolean endReceived()
+    {
+        return _endReceived;
+    }
+
+    public void receivedEnd()
+    {
+        _endReceived = true;
+    }
+
+    public boolean beginSent()
+    {
+        return _beginSent;
+    }
+
+    public void sentBegin()
+    {
+        _beginSent = true;
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org