You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/04/15 17:02:45 UTC
svn commit: r1739313 - /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
Author: rgodfrey
Date: Fri Apr 15 15:02:45 2016
New Revision: 1739313
URL: http://svn.apache.org/viewvc?rev=1739313&view=rev
Log:
QPID-7202 : Merge AMQP 1.0 session related classes
Removed:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SessionEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SessionEventListener.java
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Apr 15 15:02:45 2016
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
@@ -98,11 +99,13 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.AggregateTicker;
@@ -190,10 +193,10 @@ public class AMQPConnection_1_0 extends
private SocketAddress _remoteAddress;
// positioned by the *outgoing* channel
- private SessionEndpoint[] _sendingSessions;
+ private Session_1_0[] _sendingSessions;
// positioned by the *incoming* channel
- private SessionEndpoint[] _receivingSessions;
+ private Session_1_0[] _receivingSessions;
private boolean _closedForInput;
private boolean _closedForOutput;
@@ -283,16 +286,27 @@ public class AMQPConnection_1_0 extends
public void receiveAttach(final short channel, final Attach attach)
{
assertState(FrameReceivingState.ANY_FRAME);
- SessionEndpoint endPoint = getSession(channel);
- if (endPoint != null)
+ final Session_1_0 session = getSession(channel);
+ if (session != null)
{
- endPoint.receiveAttach(attach);
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.receiveAttach(attach);
+ return null;
+ }
+ }, session.getAccessControllerContext());
+ }
+ else
+ {
+ // TODO - error
}
}
public void receive(final short channel, final Object frame)
{
- List<Runnable> postLockActions;
FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame);
if (frame instanceof FrameBody)
{
@@ -354,9 +368,17 @@ public class AMQPConnection_1_0 extends
{
Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
- for(Session_1_0 session : sessions)
+ for(final Session_1_0 session : sessions)
{
- session.remoteEnd(new End());
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.remoteEnd(new End());
+ return null;
+ }
+ }, session.getAccessControllerContext());
}
}
@@ -532,7 +554,7 @@ public class AMQPConnection_1_0 extends
{
assertState(FrameReceivingState.ANY_FRAME);
- SessionEndpoint endpoint = _receivingSessions[channel];
+ Session_1_0 endpoint = _receivingSessions[channel];
if (endpoint != null)
{
_receivingSessions[channel] = null;
@@ -549,10 +571,22 @@ public class AMQPConnection_1_0 extends
final Disposition disposition)
{
assertState(FrameReceivingState.ANY_FRAME);
- SessionEndpoint endPoint = getSession(channel);
- if (endPoint != null)
+ final Session_1_0 session = getSession(channel);
+ if (session != null)
{
- endPoint.receiveDisposition(disposition);
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.receiveDisposition(disposition);
+ return null;
+ }
+ }, session.getAccessControllerContext());
+ }
+ else
+ {
+ // TODO - error
}
}
@@ -565,7 +599,7 @@ public class AMQPConnection_1_0 extends
if (begin.getRemoteChannel() != null)
{
myChannelId = begin.getRemoteChannel().shortValue();
- SessionEndpoint sessionEndpoint;
+ Session_1_0 sessionEndpoint;
try
{
sessionEndpoint = _sendingSessions[myChannelId];
@@ -626,7 +660,7 @@ public class AMQPConnection_1_0 extends
if (_receivingSessions[channel] == null)
{
- SessionEndpoint sessionEndpoint = new SessionEndpoint(this, begin);
+ Session_1_0 sessionEndpoint = new Session_1_0(this, begin);
_receivingSessions[channel] = sessionEndpoint;
_sendingSessions[myChannelId] = sessionEndpoint;
@@ -655,44 +689,11 @@ public class AMQPConnection_1_0 extends
}
- private void remoteSessionCreation(final SessionEndpoint sessionEndpoint)
+ private void remoteSessionCreation(final Session_1_0 session)
{
- if(!_closedOnOpen)
- {
- final Session_1_0 session = new Session_1_0(this, sessionEndpoint);
- _sessions.add(session);
- sessionAdded(session);
- sessionEndpoint.setSessionEventListener(new SessionEventListener()
- {
- @Override
- public void remoteLinkCreation(final LinkEndpoint endpoint11)
- {
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- session.remoteLinkCreation(endpoint11);
- return null;
- }
- }, session.getAccessControllerContext());
- }
+ _sessions.add(session);
+ sessionAdded(session);
- @Override
- public void remoteEnd(final End end)
- {
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- session.remoteEnd(end);
- return null;
- }
- }, session.getAccessControllerContext());
- }
- });
- }
}
private short getFirstFreeChannel()
@@ -723,48 +724,44 @@ public class AMQPConnection_1_0 extends
public void receiveTransfer(final short channel, final Transfer transfer)
{
assertState(FrameReceivingState.ANY_FRAME);
- SessionEndpoint endPoint = getSession(channel);
- if (endPoint != null)
- {
- endPoint.receiveTransfer(transfer);
- }
-
- }
-
- public SessionEndpoint createSession(final String name)
- {
- // todo assert connection state
- short channel = getFirstFreeChannel();
- if (channel != -1)
+ final Session_1_0 session = getSession(channel);
+ if (session != null)
{
- SessionEndpoint endpoint = new SessionEndpoint(this);
- _sendingSessions[channel] = endpoint;
- endpoint.setSendingChannel(channel);
- Begin begin = new Begin();
- begin.setNextOutgoingId(endpoint.getNextOutgoingId());
- begin.setOutgoingWindow(endpoint.getOutgoingWindowSize());
- begin.setIncomingWindow(endpoint.getIncomingWindowSize());
-
- begin.setHandleMax(_handleMax);
- sendFrame(channel, begin);
- return endpoint;
-
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.receiveTransfer(transfer);
+ return null;
+ }
+ }, session.getAccessControllerContext());
}
else
{
- // TODO - report error
- return null;
+ // TODO - error
}
-
}
public void receiveFlow(final short channel, final Flow flow)
{
assertState(FrameReceivingState.ANY_FRAME);
- SessionEndpoint endPoint = getSession(channel);
- if (endPoint != null)
+ final Session_1_0 session = getSession(channel);
+ if (session != null)
+ {
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.receiveFlow(flow);
+ return null;
+ }
+ }, session.getAccessControllerContext());
+ }
+ else
{
- endPoint.receiveFlow(flow);
+ // TODO - error
}
}
@@ -779,8 +776,8 @@ public class AMQPConnection_1_0 extends
: _channelMax;
if (_receivingSessions == null)
{
- _receivingSessions = new SessionEndpoint[_channelMax + 1];
- _sendingSessions = new SessionEndpoint[_channelMax + 1];
+ _receivingSessions = new Session_1_0[_channelMax + 1];
+ _sendingSessions = new Session_1_0[_channelMax + 1];
}
_maxFrameSize = open.getMaxFrameSize() == null ? DEFAULT_MAX_FRAME : open.getMaxFrameSize().intValue();
_remoteContainerId = open.getContainerId();
@@ -937,10 +934,22 @@ public class AMQPConnection_1_0 extends
public void receiveDetach(final short channel, final Detach detach)
{
assertState(FrameReceivingState.ANY_FRAME);
- SessionEndpoint endPoint = getSession(channel);
- if (endPoint != null)
+ final Session_1_0 session = getSession(channel);
+ if (session != null)
+ {
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.receiveDetach(detach);
+ return null;
+ }
+ }, session.getAccessControllerContext());
+ }
+ else
{
- endPoint.receiveDetach(detach);
+ // TODO - error
}
}
@@ -1201,59 +1210,73 @@ public class AMQPConnection_1_0 extends
public void received(final QpidByteBuffer msg)
{
- try
+
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
{
- updateLastReadTime();
- if (RAW_LOGGER.isDebugEnabled())
+ @Override
+ public Object run()
{
- QpidByteBuffer dup = msg.duplicate();
- byte[] data = new byte[dup.remaining()];
- dup.get(data);
- dup.dispose();
- Binary bin = new Binary(data);
- RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
- }
+ updateLastReadTime();
+ try
+ {
+ if (RAW_LOGGER.isDebugEnabled())
+ {
+ QpidByteBuffer dup = msg.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ dup.dispose();
+ Binary bin = new Binary(data);
+ RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
+ }
- int remaining;
+ int remaining;
- do
- {
- remaining = msg.remaining();
+ do
+ {
+ remaining = msg.remaining();
- switch (_frameReceivingState)
- {
- case AMQP_OR_SASL_HEADER:
- case AMQP_HEADER:
- if (remaining < 8)
+ switch (_frameReceivingState)
{
- return;
+ case AMQP_OR_SASL_HEADER:
+ case AMQP_HEADER:
+ if (remaining >= 8)
+ {
+ processProtocolHeader(msg);
+ }
+ break;
+ case OPEN_ONLY:
+ case ANY_FRAME:
+ case SASL_INIT_ONLY:
+ case SASL_RESPONSE_ONLY:
+ _frameHandler.parse(msg);
+ break;
+ case CLOSED:
+ // ignore;
+ break;
}
- processProtocolHeader(msg);
- break;
- case OPEN_ONLY:
- case ANY_FRAME:
- case SASL_INIT_ONLY:
- case SASL_RESPONSE_ONLY:
- _frameHandler.parse(msg);
- break;
- case CLOSED:
- // ignore;
- break;
- }
+ }
+ while (msg.remaining() != remaining);
+ }
+ catch (IllegalArgumentException | IllegalStateException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ catch (StoreException e)
+ {
+ if (getVirtualHost().getState() == State.ACTIVE)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+ else
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ }
+ return null;
}
- while (msg.remaining() != remaining);
- }
- catch (ConnectionScopedRuntimeException e)
- {
- throw e;
- }
- catch (RuntimeException e)
- {
- LOGGER.error("Unexpected exception while processing incoming data", e);
- throw new ConnectionScopedRuntimeException("Unexpected exception while processing incoming data", e);
- }
+ }, getAccessControllerContext());
}
@@ -1544,8 +1567,8 @@ public class AMQPConnection_1_0 extends
if (_receivingSessions == null)
{
- _receivingSessions = new SessionEndpoint[channelMax + 1];
- _sendingSessions = new SessionEndpoint[channelMax + 1];
+ _receivingSessions = new Session_1_0[channelMax + 1];
+ _sendingSessions = new Session_1_0[channelMax + 1];
}
if (channelMax < _channelMax)
{
@@ -1575,9 +1598,9 @@ public class AMQPConnection_1_0 extends
_closedOnOpen = true;
}
- private SessionEndpoint getSession(final short channel)
+ private Session_1_0 getSession(final short channel)
{
- SessionEndpoint session = _receivingSessions[channel];
+ Session_1_0 session = _receivingSessions[channel];
if (session == null)
{
Error error = new Error();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Fri Apr 15 15:02:45 2016
@@ -68,7 +68,7 @@ public abstract class LinkEndpoint<T ext
private final String _name;
- private SessionEndpoint _session;
+ private Session_1_0 _session;
private volatile State _state = State.DETACHED;
@@ -84,12 +84,12 @@ public abstract class LinkEndpoint<T ext
private Map<Binary,Delivery> _unsettledTransfers = new HashMap<Binary,Delivery>();
- LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+ LinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
{
this(sessionEndpoint, name, unsettled, null);
}
- LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled, DeliveryStateHandler deliveryStateHandler)
+ LinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled, DeliveryStateHandler deliveryStateHandler)
{
_name = name;
_session = sessionEndpoint;
@@ -99,7 +99,7 @@ public abstract class LinkEndpoint<T ext
_deliveryStateHandler = deliveryStateHandler;
}
- LinkEndpoint(final SessionEndpoint sessionEndpoint,final Attach attach)
+ LinkEndpoint(final Session_1_0 sessionEndpoint,final Attach attach)
{
_session = sessionEndpoint;
@@ -289,7 +289,7 @@ public abstract class LinkEndpoint<T ext
return _state == State.DETACHED || _session.isEnded();
}
- public SessionEndpoint getSession()
+ public Session_1_0 getSession()
{
return _session;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Fri Apr 15 15:02:45 2016
@@ -91,19 +91,19 @@ public class ReceivingLinkEndpoint exten
private UnsignedInteger _drainLimit;
- public ReceivingLinkEndpoint(final SessionEndpoint session, String name)
+ public ReceivingLinkEndpoint(final Session_1_0 session, String name)
{
this(session,name,null);
}
- public ReceivingLinkEndpoint(final SessionEndpoint session, String name, Map<Binary, Outcome> unsettledMap)
+ public ReceivingLinkEndpoint(final Session_1_0 session, String name, Map<Binary, Outcome> unsettledMap)
{
super(session, name, unsettledMap);
setDeliveryCount(UnsignedInteger.valueOf(0));
setLinkEventListener(ReceivingLinkListener.DEFAULT);
}
- public ReceivingLinkEndpoint(final SessionEndpoint session, final Attach attach)
+ public ReceivingLinkEndpoint(final Session_1_0 session, final Attach attach)
{
super(session, attach);
setDeliveryCount(attach.getInitialDeliveryCount());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Fri Apr 15 15:02:45 2016
@@ -42,26 +42,26 @@ public class SendingLinkEndpoint extends
private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
private Binary _transactionId;
- public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name)
+ public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, String name)
{
this(sessionEndpoint, name, null);
}
- public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+ public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
{
super(sessionEndpoint, name, unsettled);
init();
}
- public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled,
+ public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled,
DeliveryStateHandler deliveryStateHandler)
{
super(sessionEndpoint, name, unsettled, deliveryStateHandler);
init();
}
- public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach)
+ public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, final Attach attach)
{
super(sessionEndpoint, attach);
setSendingSettlementMode(attach.getSndSettleMode());
@@ -83,7 +83,7 @@ public class SendingLinkEndpoint extends
public boolean transfer(final Transfer xfr, final boolean decrementCredit)
{
- SessionEndpoint s = getSession();
+ Session_1_0 s = getSession();
xfr.setMessageFormat(UnsignedInteger.ZERO);
if(decrementCredit)
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1739313&r1=1739312&r2=1739313&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Apr 15 15:02:45 2016
@@ -44,10 +44,15 @@ import javax.security.auth.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.LifetimePolicy;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinks;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinksOrMessages;
@@ -58,10 +63,15 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.protocol.AMQConstant;
@@ -89,11 +99,10 @@ import org.apache.qpid.server.util.Conne
import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.transport.network.Ticker;
-public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0>, LogSubject
+public class Session_1_0 implements AMQSessionModel<Session_1_0>, LogSubject
{
private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
- private final SessionEndpoint _endpoint;
private final AccessControlContext _accessControllerContext;
private AutoCommitTransaction _transaction;
@@ -115,16 +124,615 @@ public class Session_1_0 implements Sess
private Session<?> _modelObject;
private final List<ConsumerTarget_1_0> _consumersWithPendingWork = new ArrayList<>();
+ private SessionState _state ;
- public Session_1_0(final AMQPConnection_1_0 connection, final SessionEndpoint endpoint)
+ private final Map<String, LinkEndpoint> _linkMap = new HashMap<>();
+ private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>();
+ private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>();
+ private long _lastAttachedTime;
+
+ private short _receivingChannel;
+ private short _sendingChannel;
+
+
+ // has to be a power of two
+ private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
+ private static final int BUFFER_SIZE_MASK = DEFAULT_SESSION_BUFFER_SIZE - 1;
+
+
+
+ private int _nextOutgoingDeliveryId;
+
+ private UnsignedInteger _outgoingSessionCredit;
+ private UnsignedInteger _initialOutgoingId = UnsignedInteger.valueOf(0);
+ private SequenceNumber _nextIncomingTransferId;
+ private SequenceNumber _nextOutgoingTransferId = new SequenceNumber(_initialOutgoingId.intValue());
+
+ private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
+ private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
+
+ private int _availableIncomingCredit = DEFAULT_SESSION_BUFFER_SIZE;
+ private int _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
+ private UnsignedInteger _lastSentIncomingLimit;
+
+ private final Error _sessionEndedLinkError =
+ new Error(LinkError.DETACH_FORCED,
+ "Force detach the link because the session is remotely ended.");
+
+
+ public Session_1_0(final AMQPConnection_1_0 connection)
+ {
+ this(connection, SessionState.INACTIVE, null);
+ }
+
+ public Session_1_0(final AMQPConnection_1_0 connection, Begin begin)
+ {
+ this(connection, SessionState.BEGIN_RECVD, new SequenceNumber(begin.getNextOutgoingId().intValue()));
+ }
+
+
+ private Session_1_0(final AMQPConnection_1_0 connection, SessionState state, SequenceNumber nextIncomingId)
{
- _endpoint = endpoint;
+
+ _state = state;
+ _nextIncomingTransferId = nextIncomingId;
_connection = connection;
_subject.getPrincipals().addAll(connection.getSubject().getPrincipals());
_subject.getPrincipals().add(new SessionPrincipal(this));
_accessControllerContext = org.apache.qpid.server.security.SecurityManager.getAccessControlContextFromSubject(_subject);
}
+ public void setReceivingChannel(final short receivingChannel)
+ {
+ _receivingChannel = receivingChannel;
+ switch(_state)
+ {
+ case INACTIVE:
+ _state = SessionState.BEGIN_RECVD;
+ break;
+ case BEGIN_SENT:
+ _state = SessionState.ACTIVE;
+ break;
+ case END_PIPE:
+ _state = SessionState.END_SENT;
+ break;
+ default:
+ // TODO error
+
+ }
+ }
+
+ public void sendDetach(final Detach detach)
+ {
+ send(detach);
+ }
+
+ public void receiveAttach(final Attach attach)
+ {
+ if(_state == SessionState.ACTIVE)
+ {
+ UnsignedInteger handle = attach.getHandle();
+ if(_remoteLinkEndpoints.containsKey(handle))
+ {
+ // TODO - Error - handle busy?
+ }
+ else
+ {
+ LinkEndpoint endpoint = _linkMap.get(attach.getName());
+ if(endpoint == null)
+ {
+ endpoint = attach.getRole() == Role.RECEIVER
+ ? new SendingLinkEndpoint(this, attach)
+ : new ReceivingLinkEndpoint(this, attach);
+
+ // TODO : fix below - distinguish between local and remote owned
+ endpoint.setSource(attach.getSource());
+ endpoint.setTarget(attach.getTarget());
+
+
+ }
+
+ if(attach.getRole() == Role.SENDER)
+ {
+ endpoint.setDeliveryCount(attach.getInitialDeliveryCount());
+ }
+
+ _remoteLinkEndpoints.put(handle, endpoint);
+
+ if(!_localLinkEndpoints.containsKey(endpoint))
+ {
+ UnsignedInteger localHandle = findNextAvailableHandle();
+ endpoint.setLocalHandle(localHandle);
+ _localLinkEndpoints.put(endpoint, localHandle);
+
+ remoteLinkCreation(endpoint);
+
+ }
+ else
+ {
+ endpoint.receiveAttach(attach);
+ }
+ }
+ }
+ }
+
+ public void updateDisposition(final Role role,
+ final UnsignedInteger first,
+ final UnsignedInteger last,
+ final DeliveryState state, final boolean settled)
+ {
+
+
+ Disposition disposition = new Disposition();
+ disposition.setRole(role);
+ disposition.setFirst(first);
+ disposition.setLast(last);
+ disposition.setSettled(settled);
+
+ disposition.setState(state);
+
+
+ if(settled)
+ {
+ if(role == Role.RECEIVER)
+ {
+ SequenceNumber pos = new SequenceNumber(first.intValue());
+ SequenceNumber end = new SequenceNumber(last.intValue());
+ while(pos.compareTo(end)<=0)
+ {
+ Delivery d = _incomingUnsettled.remove(new UnsignedInteger(pos.intValue()));
+
+/*
+ _availableIncomingCredit += d.getTransfers().size();
+*/
+
+ pos.incr();
+ }
+ }
+ }
+
+ send(disposition);
+ //TODO - check send flow
+ }
+
+ public boolean hasCreditToSend()
+ {
+ boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
+ boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
+ return b && b1;
+ }
+
+ public void end()
+ {
+ end(new End());
+ }
+
+ public void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint, final boolean newDelivery)
+ {
+ _nextOutgoingTransferId.incr();
+ UnsignedInteger deliveryId;
+ if(newDelivery)
+ {
+ deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
+ endpoint.setLastDeliveryId(deliveryId);
+ }
+ else
+ {
+ deliveryId = endpoint.getLastDeliveryId();
+ }
+ xfr.setDeliveryId(deliveryId);
+
+ if(!Boolean.TRUE.equals(xfr.getSettled()))
+ {
+ Delivery delivery;
+ if((delivery = _outgoingUnsettled.get(deliveryId))== null)
+ {
+ delivery = new Delivery(xfr, endpoint);
+ _outgoingUnsettled.put(deliveryId, delivery);
+
+ }
+ else
+ {
+ delivery.addTransfer(xfr);
+ }
+ _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+ endpoint.addUnsettled(delivery);
+
+ }
+
+ try
+ {
+ QpidByteBuffer payload = xfr.getPayload();
+ int payloadSent = _connection.sendFrame(getSendingChannel(), xfr, payload);
+
+ if(payload != null && payloadSent < payload.remaining() && payloadSent >= 0)
+ {
+ payload = payload.duplicate();
+ try
+ {
+ payload.position(payload.position()+payloadSent);
+
+ Transfer secondTransfer = new Transfer();
+
+ secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
+ secondTransfer.setHandle(xfr.getHandle());
+ secondTransfer.setSettled(xfr.getSettled());
+ secondTransfer.setState(xfr.getState());
+ secondTransfer.setMessageFormat(xfr.getMessageFormat());
+ secondTransfer.setPayload(payload);
+
+ sendTransfer(secondTransfer, endpoint, false);
+ }
+ finally
+ {
+ payload.dispose();
+ }
+
+ }
+ }
+ catch(OversizeFrameException e)
+ {
+ e.printStackTrace();
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ }
+
+ public boolean isActive()
+ {
+ return _state == SessionState.ACTIVE;
+ }
+
+ public void receiveEnd(final End end)
+ {
+ switch (_state)
+ {
+ case END_SENT:
+ _state = SessionState.ENDED;
+ break;
+ case ACTIVE:
+ detachLinks();
+ remoteEnd(end);
+ short sendChannel = getSendingChannel();
+ _connection.sendEnd(sendChannel, new End(), true);
+ _state = SessionState.ENDED;
+ break;
+ default:
+ sendChannel = getSendingChannel();
+ End reply = new End();
+ Error error = new Error();
+ error.setCondition(AmqpError.ILLEGAL_STATE);
+ error.setDescription("END called on Session which has not been opened");
+ reply.setError(error);
+ _connection.sendEnd(sendChannel, reply, true);
+ break;
+
+
+ }
+
+ }
+
+ public UnsignedInteger getNextOutgoingId()
+ {
+ return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
+ }
+
+ public void sendFlowConditional()
+ {
+ if(_nextIncomingTransferId != null)
+ {
+ UnsignedInteger clientsCredit =
+ _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
+ int i = UnsignedInteger.valueOf(_availableIncomingCredit).subtract(clientsCredit).compareTo(clientsCredit);
+ if (i >= 0)
+ {
+ sendFlow();
+ }
+ }
+
+ }
+
+ public UnsignedInteger getOutgoingWindowSize()
+ {
+ return UnsignedInteger.valueOf(_availableOutgoingCredit);
+ }
+
+ public void receiveFlow(final Flow flow)
+ {
+ UnsignedInteger handle = flow.getHandle();
+ final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
+
+ final UnsignedInteger nextOutgoingId =
+ flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
+ int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
+ _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
+
+ if (endpoint != null)
+ {
+ endpoint.receiveFlow(flow);
+ }
+ else
+ {
+ final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
+ for (LinkEndpoint le : allLinkEndpoints)
+ {
+ le.flowStateChanged();
+ }
+ }
+ }
+
+ public void setNextIncomingId(final UnsignedInteger nextIncomingId)
+ {
+ _nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
+
+ }
+
+ public short getSendingChannel()
+ {
+ return _sendingChannel;
+ }
+
+ public void receiveDisposition(final Disposition disposition)
+ {
+ Role dispositionRole = disposition.getRole();
+
+ LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
+
+ if(dispositionRole == Role.RECEIVER)
+ {
+ unsettledTransfers = _outgoingUnsettled;
+ }
+ else
+ {
+ unsettledTransfers = _incomingUnsettled;
+
+ }
+
+ UnsignedInteger deliveryId = disposition.getFirst();
+ UnsignedInteger last = disposition.getLast();
+ if(last == null)
+ {
+ last = deliveryId;
+ }
+
+
+ while(deliveryId.compareTo(last)<=0)
+ {
+
+ Delivery delivery = unsettledTransfers.get(deliveryId);
+ if(delivery != null)
+ {
+ delivery.getLinkEndpoint().receiveDeliveryState(delivery,
+ disposition.getState(),
+ disposition.getSettled());
+ }
+ deliveryId = deliveryId.add(UnsignedInteger.ONE);
+ }
+ if(disposition.getSettled())
+ {
+ //TODO - check send flow
+ }
+
+ }
+
+ /**
+  * Returns the current session state.
+  *
+  * @return the current value of {@code _state}
+  */
+ public SessionState getState()
+ {
+     return this._state;
+ }
+
+ /**
+  * Sends a flow frame built from a fresh, otherwise empty {@link Flow}; the session-level window
+  * fields are filled in by {@link #sendFlow(Flow)}.
+  */
+ public void sendFlow()
+ {
+     final Flow sessionFlow = new Flow();
+     sendFlow(sessionFlow);
+ }
+
+ /**
+  * Records the channel used for outgoing frames and advances the session state.
+  * <p>
+  * {@code INACTIVE} becomes {@code BEGIN_SENT} and {@code BEGIN_RECVD} becomes {@code ACTIVE}; in any
+  * other state only the channel is recorded.
+  *
+  * @param sendingChannel the channel number to send on
+  */
+ public void setSendingChannel(final short sendingChannel)
+ {
+     _sendingChannel = sendingChannel;
+
+     final SessionState nextState;
+     switch (_state)
+     {
+         case INACTIVE:
+             nextState = SessionState.BEGIN_SENT;
+             break;
+         case BEGIN_RECVD:
+             nextState = SessionState.ACTIVE;
+             break;
+         default:
+             // TODO error
+             return;
+     }
+     _state = nextState;
+ }
+
+ /**
+  * Fills in the session-level fields of the given flow frame and sends it.
+  * <p>
+  * When an incoming transfer id is known, it is written to the frame and the last advertised incoming
+  * limit is updated to that id plus the available incoming credit. The incoming window, next outgoing
+  * id and outgoing window are always set.
+  *
+  * @param flow the frame to populate and send; any fields already set by the caller (other than the
+  *             four session-level ones) are left untouched
+  */
+ public void sendFlow(final Flow flow)
+ {
+     if (_nextIncomingTransferId != null)
+     {
+         final int incomingId = _nextIncomingTransferId.intValue();
+         flow.setNextIncomingId(UnsignedInteger.valueOf(incomingId));
+         _lastSentIncomingLimit = UnsignedInteger.valueOf(incomingId + _availableIncomingCredit);
+     }
+
+     final UnsignedInteger incomingWindow = UnsignedInteger.valueOf(_availableIncomingCredit);
+     flow.setIncomingWindow(incomingWindow);
+
+     final UnsignedInteger nextOutgoingId = UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
+     flow.setNextOutgoingId(nextOutgoingId);
+
+     final UnsignedInteger outgoingWindow = UnsignedInteger.valueOf(_availableOutgoingCredit);
+     flow.setOutgoingWindow(outgoingWindow);
+
+     send(flow);
+ }
+
+ /**
+  * Overwrites the outgoing session credit.
+  *
+  * @param outgoingSessionCredit the new value stored in {@code _outgoingSessionCredit}
+  */
+ public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
+ {
+     this._outgoingSessionCredit = outgoingSessionCredit;
+ }
+
+ /**
+  * Handles an incoming detach frame by detaching the link registered under the frame's handle.
+  *
+  * @param detach the received detach frame
+  */
+ public void receiveDetach(final Detach detach)
+ {
+     detach(detach.getHandle(), detach);
+ }
+
+ /**
+  * Sends the given attach frame on this session's sending channel.
+  *
+  * @param attach the attach frame to send
+  */
+ public void sendAttach(final Attach attach)
+ {
+     final FrameBody body = attach;
+     send(body);
+ }
+
+ private void send(final FrameBody frameBody)
+ {
+ _connection.sendFrame(getSendingChannel(), frameBody);
+ }
+
+ public boolean isSyntheticError(final Error error)
+ {
+ return error == _sessionEndedLinkError;
+ }
+
+ public void end(final End end)
+ {
+ switch (_state)
+ {
+ case BEGIN_SENT:
+ _connection.sendEnd(getSendingChannel(), end, false);
+ _state = SessionState.END_PIPE;
+ break;
+ case ACTIVE:
+ detachLinks();
+ short sendChannel = getSendingChannel();
+ _connection.sendEnd(sendChannel, end, true);
+ _state = SessionState.END_SENT;
+ break;
+ default:
+ sendChannel = getSendingChannel();
+ End reply = new End();
+ Error error = new Error();
+ error.setCondition(AmqpError.ILLEGAL_STATE);
+ error.setDescription("END called on Session which has not been opened");
+ reply.setError(error);
+ _connection.sendEnd(sendChannel, reply, true);
+ break;
+
+
+ }
+ }
+
+ /**
+  * Handles an incoming transfer frame.
+  * <p>
+  * The incoming transfer-id counter is always advanced. The transfer is then matched to the remote
+  * link endpoint registered for its handle and to a {@link Delivery}: a new one is created and
+  * recorded as unsettled for the first frame of a delivery, later frames are appended to the existing
+  * one. A transfer without a delivery-id is treated as a continuation and uses the id remembered on the
+  * receiving link. After the endpoint has processed the transfer, the delivery is dropped from the
+  * unsettled map if it is complete and settled, or if the transfer was aborted.
+  * <p>
+  * A transfer for an unknown handle is logged and otherwise ignored, so that no delivery with a
+  * {@code null} link endpoint is ever recorded. Incoming credit counters are not adjusted here.
+  *
+  * @param transfer the received transfer frame
+  */
+ public void receiveTransfer(final Transfer transfer)
+ {
+     _nextIncomingTransferId.incr();
+
+     final UnsignedInteger handle = transfer.getHandle();
+     final LinkEndpoint endpoint = _remoteLinkEndpoints.get(handle);
+     if (endpoint == null)
+     {
+         //TODO - error unknown link
+         _logger.warn("Unknown endpoint " + transfer);
+         return;
+     }
+
+     UnsignedInteger deliveryId = transfer.getDeliveryId();
+     if (deliveryId == null)
+     {
+         // Continuation frame: fall back to the id remembered from the delivery's first frame.
+         deliveryId = ((ReceivingLinkEndpoint) endpoint).getLastDeliveryId();
+     }
+
+     Delivery delivery = _incomingUnsettled.get(deliveryId);
+     if (delivery == null)
+     {
+         delivery = new Delivery(transfer, endpoint);
+         _incomingUnsettled.put(deliveryId, delivery);
+
+         if (Boolean.TRUE.equals(transfer.getMore()))
+         {
+             // More frames follow; remember the id for frames that omit it.
+             ((ReceivingLinkEndpoint) endpoint).setLastDeliveryId(transfer.getDeliveryId());
+         }
+     }
+     else if (delivery.getDeliveryId().equals(deliveryId))
+     {
+         delivery.addTransfer(transfer);
+
+         if (!Boolean.TRUE.equals(transfer.getMore()))
+         {
+             // Last frame of the delivery: nothing left to continue.
+             ((ReceivingLinkEndpoint) endpoint).setLastDeliveryId(null);
+         }
+     }
+     else
+     {
+         // TODO - error
+         _logger.warn("Incorrect transfer id " + transfer);
+     }
+
+     endpoint.receiveTransfer(transfer, delivery);
+
+     if ((delivery.isComplete() && delivery.isSettled()) || Boolean.TRUE.equals(transfer.getAborted()))
+     {
+         _incomingUnsettled.remove(deliveryId);
+     }
+ }
+
+ /**
+  * Returns a copy of the link endpoints currently held as keys of {@code _localLinkEndpoints}.
+  *
+  * @return a new list; changes to it do not affect the session
+  */
+ public Collection<LinkEndpoint> getLocalLinkEndpoints()
+ {
+     final Collection<LinkEndpoint> snapshot = new ArrayList<>(_localLinkEndpoints.keySet());
+     return snapshot;
+ }
+
+ /**
+  * Tells whether this session can be considered ended.
+  *
+  * @return {@code true} if the session state is {@code ENDED} or the connection reports closed
+  */
+ public boolean isEnded()
+ {
+     if (_state == SessionState.ENDED)
+     {
+         return true;
+     }
+     return _connection.isClosed();
+ }
+
+ /**
+  * Forgets an unsettled delivery.
+  * <p>
+  * For {@code Role.RECEIVER} the delivery is removed from the incoming unsettled map, for any other
+  * role from the outgoing one. An id that is not present is ignored. No credit counters are adjusted.
+  *
+  * @param role       the role of the link settling the delivery
+  * @param deliveryId the id of the delivery to forget
+  */
+ public void settle(final Role role, final UnsignedInteger deliveryId)
+ {
+     final boolean receiving = (role == Role.RECEIVER);
+     if (receiving)
+     {
+         _incomingUnsettled.remove(deliveryId);
+     }
+     else
+     {
+         _outgoingUnsettled.remove(deliveryId);
+     }
+ }
+
+ /**
+  * Returns the incoming window size.
+  *
+  * @return {@code _availableIncomingCredit} converted to an {@link UnsignedInteger}
+  */
+ public UnsignedInteger getIncomingWindowSize()
+ {
+     return UnsignedInteger.valueOf(
+             _availableIncomingCredit);
+ }
+
public AccessControlContext getAccessControllerContext()
{
return _accessControllerContext;
@@ -132,44 +740,46 @@ public class Session_1_0 implements Sess
public void remoteLinkCreation(final LinkEndpoint endpoint)
{
-
Destination destination;
Link_1_0 link = null;
Error error = null;
- final
- LinkRegistry
- linkRegistry = getVirtualHost().getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
+ final LinkRegistry linkRegistry = getVirtualHost().getLinkRegistry(getConnection().getRemoteContainerId());
- if(endpoint.getRole() == Role.SENDER)
+ if (endpoint.getRole() == Role.SENDER)
{
- final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
+ final SendingLink_1_0 previousLink =
+ (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
- if(previousLink == null)
+ if (previousLink == null)
{
Target target = (Target) endpoint.getTarget();
Source source = (Source) endpoint.getSource();
- if(source != null)
+ if (source != null)
{
- if(Boolean.TRUE.equals(source.getDynamic()))
+ if (Boolean.TRUE.equals(source.getDynamic()))
{
Queue<?> tempQueue = createTemporaryQueue(source.getDynamicNodeProperties());
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
- if(!addr.startsWith("/") && addr.contains("/"))
+ if (!addr.startsWith("/") && addr.contains("/"))
{
- String[] parts = addr.split("/",2);
- Exchange<?> exchg = getVirtualHost().getAttainedChildFromAddress(Exchange.class, parts[0]);
- if(exchg != null)
+ String[] parts = addr.split("/", 2);
+ Exchange<?> exchg =
+ getVirtualHost().getAttainedChildFromAddress(Exchange.class, parts[0]);
+ if (exchg != null)
{
ExchangeDestination exchangeDestination =
- new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy(), parts[0]);
+ new ExchangeDestination(exchg,
+ source.getDurable(),
+ source.getExpiryPolicy(),
+ parts[0]);
exchangeDestination.setInitialRoutingAddress(parts[1]);
destination = exchangeDestination;
@@ -183,16 +793,20 @@ public class Session_1_0 implements Sess
else
{
MessageSource queue = getVirtualHost().getAttainedMessageSource(addr);
- if(queue != null)
+ if (queue != null)
{
destination = new MessageSourceDestination(queue);
}
else
{
- Exchange<?> exchg = getVirtualHost().getAttainedChildFromAddress(Exchange.class, addr);
- if(exchg != null)
+ Exchange<?> exchg =
+ getVirtualHost().getAttainedChildFromAddress(Exchange.class, addr);
+ if (exchg != null)
{
- destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy(), addr);
+ destination = new ExchangeDestination(exchg,
+ source.getDurable(),
+ source.getExpiryPolicy(),
+ addr);
}
else
{
@@ -208,26 +822,29 @@ public class Session_1_0 implements Sess
destination = null;
}
- if(destination != null)
+ if (destination != null)
{
final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
try
{
- final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
- getVirtualHost(),
- (SendingDestination) destination
- );
+ final SendingLink_1_0 sendingLink =
+ new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
+ getVirtualHost(),
+ (SendingDestination) destination
+ );
- sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
+ sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(
+ sendingLink));
registerConsumer(sendingLink);
link = sendingLink;
- if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
+ if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())
+ || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
{
linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
}
}
- catch(AmqpErrorException e)
+ catch (AmqpErrorException e)
{
_logger.error("Error creating sending link", e);
destination = null;
@@ -242,10 +859,10 @@ public class Session_1_0 implements Sess
Source oldSource = (Source) previousLink.getEndpoint().getSource();
final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable();
- if(newSourceDurable != null)
+ if (newSourceDurable != null)
{
oldSource.setDurable(newSourceDurable);
- if(newSourceDurable.equals(TerminusDurability.NONE))
+ if (newSourceDurable.equals(TerminusDurability.NONE))
{
linkRegistry.unregisterSendingLink(endpoint.getName());
}
@@ -262,21 +879,21 @@ public class Session_1_0 implements Sess
}
else
{
- if(endpoint.getTarget() instanceof Coordinator)
+ if (endpoint.getTarget() instanceof Coordinator)
{
Coordinator coordinator = (Coordinator) endpoint.getTarget();
TxnCapability[] capabilities = coordinator.getCapabilities();
boolean localTxn = false;
boolean multiplePerSession = false;
- if(capabilities != null)
+ if (capabilities != null)
{
- for(TxnCapability capability : capabilities)
+ for (TxnCapability capability : capabilities)
{
- if(capability.equals(TxnCapability.LOCAL_TXN))
+ if (capability.equals(TxnCapability.LOCAL_TXN))
{
localTxn = true;
}
- else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
+ else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
{
multiplePerSession = true;
}
@@ -297,8 +914,12 @@ public class Session_1_0 implements Sess
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final TxnCoordinatorLink_1_0 coordinatorLink =
- new TxnCoordinatorLink_1_0(getVirtualHost(), this, receivingLinkEndpoint, _openTransactions);
- receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(coordinatorLink));
+ new TxnCoordinatorLink_1_0(getVirtualHost(),
+ this,
+ receivingLinkEndpoint,
+ _openTransactions);
+ receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(
+ coordinatorLink));
link = coordinatorLink;
@@ -309,14 +930,14 @@ public class Session_1_0 implements Sess
ReceivingLink_1_0 previousLink =
(ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
- if(previousLink == null)
+ if (previousLink == null)
{
Target target = (Target) endpoint.getTarget();
- if(target != null)
+ if (target != null)
{
- if(Boolean.TRUE.equals(target.getDynamic()))
+ if (Boolean.TRUE.equals(target.getDynamic()))
{
Queue<?> tempQueue = createTemporaryQueue(target.getDynamicNodeProperties());
@@ -324,17 +945,18 @@ public class Session_1_0 implements Sess
}
String addr = target.getAddress();
- if(addr == null || "".equals(addr.trim()))
+ if (addr == null || "".equals(addr.trim()))
{
MessageDestination messageDestination = getVirtualHost().getDefaultDestination();
destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
target.getExpiryPolicy(), "");
}
- else if(!addr.startsWith("/") && addr.contains("/"))
+ else if (!addr.startsWith("/") && addr.contains("/"))
{
- String[] parts = addr.split("/",2);
- Exchange<?> exchange = getVirtualHost().getAttainedChildFromAddress(Exchange.class, parts[0]);
- if(exchange != null)
+ String[] parts = addr.split("/", 2);
+ Exchange<?> exchange =
+ getVirtualHost().getAttainedChildFromAddress(Exchange.class, parts[0]);
+ if (exchange != null)
{
ExchangeDestination exchangeDestination =
new ExchangeDestination(exchange,
@@ -355,16 +977,19 @@ public class Session_1_0 implements Sess
}
else
{
- MessageDestination messageDestination = getVirtualHost().getAttainedMessageDestination(addr);
- if(messageDestination != null)
+ MessageDestination messageDestination =
+ getVirtualHost().getAttainedMessageDestination(addr);
+ if (messageDestination != null)
{
- destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
- target.getExpiryPolicy(), addr);
+ destination =
+ new NodeReceivingDestination(messageDestination, target.getDurable(),
+ target.getExpiryPolicy(), addr);
}
else
{
- Queue<?> queue = getVirtualHost().getAttainedChildFromAddress(Queue.class, addr);
- if(queue != null)
+ Queue<?> queue =
+ getVirtualHost().getAttainedChildFromAddress(Queue.class, addr);
+ if (queue != null)
{
destination = new QueueDestination(queue, addr);
@@ -383,18 +1008,20 @@ public class Session_1_0 implements Sess
{
destination = null;
}
- if(destination != null)
+ if (destination != null)
{
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
- getVirtualHost(),
- (ReceivingDestination) destination);
+ final ReceivingLink_1_0 receivingLink =
+ new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
+ getVirtualHost(),
+ (ReceivingDestination) destination);
- receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink));
+ receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(
+ receivingLink));
link = receivingLink;
- if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
- || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
+ if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
+ || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
{
linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
}
@@ -414,9 +1041,9 @@ public class Session_1_0 implements Sess
endpoint.attach();
- if(link == null)
+ if (link == null)
{
- if(error == null)
+ if (error == null)
{
error = new Error();
error.setCondition(AmqpError.NOT_FOUND);
@@ -429,6 +1056,7 @@ public class Session_1_0 implements Sess
}
}
+
private void registerConsumer(final SendingLink_1_0 link)
{
ConsumerImpl consumer = link.getConsumer();
@@ -529,7 +1157,7 @@ public class Session_1_0 implements Sess
iter.remove();
}
- for(LinkEndpoint linkEndpoint : _endpoint.getLocalLinkEndpoints())
+ for(LinkEndpoint linkEndpoint : getLocalLinkEndpoints())
{
linkEndpoint.remoteDetached(new Detach());
}
@@ -599,7 +1227,7 @@ public class Session_1_0 implements Sess
public void close()
{
performCloseTasks();
- _endpoint.end();
+ end();
if(_modelObject != null)
{
_modelObject.delete();
@@ -630,7 +1258,7 @@ public class Session_1_0 implements Sess
theError.setDescription(message);
theError.setCondition(ConnectionError.CONNECTION_FORCED);
end.setError(theError);
- _endpoint.end(end);
+ end(end);
}
@Override
@@ -734,7 +1362,7 @@ public class Session_1_0 implements Sess
@Override
public int getChannelId()
{
- return _endpoint.getSendingChannel();
+ return getSendingChannel();
}
@Override
@@ -757,7 +1385,7 @@ public class Session_1_0 implements Sess
authorizedPrincipal,
remoteAddress,
getVirtualHost().getName(),
- _endpoint.getSendingChannel()) + "] ";
+ getSendingChannel()) + "] ";
}
@Override
@@ -1059,6 +1687,55 @@ public class Session_1_0 implements Sess
@Override
public String toString()
{
- return "Session_1_0[" + _connection + ": " + _endpoint.getSendingChannel() + ']';
+ return "Session_1_0[" + _connection + ": " + getSendingChannel() + ']';
}
+
+
+ private void detach(UnsignedInteger handle, Detach detach)
+ {
+ if(_remoteLinkEndpoints.containsKey(handle))
+ {
+ LinkEndpoint endpoint = _remoteLinkEndpoints.remove(handle);
+
+ endpoint.remoteDetached(detach);
+
+ _localLinkEndpoints.remove(endpoint);
+
+
+ }
+ else
+ {
+ // TODO
+ }
+ }
+
+ private void detachLinks()
+ {
+ Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
+ for(UnsignedInteger handle : handles)
+ {
+ Detach detach = new Detach();
+ detach.setClosed(false);
+ detach.setHandle(handle);
+ detach.setError(_sessionEndedLinkError);
+ detach(handle, detach);
+ }
+ }
+
+
+ private UnsignedInteger findNextAvailableHandle()
+ {
+ int i = 0;
+ do
+ {
+ if(!_localLinkEndpoints.containsValue(UnsignedInteger.valueOf(i)))
+ {
+ return UnsignedInteger.valueOf(i);
+ }
+ } while(++i != 0);
+
+ // TODO
+ throw new RuntimeException();
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org