You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/04/17 20:56:45 UTC
svn commit: r1739636 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/test/java/org/apache/qpid/server/consumer/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
bro...
Author: rgodfrey
Date: Sun Apr 17 18:56:45 2016
New Revision: 1739636
URL: http://svn.apache.org/viewvc?rev=1739636&view=rev
Log:
QPID-7202 : Tidyups and remove one copy from delivery path
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Sun Apr 17 18:56:45 2016
@@ -40,17 +40,15 @@ import org.apache.qpid.transport.network
*/
public interface AMQSessionModel<T extends AMQSessionModel<T>> extends Comparable<AMQSessionModel>, Deletable<T>
{
- public UUID getId();
+ UUID getId();
- public AMQPConnection<?> getAMQPConnection();
+ AMQPConnection<?> getAMQPConnection();
- public String getClientID();
+ void close();
- public void close();
+ void close(AMQConstant cause, String message);
- public void close(AMQConstant cause, String message);
-
- public LogSubject getLogSubject();
+ LogSubject getLogSubject();
/**
* This method is called from the housekeeping thread to check the status of
@@ -67,7 +65,7 @@ public interface AMQSessionModel<T exten
* @param idleWarn time in milliseconds before alerting on idle transaction
* @param idleClose time in milliseconds before closing connection with idle transaction
*/
- public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose);
+ void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose);
void block(Queue<?> queue);
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Sun Apr 17 18:56:45 2016
@@ -297,12 +297,6 @@ public class MockConsumer implements Con
}
@Override
- public String getClientID()
- {
- return null;
- }
-
- @Override
public void close()
{
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sun Apr 17 18:56:45 2016
@@ -808,11 +808,6 @@ public class ServerSession extends Sessi
return getConnection().getAmqpConnection();
}
- public String getClientID()
- {
- return getConnection().getClientId();
- }
-
@Override
public ServerConnection getConnection()
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Apr 17 18:56:45 2016
@@ -1387,11 +1387,6 @@ public class AMQChannel
return _connection;
}
- public String getClientID()
- {
- return _connection.getClientId();
- }
-
public LogSubject getLogSubject()
{
return _logSubject;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Sun Apr 17 18:56:45 2016
@@ -120,11 +120,8 @@ public class AMQPConnection_1_0 extends
private static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM");
- private static final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
-
-
- private static final long CLOSE_RESPONSE_TIMEOUT = 10000l;
+ private static final long CLOSE_RESPONSE_TIMEOUT = 10000L;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
@@ -267,7 +264,7 @@ public class AMQPConnection_1_0 extends
setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
- _frameWriter = new FrameWriter(getDescribedTypeRegistry());
+ _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
}
@@ -598,84 +595,45 @@ public class AMQPConnection_1_0 extends
short myChannelId;
if (begin.getRemoteChannel() != null)
{
- myChannelId = begin.getRemoteChannel().shortValue();
- Session_1_0 sessionEndpoint;
- try
- {
- sessionEndpoint = _sendingSessions[myChannelId];
- }
- catch (IndexOutOfBoundsException e)
- {
- final Error error = new Error();
- error.setCondition(ConnectionError.FRAMING_ERROR);
- error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
- + begin.getRemoteChannel() + " which is outside the valid range of 0 to "
- + _channelMax + ".");
- closeConnection(error);
- return;
- }
- if (sessionEndpoint != null)
- {
- if (_receivingSessions[channel] == null)
- {
- _receivingSessions[channel] = sessionEndpoint;
- sessionEndpoint.setReceivingChannel(channel);
- sessionEndpoint.setNextIncomingId(begin.getNextOutgoingId());
- sessionEndpoint.setOutgoingSessionCredit(begin.getIncomingWindow());
-
- if (sessionEndpoint.getState() == SessionState.END_SENT)
- {
- _sendingSessions[myChannelId] = null;
- }
- }
- else
- {
- final Error error = new Error();
- error.setCondition(ConnectionError.FRAMING_ERROR);
- error.setDescription("BEGIN received on channel " + channel + " which is already in use.");
- closeConnection(error);
- }
- }
- else
- {
- final Error error = new Error();
- error.setCondition(ConnectionError.FRAMING_ERROR);
- error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
- + begin.getRemoteChannel() + " which is not known as a begun session.");
- closeConnection(error);
- }
-
+ final Error error = new Error();
+ error.setCondition(ConnectionError.FRAMING_ERROR);
+ error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
+ + begin.getRemoteChannel() + ". Since the broker does not spontaneously start channels, this must be an error.");
+ closeConnection(error);
}
else // Peer requesting session creation
{
- myChannelId = getFirstFreeChannel();
- if (myChannelId == -1)
+ if (_receivingSessions[channel] == null)
{
- // close any half open channel
myChannelId = getFirstFreeChannel();
+ if (myChannelId == -1)
+ {
+ final Error error = new Error();
+ error.setCondition(ConnectionError.FRAMING_ERROR);
+ error.setDescription("BEGIN received on channel " + channel + ". There are no free channels for the broker to responsd on.");
+ closeConnection(error);
- }
-
- if (_receivingSessions[channel] == null)
- {
- Session_1_0 sessionEndpoint = new Session_1_0(this, begin);
+ }
+ Session_1_0 session = new Session_1_0(this, begin);
- _receivingSessions[channel] = sessionEndpoint;
- _sendingSessions[myChannelId] = sessionEndpoint;
+ _receivingSessions[channel] = session;
+ _sendingSessions[myChannelId] = session;
Begin beginToSend = new Begin();
- sessionEndpoint.setReceivingChannel(channel);
- sessionEndpoint.setSendingChannel(myChannelId);
+ session.setReceivingChannel(channel);
+ session.setSendingChannel(myChannelId);
beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
- beginToSend.setNextOutgoingId(sessionEndpoint.getNextOutgoingId());
- beginToSend.setOutgoingWindow(sessionEndpoint.getOutgoingWindowSize());
- beginToSend.setIncomingWindow(sessionEndpoint.getIncomingWindowSize());
+ beginToSend.setNextOutgoingId(session.getNextOutgoingId());
+ beginToSend.setOutgoingWindow(session.getOutgoingWindowSize());
+ beginToSend.setIncomingWindow(session.getIncomingWindowSize());
sendFrame(myChannelId, beginToSend);
- remoteSessionCreation(sessionEndpoint);
+ _sessions.add(session);
+ sessionAdded(session);
+
}
else
{
@@ -689,13 +647,6 @@ public class AMQPConnection_1_0 extends
}
- private void remoteSessionCreation(final Session_1_0 session)
- {
- _sessions.add(session);
- sessionAdded(session);
-
- }
-
private short getFirstFreeChannel()
{
for (int i = 0; i <= _channelMax; i++)
@@ -1219,16 +1170,6 @@ public class AMQPConnection_1_0 extends
updateLastReadTime();
try
{
- if (RAW_LOGGER.isDebugEnabled())
- {
- QpidByteBuffer dup = msg.duplicate();
- byte[] data = new byte[dup.remaining()];
- dup.get(data);
- dup.dispose();
- Binary bin = new Binary(data);
- RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
- }
-
int remaining;
do
@@ -1384,45 +1325,17 @@ public class AMQPConnection_1_0 extends
public void send(final AMQFrame amqFrame, ByteBuffer buf)
{
-
updateLastWriteTime();
FRAME_LOGGER.debug("SEND[{}|{}] : {}",
getNetwork().getRemoteAddress(),
amqFrame.getChannel(),
amqFrame.getFrameBody());
- _frameWriter.setValue(amqFrame);
-
- QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_frameWriter.getSize());
-
- try
- {
- int size = _frameWriter.writeToBuffer(buffer);
- if (size > getMaxFrameSize())
- {
- throw new OversizeFrameException(amqFrame, size);
- }
-
- buffer.flip();
-
- if (RAW_LOGGER.isDebugEnabled())
- {
- QpidByteBuffer dup = buffer.duplicate();
- byte[] data = new byte[dup.remaining()];
- dup.get(data);
- dup.dispose();
- Binary bin = new Binary(data);
- RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
- }
-
- getSender().send(buffer);
-
- }
- finally
+ int size = _frameWriter.send(amqFrame);
+ if (size > getMaxFrameSize())
{
- buffer.dispose();
+ throw new OversizeFrameException(amqFrame, size);
}
-
}
public void send(short channel, FrameBody body)
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Sun Apr 17 18:56:45 2016
@@ -190,11 +190,6 @@ public abstract class LinkEndpoint<T ext
}
}
- public void receiveTransfer(final Transfer transfer, final Delivery delivery)
- {
- // TODO
- }
-
public void receiveFlow(final Flow flow)
{
}
@@ -222,20 +217,15 @@ public abstract class LinkEndpoint<T ext
public void settle(final Binary deliveryTag)
{
- Delivery delivery = _unsettledTransfers.remove(deliveryTag);
- if(delivery != null)
- {
- getSession().settle(getRole(),delivery.getDeliveryId());
- }
-
+ _unsettledTransfers.remove(deliveryTag);
}
- public void setLocalHandle(final UnsignedInteger localHandle)
+ void setLocalHandle(final UnsignedInteger localHandle)
{
_localHandle = localHandle;
}
- public void receiveAttach(final Attach attach)
+ void receiveAttach(final Attach attach)
{
switch (_state)
{
@@ -279,12 +269,12 @@ public abstract class LinkEndpoint<T ext
}
}
- public boolean isAttached()
+ boolean isAttached()
{
return _state == State.ATTACHED;
}
- public boolean isDetached()
+ boolean isDetached()
{
return _state == State.DETACHED || _session.isEnded();
}
@@ -294,7 +284,7 @@ public abstract class LinkEndpoint<T ext
return _session;
}
- public UnsignedInteger getLocalHandle()
+ UnsignedInteger getLocalHandle()
{
return _localHandle;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Sun Apr 17 18:56:45 2016
@@ -118,8 +118,7 @@ public class ReceivingLinkEndpoint exten
return Role.RECEIVER;
}
- @Override
- public void receiveTransfer(final Transfer transfer, final Delivery delivery)
+ void receiveTransfer(final Transfer transfer, final Delivery delivery)
{
TransientState transientState;
final Binary deliveryTag = delivery.getDeliveryTag();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sun Apr 17 18:56:45 2016
@@ -343,7 +343,7 @@ public class Session_1_0 implements AMQS
try
{
QpidByteBuffer payload = xfr.getPayload();
- int payloadSent = _connection.sendFrame(getSendingChannel(), xfr, payload);
+ int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
if(payload != null && payloadSent < payload.remaining() && payloadSent >= 0)
{
@@ -372,7 +372,6 @@ public class Session_1_0 implements AMQS
}
catch(OversizeFrameException e)
{
- e.printStackTrace();
throw new ConnectionScopedRuntimeException(e);
}
}
@@ -392,12 +391,12 @@ public class Session_1_0 implements AMQS
case ACTIVE:
detachLinks();
remoteEnd(end);
- short sendChannel = getSendingChannel();
+ short sendChannel = _sendingChannel;
_connection.sendEnd(sendChannel, new End(), true);
_state = SessionState.ENDED;
break;
default:
- sendChannel = getSendingChannel();
+ sendChannel = _sendingChannel;
End reply = new End();
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
@@ -466,11 +465,6 @@ public class Session_1_0 implements AMQS
}
- public short getSendingChannel()
- {
- return _sendingChannel;
- }
-
public void receiveDisposition(final Disposition disposition)
{
Role dispositionRole = disposition.getRole();
@@ -574,7 +568,7 @@ public class Session_1_0 implements AMQS
private void send(final FrameBody frameBody)
{
- _connection.sendFrame(getSendingChannel(), frameBody);
+ _connection.sendFrame(_sendingChannel, frameBody);
}
public boolean isSyntheticError(final Error error)
@@ -587,17 +581,17 @@ public class Session_1_0 implements AMQS
switch (_state)
{
case BEGIN_SENT:
- _connection.sendEnd(getSendingChannel(), end, false);
+ _connection.sendEnd(_sendingChannel, end, false);
_state = SessionState.END_PIPE;
break;
case ACTIVE:
detachLinks();
- short sendChannel = getSendingChannel();
+ short sendChannel = _sendingChannel;
_connection.sendEnd(sendChannel, end, true);
_state = SessionState.END_SENT;
break;
default:
- sendChannel = getSendingChannel();
+ sendChannel = _sendingChannel;
End reply = new End();
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
@@ -613,127 +607,104 @@ public class Session_1_0 implements AMQS
public void receiveTransfer(final Transfer transfer)
{
_nextIncomingTransferId.incr();
- /*
- _availableIncomingCredit--;
- */
UnsignedInteger handle = transfer.getHandle();
- LinkEndpoint endpoint = _remoteLinkEndpoints.get(handle);
+ LinkEndpoint linkEndpoint = _remoteLinkEndpoints.get(handle);
- if (endpoint == null)
+ if (linkEndpoint == null)
{
- //TODO - error unknown link
- System.err.println("Unknown endpoint " + transfer);
+ Error error = new Error();
+ error.setCondition(AmqpError.ILLEGAL_STATE);
+ error.setDescription("TRANSFER called on Session for link handle " + handle + " which is not attached");
+ _connection.close(error);
}
-
- UnsignedInteger deliveryId = transfer.getDeliveryId();
- if (deliveryId == null)
+ else if(!(linkEndpoint instanceof ReceivingLinkEndpoint))
{
- deliveryId = ((ReceivingLinkEndpoint) endpoint).getLastDeliveryId();
- }
- Delivery delivery = _incomingUnsettled.get(deliveryId);
- if (delivery == null)
+ Error error = new Error();
+ error.setCondition(ConnectionError.FRAMING_ERROR);
+ error.setDescription("TRANSFER called on Session for link handle " + handle + " which is a sending ink not a receiving link");
+ _connection.close(error);
+
+ }
+ else
{
- delivery = new Delivery(transfer, endpoint);
- _incomingUnsettled.put(deliveryId, delivery);
- if (delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted()))
+ ReceivingLinkEndpoint endpoint = ((ReceivingLinkEndpoint) linkEndpoint);
+
+ UnsignedInteger deliveryId = transfer.getDeliveryId();
+ if (deliveryId == null)
{
-/*
- _availableIncomingCredit++;
-*/
+ deliveryId = endpoint.getLastDeliveryId();
}
- if (Boolean.TRUE.equals(transfer.getMore()))
+ Delivery delivery = _incomingUnsettled.get(deliveryId);
+ if (delivery == null)
{
- ((ReceivingLinkEndpoint) endpoint).setLastDeliveryId(transfer.getDeliveryId());
+ delivery = new Delivery(transfer, endpoint);
+ _incomingUnsettled.put(deliveryId, delivery);
+
+ if (Boolean.TRUE.equals(transfer.getMore()))
+ {
+ endpoint.setLastDeliveryId(transfer.getDeliveryId());
+ }
}
- }
- else
- {
- if (delivery.getDeliveryId().equals(deliveryId))
+ else
{
- delivery.addTransfer(transfer);
- if (delivery.isSettled())
+ if (delivery.getDeliveryId().equals(deliveryId))
{
-/*
- _availableIncomingCredit++;
-*/
+ delivery.addTransfer(transfer);
+
+ if (!Boolean.TRUE.equals(transfer.getMore()))
+ {
+ endpoint.setLastDeliveryId(null);
+ }
}
- else if (Boolean.TRUE.equals(transfer.getAborted()))
+ else
{
-/*
- _availableIncomingCredit += delivery.getTransfers().size();
-*/
- }
+ End reply = new End();
+
+ Error error = new Error();
+ error.setCondition(AmqpError.ILLEGAL_STATE);
+ error.setDescription("TRANSFER called on Session for link handle "
+ + handle
+ + " with incorrect delivery id "
+ + transfer.getDeliveryId());
+ reply.setError(error);
+ _connection.sendEnd(_sendingChannel, reply, true);
+
+ return;
- if (!Boolean.TRUE.equals(transfer.getMore()))
- {
- ((ReceivingLinkEndpoint) endpoint).setLastDeliveryId(null);
}
}
- else
- {
- // TODO - error
- System.err.println("Incorrect transfer id " + transfer);
- }
- }
- if (endpoint != null)
- {
endpoint.receiveTransfer(transfer, delivery);
- }
- if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
- {
- _incomingUnsettled.remove(deliveryId);
+ if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
+ {
+ _incomingUnsettled.remove(deliveryId);
+ }
}
-
}
- public Collection<LinkEndpoint> getLocalLinkEndpoints()
+ private Collection<LinkEndpoint> getLocalLinkEndpoints()
{
return new ArrayList<>(_localLinkEndpoints.keySet());
}
- public boolean isEnded()
+ boolean isEnded()
{
return _state == SessionState.ENDED || _connection.isClosed();
}
- public void settle(final Role role, final UnsignedInteger deliveryId)
- {
- if(role == Role.RECEIVER)
- {
- Delivery d = _incomingUnsettled.remove(deliveryId);
- if(d != null)
- {
-/*
- _availableIncomingCredit += d.getTransfers().size();
-*/
- }
- }
- else
- {
- Delivery d = _outgoingUnsettled.remove(deliveryId);
-/* if(d != null)
- {
- _availableOutgoingCredit += d.getTransfers().size();
-
- }*/
- }
-
- }
-
- public UnsignedInteger getIncomingWindowSize()
+ UnsignedInteger getIncomingWindowSize()
{
return UnsignedInteger.valueOf(_availableIncomingCredit);
}
- public AccessControlContext getAccessControllerContext()
+ AccessControlContext getAccessControllerContext()
{
return _accessControllerContext;
}
@@ -1118,9 +1089,11 @@ public class Session_1_0 implements AMQS
}
catch (AccessControlException e)
{
- //TODO
- _logger.info("Security error", e);
- throw new ConnectionScopedRuntimeException(e);
+ Error error = new Error();
+ error.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+ error.setDescription(e.getMessage());
+
+ _connection.close(error);
}
catch (QueueExistsException e)
{
@@ -1131,7 +1104,7 @@ public class Session_1_0 implements AMQS
return queue;
}
- public ServerTransaction getTransaction(Binary transactionId)
+ ServerTransaction getTransaction(Binary transactionId)
{
// TODO should treat invalid id differently to null
ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
@@ -1146,8 +1119,9 @@ public class Session_1_0 implements AMQS
return transaction;
}
- public void remoteEnd(End end)
+ void remoteEnd(End end)
{
+ // TODO - if the end has a non empty error we should log it
Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
while(iter.hasNext())
@@ -1217,13 +1191,6 @@ public class Session_1_0 implements AMQS
}
@Override
- public String getClientID()
- {
- // TODO
- return "";
- }
-
- @Override
public void close()
{
performCloseTasks();
@@ -1234,7 +1201,7 @@ public class Session_1_0 implements AMQS
}
}
- protected void performCloseTasks()
+ private void performCloseTasks()
{
if(_closed.compareAndSet(false, true))
@@ -1362,7 +1329,7 @@ public class Session_1_0 implements AMQS
@Override
public int getChannelId()
{
- return getSendingChannel();
+ return _sendingChannel;
}
@Override
@@ -1385,7 +1352,7 @@ public class Session_1_0 implements AMQS
authorizedPrincipal,
remoteAddress,
getVirtualHost().getName(),
- getSendingChannel()) + "] ";
+ _sendingChannel) + "] ";
}
@Override
@@ -1419,7 +1386,7 @@ public class Session_1_0 implements AMQS
return _subject;
}
- VirtualHost<?> getVirtualHost()
+ private VirtualHost<?> getVirtualHost()
{
return _connection.getVirtualHost();
}
@@ -1687,7 +1654,7 @@ public class Session_1_0 implements AMQS
@Override
public String toString()
{
- return "Session_1_0[" + _connection + ": " + getSendingChannel() + ']';
+ return "Session_1_0[" + _connection + ": " + _sendingChannel + ']';
}
@@ -1732,7 +1699,8 @@ public class Session_1_0 implements AMQS
{
return UnsignedInteger.valueOf(i);
}
- } while(++i != 0);
+ }
+ while(++i != 0);
// TODO
throw new RuntimeException();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java Sun Apr 17 18:56:45 2016
@@ -23,261 +23,57 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
-public class FrameWriter implements ValueWriter<AMQFrame>
+public class FrameWriter
{
- private Registry _registry;
- private AMQFrame _frame;
- private State _state = State.DONE;
- private ValueWriter _typeWriter;
- private int _size = -1;
+ private final ByteBufferSender _sender;
+ private final ValueWriter.Registry _registry;
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
- private QpidByteBuffer _payload;
- enum State
- {
- SIZE_0,
- SIZE_1,
- SIZE_2,
- SIZE_3,
- DOFF,
- TYPE,
- CHANNEL_0,
- CHANNEL_1,
- DELEGATE,
- PAYLOAD,
- DONE
- }
-
- public FrameWriter(final Registry registry)
+ public FrameWriter(final ValueWriter.Registry registry, final ByteBufferSender sender)
{
_registry = registry;
+ _sender = sender;
}
- public boolean isComplete()
- {
- return _state == State.DONE;
- }
-
- public boolean isCacheable()
+ public <T> int send(AMQFrame<T> frame)
{
- return false;
- }
-
- public int writeToBuffer(QpidByteBuffer buffer)
- {
- int remaining;
-
+ final QpidByteBuffer payload = frame.getPayload() == null ? null : frame.getPayload().duplicate();
+ final int payloadLength = payload == null ? 0 : payload.remaining();
+ final T frameBody = frame.getFrameBody();
- while((remaining = buffer.remaining()) != 0 && _state != State.DONE)
+ final ValueWriter<T> typeWriter = frameBody == null ? null : _registry.getValueWriter(frameBody);
+ int bodySize;
+ if (typeWriter == null)
{
- switch(_state)
- {
- case SIZE_0:
-
- int payloadLength = _payload == null ? 0 : _payload.remaining();
-
- if(_typeWriter!=null)
- {
-
-
- QpidByteBuffer qpidByteBuffer = remaining > 8
- ? buffer.duplicate().position(buffer.position() + 8)
- : QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY);
-
- _size = _typeWriter.writeToBuffer(qpidByteBuffer) + 8 + payloadLength;
- qpidByteBuffer.dispose();
- }
- else
- {
- _size = 8 + payloadLength;
- }
- if(remaining >= 4)
- {
- buffer.putInt(_size);
-
- if(remaining >= 8)
- {
- buffer.put((byte)2); // DOFF
- buffer.put(_frame.getFrameType()); // AMQP Frame Type
- buffer.putShort(_frame.getChannel());
-
- if(_size - payloadLength > remaining)
- {
- buffer.position(buffer.limit());
- _state = State.DELEGATE;
- }
- else if(_size > remaining )
- {
- buffer.position(buffer.position()+_size-8-payloadLength);
- if(payloadLength > 0)
- {
-
- QpidByteBuffer dup = _payload.slice();
- int payloadUsed = buffer.remaining();
- dup.limit(payloadUsed);
- buffer.put(dup);
- dup.dispose();
- _payload.position(_payload.position()+payloadUsed);
- }
- _state = State.PAYLOAD;
- }
- else
- {
-
- buffer.position(buffer.position()+_size-8-payloadLength);
- if(payloadLength > 0)
- {
- buffer.put(_payload);
-
- }
-
- if (_payload != null)
- {
- _payload.dispose();
- _payload = null;
- }
-
- _frame = null;
- _typeWriter = null;
- _state = State.DONE;
- }
-
- }
- else
- {
- _state = State.DOFF;
- }
- break;
- }
- else
- {
- buffer.put((byte)((_size >> 24) & 0xFF));
- if(!buffer.hasRemaining())
- {
- _state = State.SIZE_1;
- break;
- }
- }
-
- case SIZE_1:
- buffer.put((byte)((_size >> 16) & 0xFF));
- if(!buffer.hasRemaining())
- {
- _state = State.SIZE_2;
- break;
- }
- case SIZE_2:
- buffer.put((byte)((_size >> 8) & 0xFF));
- if(!buffer.hasRemaining())
- {
- _state = State.SIZE_3;
- break;
- }
- case SIZE_3:
- buffer.put((byte)(_size & 0xFF));
- if(!buffer.hasRemaining())
- {
- _state = State.DOFF;
- break;
- }
- case DOFF:
- buffer.put((byte)2); // Always 2 (8 bytes)
- if(!buffer.hasRemaining())
- {
- _state = State.TYPE;
- break;
- }
- case TYPE:
- buffer.put((byte)0);
- if(!buffer.hasRemaining())
- {
- _state = State.CHANNEL_0;
- break;
- }
- case CHANNEL_0:
- buffer.put((byte)((_frame.getChannel() >> 8) & 0xFF));
- if(!buffer.hasRemaining())
- {
- _state = State.CHANNEL_1;
- break;
- }
- case CHANNEL_1:
- buffer.put((byte)(_frame.getChannel() & 0xFF));
- if(!buffer.hasRemaining())
- {
- _state = State.DELEGATE;
- break;
- }
- case DELEGATE:
- _typeWriter.writeToBuffer(buffer);
- if(_typeWriter.isComplete())
- {
- _state = State.PAYLOAD;
- _frame = null;
- _typeWriter = null;
- }
- else
- {
- break;
- }
- case PAYLOAD:
- if(_payload == null || _payload.remaining() == 0)
- {
- _state = State.DONE;
- _frame = null;
- _typeWriter = null;
- if (_payload != null)
- {
- _payload.dispose();
- }
- _payload = null;
-
- }
- else if(buffer.hasRemaining())
- {
- buffer.put(_payload);
- if(_payload.remaining() == 0)
- {
- _state = State.DONE;
- _frame = null;
- _typeWriter = null;
- _payload.dispose();
- _payload = null;
- }
- }
-
- }
- }
-
- return _size;
- }
-
- public void setValue(AMQFrame frame)
- {
- _frame = frame;
- _state = State.SIZE_0;
- _payload = frame.getPayload() == null ? null : frame.getPayload().duplicate();
-
- final int payloadLength = _payload == null ? 0 : _payload.remaining();
- final Object frameBody = frame.getFrameBody();
-
- _typeWriter = frameBody == null ? null : _registry.getValueWriter(frameBody);
- if (_typeWriter == null)
- {
- _size = 8 + payloadLength;
+ bodySize = 8;
}
else
{
- _typeWriter.setValue(_frame.getFrameBody());
+ typeWriter.setValue(frame.getFrameBody());
QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY);
- _size = _typeWriter.writeToBuffer(qpidByteBuffer) + 8 + payloadLength;
+ bodySize = typeWriter.writeToBuffer(qpidByteBuffer) + 8;
}
- }
- public int getSize()
- {
- return _size;
+ QpidByteBuffer body = QpidByteBuffer.allocate(_sender.isDirectBufferPreferred(), bodySize);
+ final int totalSize = bodySize + payloadLength;
+ body.putInt(totalSize);
+ body.put((byte)2); // DOFF
+ body.put(frame.getFrameType()); // AMQP Frame Type
+ body.putShort(frame.getChannel());
+ typeWriter.writeToBuffer(body);
+ body.flip();
+
+ _sender.send(body);
+ body.dispose();
+ if(payload != null)
+ {
+ _sender.send(payload);
+ payload.dispose();
+ }
+ return totalSize;
}
+
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Sun Apr 17 18:56:45 2016
@@ -153,7 +153,33 @@ public class ProtocolEngine_1_0_0Test ex
.registerTransactionLayer()
.registerSecurityLayer();
- _frameWriter = new FrameWriter(registry);
+ _frameWriter = new FrameWriter(registry, new ByteBufferSender()
+ {
+
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ _protocolEngine_1_0_0.received(msg);
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ });
}
public void testProtocolEngineWithNoSaslNonTLSandAnon() throws Exception
@@ -167,11 +193,7 @@ public class ProtocolEngine_1_0_0Test ex
.getHeaderIdentifier()));
Open open = new Open();
- _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
- QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
- _frameWriter.writeToBuffer(buf);
- buf.flip();
- _protocolEngine_1_0_0.received(buf);
+ _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
verify(_virtualHost).registerConnection(any(AMQPConnection.class));
AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -190,11 +212,7 @@ public class ProtocolEngine_1_0_0Test ex
_protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
Open open = new Open();
- _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
- QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
- _frameWriter.writeToBuffer(buf);
- buf.flip();
- _protocolEngine_1_0_0.received(buf);
+ _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
verify(_virtualHost, never()).registerConnection(any(AMQPConnection.class));
verify(_networkConnection).close();
@@ -220,11 +238,7 @@ public class ProtocolEngine_1_0_0Test ex
_protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
Open open = new Open();
- _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
- QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
- _frameWriter.writeToBuffer(buf);
- buf.flip();
- _protocolEngine_1_0_0.received(buf);
+ _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
verify(_virtualHost).registerConnection(any(AMQPConnection.class));
AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -253,22 +267,13 @@ public class ProtocolEngine_1_0_0Test ex
SaslInit init = new SaslInit();
init.setMechanism(Symbol.valueOf("ANONYMOUS"));
- _frameWriter.setValue(new SASLFrame(init));
- QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
- _frameWriter.writeToBuffer(buf);
-
- buf.flip();
- _protocolEngine_1_0_0.received(buf);
+ _frameWriter.send(new SASLFrame(init));
_protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance()
.getHeaderIdentifier()));
Open open = new Open();
- _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
- buf = QpidByteBuffer.allocate(64*1024);
- _frameWriter.writeToBuffer(buf);
- buf.flip();
- _protocolEngine_1_0_0.received(buf);
+ _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
verify(_virtualHost).registerConnection(any(AMQPConnection.class));
AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -288,4 +293,32 @@ public class ProtocolEngine_1_0_0Test ex
{
when(_subjectCreator.getMechanisms()).thenReturn(Arrays.asList(mechanisms));
}
+
+ private final ByteBufferSender _sender = new ByteBufferSender()
+ {
+
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ _protocolEngine_1_0_0.received(msg);
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ };
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org