You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/04/15 17:02:45 UTC

svn commit: r1739313 - /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/

Author: rgodfrey
Date: Fri Apr 15 15:02:45 2016
New Revision: 1739313

URL: http://svn.apache.org/viewvc?rev=1739313&view=rev
Log:
QPID-7202 : Merge AMQP 1.0 session related classes

Removed:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SessionEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SessionEventListener.java
Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Apr 15 15:02:45 2016
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
@@ -98,11 +99,13 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
@@ -190,10 +193,10 @@ public class AMQPConnection_1_0 extends
     private SocketAddress _remoteAddress;
 
     // positioned by the *outgoing* channel
-    private SessionEndpoint[] _sendingSessions;
+    private Session_1_0[] _sendingSessions;
 
     // positioned by the *incoming* channel
-    private SessionEndpoint[] _receivingSessions;
+    private Session_1_0[] _receivingSessions;
     private boolean _closedForInput;
     private boolean _closedForOutput;
 
@@ -283,16 +286,27 @@ public class AMQPConnection_1_0 extends
     public void receiveAttach(final short channel, final Attach attach)
     {
         assertState(FrameReceivingState.ANY_FRAME);
-        SessionEndpoint endPoint = getSession(channel);
-        if (endPoint != null)
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
         {
-            endPoint.receiveAttach(attach);
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveAttach(attach);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
+        {
+            // TODO - error
         }
     }
 
     public void receive(final short channel, final Object frame)
     {
-        List<Runnable> postLockActions;
         FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame);
         if (frame instanceof FrameBody)
         {
@@ -354,9 +368,17 @@ public class AMQPConnection_1_0 extends
     {
         Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
 
-        for(Session_1_0 session : sessions)
+        for(final Session_1_0 session : sessions)
         {
-            session.remoteEnd(new End());
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.remoteEnd(new End());
+                    return null;
+                }
+            }, session.getAccessControllerContext());
         }
     }
 
@@ -532,7 +554,7 @@ public class AMQPConnection_1_0 extends
     {
 
         assertState(FrameReceivingState.ANY_FRAME);
-        SessionEndpoint endpoint = _receivingSessions[channel];
+        Session_1_0 endpoint = _receivingSessions[channel];
         if (endpoint != null)
         {
             _receivingSessions[channel] = null;
@@ -549,10 +571,22 @@ public class AMQPConnection_1_0 extends
                                    final Disposition disposition)
     {
         assertState(FrameReceivingState.ANY_FRAME);
-        SessionEndpoint endPoint = getSession(channel);
-        if (endPoint != null)
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
         {
-            endPoint.receiveDisposition(disposition);
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveDisposition(disposition);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
+        {
+            // TODO - error
         }
 
     }
@@ -565,7 +599,7 @@ public class AMQPConnection_1_0 extends
         if (begin.getRemoteChannel() != null)
         {
             myChannelId = begin.getRemoteChannel().shortValue();
-            SessionEndpoint sessionEndpoint;
+            Session_1_0 sessionEndpoint;
             try
             {
                 sessionEndpoint = _sendingSessions[myChannelId];
@@ -626,7 +660,7 @@ public class AMQPConnection_1_0 extends
 
             if (_receivingSessions[channel] == null)
             {
-                SessionEndpoint sessionEndpoint = new SessionEndpoint(this, begin);
+                Session_1_0 sessionEndpoint = new Session_1_0(this, begin);
 
                 _receivingSessions[channel] = sessionEndpoint;
                 _sendingSessions[myChannelId] = sessionEndpoint;
@@ -655,44 +689,11 @@ public class AMQPConnection_1_0 extends
 
     }
 
-    private void remoteSessionCreation(final SessionEndpoint sessionEndpoint)
+    private void remoteSessionCreation(final Session_1_0 session)
     {
-        if(!_closedOnOpen)
-        {
-            final Session_1_0 session = new Session_1_0(this, sessionEndpoint);
-            _sessions.add(session);
-            sessionAdded(session);
-            sessionEndpoint.setSessionEventListener(new SessionEventListener()
-            {
-                @Override
-                public void remoteLinkCreation(final LinkEndpoint endpoint11)
-                {
-                    AccessController.doPrivileged(new PrivilegedAction<Object>()
-                    {
-                        @Override
-                        public Object run()
-                        {
-                            session.remoteLinkCreation(endpoint11);
-                            return null;
-                        }
-                    }, session.getAccessControllerContext());
-                }
+        _sessions.add(session);
+        sessionAdded(session);
 
-                @Override
-                public void remoteEnd(final End end)
-                {
-                    AccessController.doPrivileged(new PrivilegedAction<Object>()
-                    {
-                        @Override
-                        public Object run()
-                        {
-                            session.remoteEnd(end);
-                            return null;
-                        }
-                    }, session.getAccessControllerContext());
-                }
-            });
-        }
     }
 
     private short getFirstFreeChannel()
@@ -723,48 +724,44 @@ public class AMQPConnection_1_0 extends
     public void receiveTransfer(final short channel, final Transfer transfer)
     {
         assertState(FrameReceivingState.ANY_FRAME);
-        SessionEndpoint endPoint = getSession(channel);
-        if (endPoint != null)
-        {
-            endPoint.receiveTransfer(transfer);
-        }
-
-    }
-
-    public SessionEndpoint createSession(final String name)
-    {
-        // todo assert connection state
-        short channel = getFirstFreeChannel();
-        if (channel != -1)
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
         {
-            SessionEndpoint endpoint = new SessionEndpoint(this);
-            _sendingSessions[channel] = endpoint;
-            endpoint.setSendingChannel(channel);
-            Begin begin = new Begin();
-            begin.setNextOutgoingId(endpoint.getNextOutgoingId());
-            begin.setOutgoingWindow(endpoint.getOutgoingWindowSize());
-            begin.setIncomingWindow(endpoint.getIncomingWindowSize());
-
-            begin.setHandleMax(_handleMax);
-            sendFrame(channel, begin);
-            return endpoint;
-
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveTransfer(transfer);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
         }
         else
         {
-            // TODO - report error
-            return null;
+            // TODO - error
         }
-
     }
 
     public void receiveFlow(final short channel, final Flow flow)
     {
         assertState(FrameReceivingState.ANY_FRAME);
-        SessionEndpoint endPoint = getSession(channel);
-        if (endPoint != null)
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveFlow(flow);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
         {
-            endPoint.receiveFlow(flow);
+            // TODO - error
         }
 
     }
@@ -779,8 +776,8 @@ public class AMQPConnection_1_0 extends
                         : _channelMax;
         if (_receivingSessions == null)
         {
-            _receivingSessions = new SessionEndpoint[_channelMax + 1];
-            _sendingSessions = new SessionEndpoint[_channelMax + 1];
+            _receivingSessions = new Session_1_0[_channelMax + 1];
+            _sendingSessions = new Session_1_0[_channelMax + 1];
         }
         _maxFrameSize = open.getMaxFrameSize() == null ? DEFAULT_MAX_FRAME : open.getMaxFrameSize().intValue();
         _remoteContainerId = open.getContainerId();
@@ -937,10 +934,22 @@ public class AMQPConnection_1_0 extends
     public void receiveDetach(final short channel, final Detach detach)
     {
         assertState(FrameReceivingState.ANY_FRAME);
-        SessionEndpoint endPoint = getSession(channel);
-        if (endPoint != null)
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveDetach(detach);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
         {
-            endPoint.receiveDetach(detach);
+            // TODO - error
         }
     }
 
@@ -1201,59 +1210,73 @@ public class AMQPConnection_1_0 extends
 
     public void received(final QpidByteBuffer msg)
     {
-        try
+
+        AccessController.doPrivileged(new PrivilegedAction<Object>()
         {
-            updateLastReadTime();
-            if (RAW_LOGGER.isDebugEnabled())
+            @Override
+            public Object run()
             {
-                QpidByteBuffer dup = msg.duplicate();
-                byte[] data = new byte[dup.remaining()];
-                dup.get(data);
-                dup.dispose();
-                Binary bin = new Binary(data);
-                RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
-            }
+                updateLastReadTime();
+                try
+                {
+                    if (RAW_LOGGER.isDebugEnabled())
+                    {
+                        QpidByteBuffer dup = msg.duplicate();
+                        byte[] data = new byte[dup.remaining()];
+                        dup.get(data);
+                        dup.dispose();
+                        Binary bin = new Binary(data);
+                        RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
+                    }
 
-            int remaining;
+                    int remaining;
 
-            do
-            {
-                remaining = msg.remaining();
+                    do
+                    {
+                        remaining = msg.remaining();
 
-                switch (_frameReceivingState)
-                {
-                    case AMQP_OR_SASL_HEADER:
-                    case AMQP_HEADER:
-                        if (remaining < 8)
+                        switch (_frameReceivingState)
                         {
-                            return;
+                            case AMQP_OR_SASL_HEADER:
+                            case AMQP_HEADER:
+                                if (remaining >= 8)
+                                {
+                                    processProtocolHeader(msg);
+                                }
+                                break;
+                            case OPEN_ONLY:
+                            case ANY_FRAME:
+                            case SASL_INIT_ONLY:
+                            case SASL_RESPONSE_ONLY:
+                                _frameHandler.parse(msg);
+                                break;
+                            case CLOSED:
+                                // ignore;
+                                break;
                         }
-                        processProtocolHeader(msg);
-                        break;
-                    case OPEN_ONLY:
-                    case ANY_FRAME:
-                    case SASL_INIT_ONLY:
-                    case SASL_RESPONSE_ONLY:
-                        _frameHandler.parse(msg);
-                        break;
-                    case CLOSED:
-                        // ignore;
-                        break;
-                }
 
 
+                    }
+                    while (msg.remaining() != remaining);
+                }
+                catch (IllegalArgumentException | IllegalStateException e)
+                {
+                    throw new ConnectionScopedRuntimeException(e);
+                }
+                catch (StoreException e)
+                {
+                    if (getVirtualHost().getState() == State.ACTIVE)
+                    {
+                        throw new ServerScopedRuntimeException(e);
+                    }
+                    else
+                    {
+                        throw new ConnectionScopedRuntimeException(e);
+                    }
+                }
+                return null;
             }
-            while (msg.remaining() != remaining);
-        }
-        catch (ConnectionScopedRuntimeException e)
-        {
-            throw e;
-        }
-        catch (RuntimeException e)
-        {
-            LOGGER.error("Unexpected exception while processing incoming data", e);
-            throw new ConnectionScopedRuntimeException("Unexpected exception while processing incoming data", e);
-        }
+        }, getAccessControllerContext());
 
     }
 
@@ -1544,8 +1567,8 @@ public class AMQPConnection_1_0 extends
 
         if (_receivingSessions == null)
         {
-            _receivingSessions = new SessionEndpoint[channelMax + 1];
-            _sendingSessions = new SessionEndpoint[channelMax + 1];
+            _receivingSessions = new Session_1_0[channelMax + 1];
+            _sendingSessions = new Session_1_0[channelMax + 1];
         }
         if (channelMax < _channelMax)
         {
@@ -1575,9 +1598,9 @@ public class AMQPConnection_1_0 extends
         _closedOnOpen = true;
     }
 
-    private SessionEndpoint getSession(final short channel)
+    private Session_1_0 getSession(final short channel)
     {
-        SessionEndpoint session = _receivingSessions[channel];
+        Session_1_0 session = _receivingSessions[channel];
         if (session == null)
         {
             Error error = new Error();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Fri Apr 15 15:02:45 2016
@@ -68,7 +68,7 @@ public abstract class LinkEndpoint<T ext
 
     private final String _name;
 
-    private SessionEndpoint _session;
+    private Session_1_0 _session;
 
 
     private volatile State _state = State.DETACHED;
@@ -84,12 +84,12 @@ public abstract class LinkEndpoint<T ext
 
     private Map<Binary,Delivery> _unsettledTransfers = new HashMap<Binary,Delivery>();
 
-    LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+    LinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
     {
         this(sessionEndpoint, name, unsettled, null);
     }
 
-    LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled, DeliveryStateHandler deliveryStateHandler)
+    LinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled, DeliveryStateHandler deliveryStateHandler)
     {
         _name = name;
         _session = sessionEndpoint;
@@ -99,7 +99,7 @@ public abstract class LinkEndpoint<T ext
         _deliveryStateHandler = deliveryStateHandler;
     }
 
-    LinkEndpoint(final SessionEndpoint sessionEndpoint,final Attach attach)
+    LinkEndpoint(final Session_1_0 sessionEndpoint,final Attach attach)
     {
         _session = sessionEndpoint;
 
@@ -289,7 +289,7 @@ public abstract class LinkEndpoint<T ext
         return _state == State.DETACHED || _session.isEnded();
     }
 
-    public SessionEndpoint getSession()
+    public Session_1_0 getSession()
     {
         return _session;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Fri Apr 15 15:02:45 2016
@@ -91,19 +91,19 @@ public class ReceivingLinkEndpoint exten
     private UnsignedInteger _drainLimit;
 
 
-    public ReceivingLinkEndpoint(final SessionEndpoint session, String name)
+    public ReceivingLinkEndpoint(final Session_1_0 session, String name)
     {
         this(session,name,null);
     }
 
-    public ReceivingLinkEndpoint(final SessionEndpoint session, String name, Map<Binary, Outcome> unsettledMap)
+    public ReceivingLinkEndpoint(final Session_1_0 session, String name, Map<Binary, Outcome> unsettledMap)
     {
         super(session, name, unsettledMap);
         setDeliveryCount(UnsignedInteger.valueOf(0));
         setLinkEventListener(ReceivingLinkListener.DEFAULT);
     }
 
-    public ReceivingLinkEndpoint(final SessionEndpoint session, final Attach attach)
+    public ReceivingLinkEndpoint(final Session_1_0 session, final Attach attach)
     {
         super(session, attach);
         setDeliveryCount(attach.getInitialDeliveryCount());

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Fri Apr 15 15:02:45 2016
@@ -42,26 +42,26 @@ public class SendingLinkEndpoint extends
     private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
     private Binary _transactionId;
 
-    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name)
+    public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, String name)
     {
         this(sessionEndpoint, name, null);
     }
 
-    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+    public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
     {
         super(sessionEndpoint, name, unsettled);
         init();
     }
 
 
-    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled,
+    public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled,
                                DeliveryStateHandler deliveryStateHandler)
     {
         super(sessionEndpoint, name, unsettled, deliveryStateHandler);
         init();
     }
 
-    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach)
+    public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, final Attach attach)
     {
         super(sessionEndpoint, attach);
         setSendingSettlementMode(attach.getSndSettleMode());
@@ -83,7 +83,7 @@ public class SendingLinkEndpoint extends
 
     public boolean transfer(final Transfer xfr, final boolean decrementCredit)
     {
-        SessionEndpoint s = getSession();
+        Session_1_0 s = getSession();
         xfr.setMessageFormat(UnsignedInteger.ZERO);
         if(decrementCredit)
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Apr 15 15:02:45 2016
@@ -44,10 +44,15 @@ import javax.security.auth.Subject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.LifetimePolicy;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinks;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinksOrMessages;
@@ -58,10 +63,15 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.protocol.AMQConstant;
@@ -89,11 +99,10 @@ import org.apache.qpid.server.util.Conne
 import org.apache.qpid.server.virtualhost.QueueExistsException;
 import org.apache.qpid.transport.network.Ticker;
 
-public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0>, LogSubject
+public class Session_1_0 implements AMQSessionModel<Session_1_0>, LogSubject
 {
     private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
-    private final SessionEndpoint _endpoint;
     private final AccessControlContext _accessControllerContext;
     private AutoCommitTransaction _transaction;
 
@@ -115,16 +124,615 @@ public class Session_1_0 implements Sess
     private Session<?> _modelObject;
     private final List<ConsumerTarget_1_0> _consumersWithPendingWork = new ArrayList<>();
 
+    private SessionState _state ;
 
-    public Session_1_0(final AMQPConnection_1_0 connection, final SessionEndpoint endpoint)
+    private final Map<String, LinkEndpoint> _linkMap = new HashMap<>();
+    private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>();
+    private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>();
+    private long _lastAttachedTime;
+
+    private short _receivingChannel;
+    private short _sendingChannel;
+
+
+    // has to be a power of two
+    private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
+    private static final int BUFFER_SIZE_MASK = DEFAULT_SESSION_BUFFER_SIZE - 1;
+
+
+
+    private int _nextOutgoingDeliveryId;
+
+    private UnsignedInteger _outgoingSessionCredit;
+    private UnsignedInteger _initialOutgoingId = UnsignedInteger.valueOf(0);
+    private SequenceNumber _nextIncomingTransferId;
+    private SequenceNumber _nextOutgoingTransferId = new SequenceNumber(_initialOutgoingId.intValue());
+
+    private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
+    private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
+
+    private int _availableIncomingCredit = DEFAULT_SESSION_BUFFER_SIZE;
+    private int _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
+    private UnsignedInteger _lastSentIncomingLimit;
+
+    private final Error _sessionEndedLinkError =
+            new Error(LinkError.DETACH_FORCED,
+                      "Force detach the link because the session is remotely ended.");
+
+
+    public Session_1_0(final AMQPConnection_1_0 connection)
+    {
+        this(connection, SessionState.INACTIVE, null);
+    }
+
+    public Session_1_0(final AMQPConnection_1_0 connection, Begin begin)
+    {
+        this(connection, SessionState.BEGIN_RECVD, new SequenceNumber(begin.getNextOutgoingId().intValue()));
+    }
+
+
+    private Session_1_0(final AMQPConnection_1_0 connection, SessionState state, SequenceNumber nextIncomingId)
     {
-        _endpoint = endpoint;
+
+        _state = state;
+        _nextIncomingTransferId = nextIncomingId;
         _connection = connection;
         _subject.getPrincipals().addAll(connection.getSubject().getPrincipals());
         _subject.getPrincipals().add(new SessionPrincipal(this));
         _accessControllerContext = org.apache.qpid.server.security.SecurityManager.getAccessControlContextFromSubject(_subject);
     }
 
+    public void setReceivingChannel(final short receivingChannel)
+    {
+        _receivingChannel = receivingChannel;
+        switch(_state)
+        {
+            case INACTIVE:
+                _state = SessionState.BEGIN_RECVD;
+                break;
+            case BEGIN_SENT:
+                _state = SessionState.ACTIVE;
+                break;
+            case END_PIPE:
+                _state = SessionState.END_SENT;
+                break;
+            default:
+                // TODO error
+
+        }
+    }
+
+    public void sendDetach(final Detach detach)
+    {
+        send(detach);
+    }
+
+    public void receiveAttach(final Attach attach)
+    {
+        if(_state == SessionState.ACTIVE)
+        {
+            UnsignedInteger handle = attach.getHandle();
+            if(_remoteLinkEndpoints.containsKey(handle))
+            {
+                // TODO - Error - handle busy?
+            }
+            else
+            {
+                LinkEndpoint endpoint = _linkMap.get(attach.getName());
+                if(endpoint == null)
+                {
+                    endpoint = attach.getRole() == Role.RECEIVER
+                               ? new SendingLinkEndpoint(this, attach)
+                               : new ReceivingLinkEndpoint(this, attach);
+
+                    // TODO : fix below - distinguish between local and remote owned
+                    endpoint.setSource(attach.getSource());
+                    endpoint.setTarget(attach.getTarget());
+
+
+                }
+
+                if(attach.getRole() == Role.SENDER)
+                {
+                    endpoint.setDeliveryCount(attach.getInitialDeliveryCount());
+                }
+
+                _remoteLinkEndpoints.put(handle, endpoint);
+
+                if(!_localLinkEndpoints.containsKey(endpoint))
+                {
+                    UnsignedInteger localHandle = findNextAvailableHandle();
+                    endpoint.setLocalHandle(localHandle);
+                    _localLinkEndpoints.put(endpoint, localHandle);
+
+                    remoteLinkCreation(endpoint);
+
+                }
+                else
+                {
+                    endpoint.receiveAttach(attach);
+                }
+            }
+        }
+    }
+
+    public void updateDisposition(final Role role,
+                                  final UnsignedInteger first,
+                                  final UnsignedInteger last,
+                                  final DeliveryState state, final boolean settled)
+    {
+
+
+        Disposition disposition = new Disposition();
+        disposition.setRole(role);
+        disposition.setFirst(first);
+        disposition.setLast(last);
+        disposition.setSettled(settled);
+
+        disposition.setState(state);
+
+
+        if(settled)
+        {
+            if(role == Role.RECEIVER)
+            {
+                SequenceNumber pos = new SequenceNumber(first.intValue());
+                SequenceNumber end = new SequenceNumber(last.intValue());
+                while(pos.compareTo(end)<=0)
+                {
+                    Delivery d = _incomingUnsettled.remove(new UnsignedInteger(pos.intValue()));
+
+/*
+                    _availableIncomingCredit += d.getTransfers().size();
+*/
+
+                    pos.incr();
+                }
+            }
+        }
+
+        send(disposition);
+        //TODO - check send flow
+    }
+
+    public boolean hasCreditToSend()
+    {
+        boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
+        boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
+        return b && b1;
+    }
+
+    public void end()
+    {
+        end(new End());
+    }
+
+    public void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint, final boolean newDelivery)
+    {
+        _nextOutgoingTransferId.incr();
+        UnsignedInteger deliveryId;
+        if(newDelivery)
+        {
+            deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
+            endpoint.setLastDeliveryId(deliveryId);
+        }
+        else
+        {
+            deliveryId = endpoint.getLastDeliveryId();
+        }
+        xfr.setDeliveryId(deliveryId);
+
+        if(!Boolean.TRUE.equals(xfr.getSettled()))
+        {
+            Delivery delivery;
+            if((delivery = _outgoingUnsettled.get(deliveryId))== null)
+            {
+                delivery = new Delivery(xfr, endpoint);
+                _outgoingUnsettled.put(deliveryId, delivery);
+
+            }
+            else
+            {
+                delivery.addTransfer(xfr);
+            }
+            _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+            endpoint.addUnsettled(delivery);
+
+        }
+
+        try
+        {
+            QpidByteBuffer payload = xfr.getPayload();
+            int payloadSent = _connection.sendFrame(getSendingChannel(), xfr, payload);
+
+            if(payload != null && payloadSent < payload.remaining() && payloadSent >= 0)
+            {
+                payload = payload.duplicate();
+                try
+                {
+                    payload.position(payload.position()+payloadSent);
+
+                    Transfer secondTransfer = new Transfer();
+
+                    secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
+                    secondTransfer.setHandle(xfr.getHandle());
+                    secondTransfer.setSettled(xfr.getSettled());
+                    secondTransfer.setState(xfr.getState());
+                    secondTransfer.setMessageFormat(xfr.getMessageFormat());
+                    secondTransfer.setPayload(payload);
+
+                    sendTransfer(secondTransfer, endpoint, false);
+                }
+                finally
+                {
+                    payload.dispose();
+                }
+
+            }
+        }
+        catch(OversizeFrameException e)
+        {
+            e.printStackTrace();
+            throw new ConnectionScopedRuntimeException(e);
+        }
+    }
+
+    public boolean isActive()
+    {
+        return _state == SessionState.ACTIVE;
+    }
+
+    public void receiveEnd(final End end)
+    {
+        switch (_state)
+        {
+            case END_SENT:
+                _state = SessionState.ENDED;
+                break;
+            case ACTIVE:
+                detachLinks();
+                remoteEnd(end);
+                short sendChannel = getSendingChannel();
+                _connection.sendEnd(sendChannel, new End(), true);
+                _state = SessionState.ENDED;
+                break;
+            default:
+                sendChannel = getSendingChannel();
+                End reply = new End();
+                Error error = new Error();
+                error.setCondition(AmqpError.ILLEGAL_STATE);
+                error.setDescription("END called on Session which has not been opened");
+                reply.setError(error);
+                _connection.sendEnd(sendChannel, reply, true);
+                break;
+
+
+        }
+
+    }
+
+    public UnsignedInteger getNextOutgoingId()
+    {
+        return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
+    }
+
+    public void sendFlowConditional()
+    {
+        if(_nextIncomingTransferId != null)
+        {
+            UnsignedInteger clientsCredit =
+                    _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
+            int i = UnsignedInteger.valueOf(_availableIncomingCredit).subtract(clientsCredit).compareTo(clientsCredit);
+            if (i >= 0)
+            {
+                sendFlow();
+            }
+        }
+
+    }
+
+    public UnsignedInteger getOutgoingWindowSize()
+    {
+        return UnsignedInteger.valueOf(_availableOutgoingCredit);
+    }
+
+    public void receiveFlow(final Flow flow)
+    {
+        UnsignedInteger handle = flow.getHandle();
+        final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
+
+        final UnsignedInteger nextOutgoingId =
+                flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
+        int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
+        _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
+
+        if (endpoint != null)
+        {
+            endpoint.receiveFlow(flow);
+        }
+        else
+        {
+            final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
+            for (LinkEndpoint le : allLinkEndpoints)
+            {
+                le.flowStateChanged();
+            }
+        }
+    }
+
+    public void setNextIncomingId(final UnsignedInteger nextIncomingId)
+    {
+        _nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
+
+    }
+
+    public short getSendingChannel()
+    {
+        return _sendingChannel;
+    }
+
+    public void receiveDisposition(final Disposition disposition)
+    {
+        Role dispositionRole = disposition.getRole();
+
+        LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
+
+        if(dispositionRole == Role.RECEIVER)
+        {
+            unsettledTransfers = _outgoingUnsettled;
+        }
+        else
+        {
+            unsettledTransfers = _incomingUnsettled;
+
+        }
+
+        UnsignedInteger deliveryId = disposition.getFirst();
+        UnsignedInteger last = disposition.getLast();
+        if(last == null)
+        {
+            last = deliveryId;
+        }
+
+
+        while(deliveryId.compareTo(last)<=0)
+        {
+
+            Delivery delivery = unsettledTransfers.get(deliveryId);
+            if(delivery != null)
+            {
+                delivery.getLinkEndpoint().receiveDeliveryState(delivery,
+                                                           disposition.getState(),
+                                                           disposition.getSettled());
+            }
+            deliveryId = deliveryId.add(UnsignedInteger.ONE);
+        }
+        if(disposition.getSettled())
+        {
+            //TODO - check send flow
+        }
+
+    }
+
+    public SessionState getState()
+    {
+        return _state;
+    }
+
+    public void sendFlow()
+    {
+        sendFlow(new Flow());
+    }
+
+    public void setSendingChannel(final short sendingChannel)
+    {
+        _sendingChannel = sendingChannel;
+        switch(_state)
+        {
+            case INACTIVE:
+                _state = SessionState.BEGIN_SENT;
+                break;
+            case BEGIN_RECVD:
+                _state = SessionState.ACTIVE;
+                break;
+            default:
+                // TODO error
+
+        }
+    }
+
+    public void sendFlow(final Flow flow)
+    {
+        if(_nextIncomingTransferId != null)
+        {
+            final int nextIncomingId = _nextIncomingTransferId.intValue();
+            flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
+            _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _availableIncomingCredit);
+        }
+        flow.setIncomingWindow(UnsignedInteger.valueOf(_availableIncomingCredit));
+
+        flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
+        send(flow);
+    }
+
+    public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
+    {
+        _outgoingSessionCredit = outgoingSessionCredit;
+    }
+
+    public void receiveDetach(final Detach detach)
+    {
+        UnsignedInteger handle = detach.getHandle();
+        detach(handle, detach);
+    }
+
+    public void sendAttach(final Attach attach)
+    {
+        send(attach);
+    }
+
+    private void send(final FrameBody frameBody)
+    {
+        _connection.sendFrame(getSendingChannel(), frameBody);
+    }
+
+    public boolean isSyntheticError(final Error error)
+    {
+        return error == _sessionEndedLinkError;
+    }
+
+    public void end(final End end)
+    {
+        switch (_state)
+        {
+            case BEGIN_SENT:
+                _connection.sendEnd(getSendingChannel(), end, false);
+                _state = SessionState.END_PIPE;
+                break;
+            case ACTIVE:
+                detachLinks();
+                short sendChannel = getSendingChannel();
+                _connection.sendEnd(sendChannel, end, true);
+                _state = SessionState.END_SENT;
+                break;
+            default:
+                sendChannel = getSendingChannel();
+                End reply = new End();
+                Error error = new Error();
+                error.setCondition(AmqpError.ILLEGAL_STATE);
+                error.setDescription("END called on Session which has not been opened");
+                reply.setError(error);
+                _connection.sendEnd(sendChannel, reply, true);
+                break;
+
+
+        }
+    }
+
+    public void receiveTransfer(final Transfer transfer)
+    {
+        _nextIncomingTransferId.incr();
+    /*
+                _availableIncomingCredit--;
+    */
+
+        UnsignedInteger handle = transfer.getHandle();
+
+
+        LinkEndpoint endpoint = _remoteLinkEndpoints.get(handle);
+
+        if (endpoint == null)
+        {
+            //TODO - error unknown link
+            System.err.println("Unknown endpoint " + transfer);
+
+        }
+
+        UnsignedInteger deliveryId = transfer.getDeliveryId();
+        if (deliveryId == null)
+        {
+            deliveryId = ((ReceivingLinkEndpoint) endpoint).getLastDeliveryId();
+        }
+
+        Delivery delivery = _incomingUnsettled.get(deliveryId);
+        if (delivery == null)
+        {
+            delivery = new Delivery(transfer, endpoint);
+            _incomingUnsettled.put(deliveryId, delivery);
+            if (delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted()))
+            {
+/*
+                    _availableIncomingCredit++;
+*/
+            }
+
+            if (Boolean.TRUE.equals(transfer.getMore()))
+            {
+                ((ReceivingLinkEndpoint) endpoint).setLastDeliveryId(transfer.getDeliveryId());
+            }
+        }
+        else
+        {
+            if (delivery.getDeliveryId().equals(deliveryId))
+            {
+                delivery.addTransfer(transfer);
+                if (delivery.isSettled())
+                {
+/*
+                        _availableIncomingCredit++;
+*/
+                }
+                else if (Boolean.TRUE.equals(transfer.getAborted()))
+                {
+/*
+                        _availableIncomingCredit += delivery.getTransfers().size();
+*/
+                }
+
+                if (!Boolean.TRUE.equals(transfer.getMore()))
+                {
+                    ((ReceivingLinkEndpoint) endpoint).setLastDeliveryId(null);
+                }
+            }
+            else
+            {
+                // TODO - error
+                System.err.println("Incorrect transfer id " + transfer);
+            }
+        }
+
+        if (endpoint != null)
+        {
+            endpoint.receiveTransfer(transfer, delivery);
+        }
+
+        if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
+        {
+            _incomingUnsettled.remove(deliveryId);
+        }
+
+    }
+
+    public Collection<LinkEndpoint> getLocalLinkEndpoints()
+    {
+        return new ArrayList<>(_localLinkEndpoints.keySet());
+    }
+
+    public boolean isEnded()
+    {
+        return _state == SessionState.ENDED || _connection.isClosed();
+    }
+
+    public void settle(final Role role, final UnsignedInteger deliveryId)
+    {
+        if(role == Role.RECEIVER)
+        {
+            Delivery d = _incomingUnsettled.remove(deliveryId);
+            if(d != null)
+            {
+/*
+                _availableIncomingCredit += d.getTransfers().size();
+*/
+            }
+        }
+        else
+        {
+            Delivery d = _outgoingUnsettled.remove(deliveryId);
+/*            if(d != null)
+            {
+                _availableOutgoingCredit += d.getTransfers().size();
+
+            }*/
+        }
+
+    }
+
+    public UnsignedInteger getIncomingWindowSize()
+    {
+        return UnsignedInteger.valueOf(_availableIncomingCredit);
+    }
+
     public AccessControlContext getAccessControllerContext()
     {
         return _accessControllerContext;
@@ -132,44 +740,46 @@ public class Session_1_0 implements Sess
 
     public void remoteLinkCreation(final LinkEndpoint endpoint)
     {
-
         Destination destination;
         Link_1_0 link = null;
         Error error = null;
 
-        final
-        LinkRegistry
-                linkRegistry = getVirtualHost().getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
+        final LinkRegistry linkRegistry = getVirtualHost().getLinkRegistry(getConnection().getRemoteContainerId());
 
 
-        if(endpoint.getRole() == Role.SENDER)
+        if (endpoint.getRole() == Role.SENDER)
         {
 
-            final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
+            final SendingLink_1_0 previousLink =
+                    (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
 
-            if(previousLink == null)
+            if (previousLink == null)
             {
 
                 Target target = (Target) endpoint.getTarget();
                 Source source = (Source) endpoint.getSource();
 
 
-                if(source != null)
+                if (source != null)
                 {
-                    if(Boolean.TRUE.equals(source.getDynamic()))
+                    if (Boolean.TRUE.equals(source.getDynamic()))
                     {
                         Queue<?> tempQueue = createTemporaryQueue(source.getDynamicNodeProperties());
                         source.setAddress(tempQueue.getName());
                     }
                     String addr = source.getAddress();
-                    if(!addr.startsWith("/") && addr.contains("/"))
+                    if (!addr.startsWith("/") && addr.contains("/"))
                     {
-                        String[] parts = addr.split("/",2);
-                        Exchange<?> exchg = getVirtualHost().getAttainedChildFromAddress(Exchange.class, parts[0]);
-                        if(exchg != null)
+                        String[] parts = addr.split("/", 2);
+                        Exchange<?> exchg =
+                                getVirtualHost().getAttainedChildFromAddress(Exchange.class, parts[0]);
+                        if (exchg != null)
                         {
                             ExchangeDestination exchangeDestination =
-                                    new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy(), parts[0]);
+                                    new ExchangeDestination(exchg,
+                                                            source.getDurable(),
+                                                            source.getExpiryPolicy(),
+                                                            parts[0]);
                             exchangeDestination.setInitialRoutingAddress(parts[1]);
                             destination = exchangeDestination;
 
@@ -183,16 +793,20 @@ public class Session_1_0 implements Sess
                     else
                     {
                         MessageSource queue = getVirtualHost().getAttainedMessageSource(addr);
-                        if(queue != null)
+                        if (queue != null)
                         {
                             destination = new MessageSourceDestination(queue);
                         }
                         else
                         {
-                            Exchange<?> exchg = getVirtualHost().getAttainedChildFromAddress(Exchange.class, addr);
-                            if(exchg != null)
+                            Exchange<?> exchg =
+                                    getVirtualHost().getAttainedChildFromAddress(Exchange.class, addr);
+                            if (exchg != null)
                             {
-                                destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy(), addr);
+                                destination = new ExchangeDestination(exchg,
+                                                                      source.getDurable(),
+                                                                      source.getExpiryPolicy(),
+                                                                      addr);
                             }
                             else
                             {
@@ -208,26 +822,29 @@ public class Session_1_0 implements Sess
                     destination = null;
                 }
 
-                if(destination != null)
+                if (destination != null)
                 {
                     final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
                     try
                     {
-                        final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
-                                                                                getVirtualHost(),
-                                                                                (SendingDestination) destination
-                        );
+                        final SendingLink_1_0 sendingLink =
+                                new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
+                                                    getVirtualHost(),
+                                                    (SendingDestination) destination
+                                );
 
-                        sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
+                        sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(
+                                sendingLink));
                         registerConsumer(sendingLink);
 
                         link = sendingLink;
-                        if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
+                        if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())
+                            || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
                         {
                             linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
                         }
                     }
-                    catch(AmqpErrorException e)
+                    catch (AmqpErrorException e)
                     {
                         _logger.error("Error creating sending link", e);
                         destination = null;
@@ -242,10 +859,10 @@ public class Session_1_0 implements Sess
 
                 Source oldSource = (Source) previousLink.getEndpoint().getSource();
                 final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable();
-                if(newSourceDurable != null)
+                if (newSourceDurable != null)
                 {
                     oldSource.setDurable(newSourceDurable);
-                    if(newSourceDurable.equals(TerminusDurability.NONE))
+                    if (newSourceDurable.equals(TerminusDurability.NONE))
                     {
                         linkRegistry.unregisterSendingLink(endpoint.getName());
                     }
@@ -262,21 +879,21 @@ public class Session_1_0 implements Sess
         }
         else
         {
-            if(endpoint.getTarget() instanceof Coordinator)
+            if (endpoint.getTarget() instanceof Coordinator)
             {
                 Coordinator coordinator = (Coordinator) endpoint.getTarget();
                 TxnCapability[] capabilities = coordinator.getCapabilities();
                 boolean localTxn = false;
                 boolean multiplePerSession = false;
-                if(capabilities != null)
+                if (capabilities != null)
                 {
-                    for(TxnCapability capability : capabilities)
+                    for (TxnCapability capability : capabilities)
                     {
-                        if(capability.equals(TxnCapability.LOCAL_TXN))
+                        if (capability.equals(TxnCapability.LOCAL_TXN))
                         {
                             localTxn = true;
                         }
-                        else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
+                        else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
                         {
                             multiplePerSession = true;
                         }
@@ -297,8 +914,12 @@ public class Session_1_0 implements Sess
 
                 final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
                 final TxnCoordinatorLink_1_0 coordinatorLink =
-                        new TxnCoordinatorLink_1_0(getVirtualHost(), this, receivingLinkEndpoint, _openTransactions);
-                receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(coordinatorLink));
+                        new TxnCoordinatorLink_1_0(getVirtualHost(),
+                                                   this,
+                                                   receivingLinkEndpoint,
+                                                   _openTransactions);
+                receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(
+                        coordinatorLink));
                 link = coordinatorLink;
 
 
@@ -309,14 +930,14 @@ public class Session_1_0 implements Sess
                 ReceivingLink_1_0 previousLink =
                         (ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
 
-                if(previousLink == null)
+                if (previousLink == null)
                 {
 
                     Target target = (Target) endpoint.getTarget();
 
-                    if(target != null)
+                    if (target != null)
                     {
-                        if(Boolean.TRUE.equals(target.getDynamic()))
+                        if (Boolean.TRUE.equals(target.getDynamic()))
                         {
 
                             Queue<?> tempQueue = createTemporaryQueue(target.getDynamicNodeProperties());
@@ -324,17 +945,18 @@ public class Session_1_0 implements Sess
                         }
 
                         String addr = target.getAddress();
-                        if(addr == null || "".equals(addr.trim()))
+                        if (addr == null || "".equals(addr.trim()))
                         {
                             MessageDestination messageDestination = getVirtualHost().getDefaultDestination();
                             destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
                                                                        target.getExpiryPolicy(), "");
                         }
-                        else if(!addr.startsWith("/") && addr.contains("/"))
+                        else if (!addr.startsWith("/") && addr.contains("/"))
                         {
-                            String[] parts = addr.split("/",2);
-                            Exchange<?> exchange = getVirtualHost().getAttainedChildFromAddress(Exchange.class, parts[0]);
-                            if(exchange != null)
+                            String[] parts = addr.split("/", 2);
+                            Exchange<?> exchange =
+                                    getVirtualHost().getAttainedChildFromAddress(Exchange.class, parts[0]);
+                            if (exchange != null)
                             {
                                 ExchangeDestination exchangeDestination =
                                         new ExchangeDestination(exchange,
@@ -355,16 +977,19 @@ public class Session_1_0 implements Sess
                         }
                         else
                         {
-                            MessageDestination messageDestination = getVirtualHost().getAttainedMessageDestination(addr);
-                            if(messageDestination != null)
+                            MessageDestination messageDestination =
+                                    getVirtualHost().getAttainedMessageDestination(addr);
+                            if (messageDestination != null)
                             {
-                                destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                           target.getExpiryPolicy(), addr);
+                                destination =
+                                        new NodeReceivingDestination(messageDestination, target.getDurable(),
+                                                                     target.getExpiryPolicy(), addr);
                             }
                             else
                             {
-                                Queue<?> queue = getVirtualHost().getAttainedChildFromAddress(Queue.class, addr);
-                                if(queue != null)
+                                Queue<?> queue =
+                                        getVirtualHost().getAttainedChildFromAddress(Queue.class, addr);
+                                if (queue != null)
                                 {
 
                                     destination = new QueueDestination(queue, addr);
@@ -383,18 +1008,20 @@ public class Session_1_0 implements Sess
                     {
                         destination = null;
                     }
-                    if(destination != null)
+                    if (destination != null)
                     {
                         final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-                        final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
-                                                                                      getVirtualHost(),
-                                (ReceivingDestination) destination);
+                        final ReceivingLink_1_0 receivingLink =
+                                new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
+                                                      getVirtualHost(),
+                                                      (ReceivingDestination) destination);
 
-                        receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink));
+                        receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(
+                                receivingLink));
 
                         link = receivingLink;
-                        if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
-                           || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
+                        if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
+                            || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
                         {
                             linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
                         }
@@ -414,9 +1041,9 @@ public class Session_1_0 implements Sess
 
         endpoint.attach();
 
-        if(link == null)
+        if (link == null)
         {
-            if(error == null)
+            if (error == null)
             {
                 error = new Error();
                 error.setCondition(AmqpError.NOT_FOUND);
@@ -429,6 +1056,7 @@ public class Session_1_0 implements Sess
         }
     }
 
+
     private void registerConsumer(final SendingLink_1_0 link)
     {
         ConsumerImpl consumer = link.getConsumer();
@@ -529,7 +1157,7 @@ public class Session_1_0 implements Sess
             iter.remove();
         }
 
-        for(LinkEndpoint linkEndpoint : _endpoint.getLocalLinkEndpoints())
+        for(LinkEndpoint linkEndpoint : getLocalLinkEndpoints())
         {
             linkEndpoint.remoteDetached(new Detach());
         }
@@ -599,7 +1227,7 @@ public class Session_1_0 implements Sess
     public void close()
     {
         performCloseTasks();
-        _endpoint.end();
+        end();
         if(_modelObject != null)
         {
             _modelObject.delete();
@@ -630,7 +1258,7 @@ public class Session_1_0 implements Sess
         theError.setDescription(message);
         theError.setCondition(ConnectionError.CONNECTION_FORCED);
         end.setError(theError);
-        _endpoint.end(end);
+        end(end);
     }
 
     @Override
@@ -734,7 +1362,7 @@ public class Session_1_0 implements Sess
     @Override
     public int getChannelId()
     {
-        return _endpoint.getSendingChannel();
+        return getSendingChannel();
     }
 
     @Override
@@ -757,7 +1385,7 @@ public class Session_1_0 implements Sess
                                     authorizedPrincipal,
                                     remoteAddress,
                                     getVirtualHost().getName(),
-                                    _endpoint.getSendingChannel())  + "] ";
+                                    getSendingChannel())  + "] ";
     }
 
     @Override
@@ -1059,6 +1687,55 @@ public class Session_1_0 implements Sess
     @Override
     public String toString()
     {
-        return "Session_1_0[" + _connection + ": " + _endpoint.getSendingChannel() + ']';
+        return "Session_1_0[" + _connection + ": " + getSendingChannel() + ']';
     }
+
+
+    private void detach(UnsignedInteger handle, Detach detach)
+    {
+        if(_remoteLinkEndpoints.containsKey(handle))
+        {
+            LinkEndpoint endpoint = _remoteLinkEndpoints.remove(handle);
+
+            endpoint.remoteDetached(detach);
+
+            _localLinkEndpoints.remove(endpoint);
+
+
+        }
+        else
+        {
+            // TODO
+        }
+    }
+
+    private void detachLinks()
+    {
+        Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
+        for(UnsignedInteger handle : handles)
+        {
+            Detach detach = new Detach();
+            detach.setClosed(false);
+            detach.setHandle(handle);
+            detach.setError(_sessionEndedLinkError);
+            detach(handle, detach);
+        }
+    }
+
+
+    private UnsignedInteger findNextAvailableHandle()
+    {
+        int i = 0;
+        do
+        {
+            if(!_localLinkEndpoints.containsValue(UnsignedInteger.valueOf(i)))
+            {
+                return UnsignedInteger.valueOf(i);
+            }
+        } while(++i != 0);
+
+        // TODO
+        throw new RuntimeException();
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org