You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/04/17 20:56:45 UTC

svn commit: r1739636 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/test/java/org/apache/qpid/server/consumer/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ bro...

Author: rgodfrey
Date: Sun Apr 17 18:56:45 2016
New Revision: 1739636

URL: http://svn.apache.org/viewvc?rev=1739636&view=rev
Log:
QPID-7202 : Tidyups and remove one copy from delivery path

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Sun Apr 17 18:56:45 2016
@@ -40,17 +40,15 @@ import org.apache.qpid.transport.network
  */
 public interface AMQSessionModel<T extends AMQSessionModel<T>> extends Comparable<AMQSessionModel>, Deletable<T>
 {
-    public UUID getId();
+    UUID getId();
 
-    public AMQPConnection<?> getAMQPConnection();
+    AMQPConnection<?> getAMQPConnection();
 
-    public String getClientID();
+    void close();
 
-    public void close();
+    void close(AMQConstant cause, String message);
 
-    public void close(AMQConstant cause, String message);
-
-    public LogSubject getLogSubject();
+    LogSubject getLogSubject();
 
     /**
      * This method is called from the housekeeping thread to check the status of
@@ -67,7 +65,7 @@ public interface AMQSessionModel<T exten
      * @param idleWarn time in milliseconds before alerting on idle transaction
      * @param idleClose time in milliseconds before closing connection with idle transaction
      */
-    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose);
+    void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose);
 
     void block(Queue<?> queue);
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Sun Apr 17 18:56:45 2016
@@ -297,12 +297,6 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public String getClientID()
-        {
-            return null;
-        }
-
-        @Override
         public void close()
         {
         }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sun Apr 17 18:56:45 2016
@@ -808,11 +808,6 @@ public class ServerSession extends Sessi
         return getConnection().getAmqpConnection();
     }
 
-    public String getClientID()
-    {
-        return getConnection().getClientId();
-    }
-
     @Override
     public ServerConnection getConnection()
     {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Apr 17 18:56:45 2016
@@ -1387,11 +1387,6 @@ public class AMQChannel
         return _connection;
     }
 
-    public String getClientID()
-    {
-        return _connection.getClientId();
-    }
-
     public LogSubject getLogSubject()
     {
         return _logSubject;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Sun Apr 17 18:56:45 2016
@@ -120,11 +120,8 @@ public class AMQPConnection_1_0 extends
 
     private static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
     private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM");
-    private static final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
 
-
-
-    private static final long CLOSE_RESPONSE_TIMEOUT = 10000l;
+    private static final long CLOSE_RESPONSE_TIMEOUT = 10000L;
 
     private final AtomicBoolean _stateChanged = new AtomicBoolean();
     private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
@@ -267,7 +264,7 @@ public class AMQPConnection_1_0 extends
 
         setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
 
-        _frameWriter =  new FrameWriter(getDescribedTypeRegistry());
+        _frameWriter =  new FrameWriter(getDescribedTypeRegistry(), getSender());
 
 
     }
@@ -598,84 +595,45 @@ public class AMQPConnection_1_0 extends
         short myChannelId;
         if (begin.getRemoteChannel() != null)
         {
-            myChannelId = begin.getRemoteChannel().shortValue();
-            Session_1_0 sessionEndpoint;
-            try
-            {
-                sessionEndpoint = _sendingSessions[myChannelId];
-            }
-            catch (IndexOutOfBoundsException e)
-            {
-                final Error error = new Error();
-                error.setCondition(ConnectionError.FRAMING_ERROR);
-                error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
-                                     + begin.getRemoteChannel() + " which is outside the valid range of 0 to "
-                                     + _channelMax + ".");
-                closeConnection(error);
-                return;
-            }
-            if (sessionEndpoint != null)
-            {
-                if (_receivingSessions[channel] == null)
-                {
-                    _receivingSessions[channel] = sessionEndpoint;
-                    sessionEndpoint.setReceivingChannel(channel);
-                    sessionEndpoint.setNextIncomingId(begin.getNextOutgoingId());
-                    sessionEndpoint.setOutgoingSessionCredit(begin.getIncomingWindow());
-
-                    if (sessionEndpoint.getState() == SessionState.END_SENT)
-                    {
-                        _sendingSessions[myChannelId] = null;
-                    }
-                }
-                else
-                {
-                    final Error error = new Error();
-                    error.setCondition(ConnectionError.FRAMING_ERROR);
-                    error.setDescription("BEGIN received on channel " + channel + " which is already in use.");
-                    closeConnection(error);
-                }
-            }
-            else
-            {
-                final Error error = new Error();
-                error.setCondition(ConnectionError.FRAMING_ERROR);
-                error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
-                                     + begin.getRemoteChannel() + " which is not known as a begun session.");
-                closeConnection(error);
-            }
-
+            final Error error = new Error();
+            error.setCondition(ConnectionError.FRAMING_ERROR);
+            error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
+                                 + begin.getRemoteChannel() + ". Since the broker does not spontaneously start channels, this must be an error.");
+            closeConnection(error);
 
         }
         else // Peer requesting session creation
         {
 
-            myChannelId = getFirstFreeChannel();
-            if (myChannelId == -1)
+            if (_receivingSessions[channel] == null)
             {
-                // close any half open channel
                 myChannelId = getFirstFreeChannel();
+                if (myChannelId == -1)
+                {
+                    final Error error = new Error();
+                    error.setCondition(ConnectionError.FRAMING_ERROR);
+                    error.setDescription("BEGIN received on channel " + channel + ". There are no free channels for the broker to responsd on.");
+                    closeConnection(error);
 
-            }
-
-            if (_receivingSessions[channel] == null)
-            {
-                Session_1_0 sessionEndpoint = new Session_1_0(this, begin);
+                }
+                Session_1_0 session = new Session_1_0(this, begin);
 
-                _receivingSessions[channel] = sessionEndpoint;
-                _sendingSessions[myChannelId] = sessionEndpoint;
+                _receivingSessions[channel] = session;
+                _sendingSessions[myChannelId] = session;
 
                 Begin beginToSend = new Begin();
 
-                sessionEndpoint.setReceivingChannel(channel);
-                sessionEndpoint.setSendingChannel(myChannelId);
+                session.setReceivingChannel(channel);
+                session.setSendingChannel(myChannelId);
                 beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
-                beginToSend.setNextOutgoingId(sessionEndpoint.getNextOutgoingId());
-                beginToSend.setOutgoingWindow(sessionEndpoint.getOutgoingWindowSize());
-                beginToSend.setIncomingWindow(sessionEndpoint.getIncomingWindowSize());
+                beginToSend.setNextOutgoingId(session.getNextOutgoingId());
+                beginToSend.setOutgoingWindow(session.getOutgoingWindowSize());
+                beginToSend.setIncomingWindow(session.getIncomingWindowSize());
                 sendFrame(myChannelId, beginToSend);
 
-                remoteSessionCreation(sessionEndpoint);
+                _sessions.add(session);
+                sessionAdded(session);
+
             }
             else
             {
@@ -689,13 +647,6 @@ public class AMQPConnection_1_0 extends
 
     }
 
-    private void remoteSessionCreation(final Session_1_0 session)
-    {
-        _sessions.add(session);
-        sessionAdded(session);
-
-    }
-
     private short getFirstFreeChannel()
     {
         for (int i = 0; i <= _channelMax; i++)
@@ -1219,16 +1170,6 @@ public class AMQPConnection_1_0 extends
                 updateLastReadTime();
                 try
                 {
-                    if (RAW_LOGGER.isDebugEnabled())
-                    {
-                        QpidByteBuffer dup = msg.duplicate();
-                        byte[] data = new byte[dup.remaining()];
-                        dup.get(data);
-                        dup.dispose();
-                        Binary bin = new Binary(data);
-                        RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
-                    }
-
                     int remaining;
 
                     do
@@ -1384,45 +1325,17 @@ public class AMQPConnection_1_0 extends
 
     public void send(final AMQFrame amqFrame, ByteBuffer buf)
     {
-
         updateLastWriteTime();
         FRAME_LOGGER.debug("SEND[{}|{}] : {}",
                            getNetwork().getRemoteAddress(),
                            amqFrame.getChannel(),
                            amqFrame.getFrameBody());
 
-        _frameWriter.setValue(amqFrame);
-
-        QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_frameWriter.getSize());
-
-        try
-        {
-            int size = _frameWriter.writeToBuffer(buffer);
-            if (size > getMaxFrameSize())
-            {
-                throw new OversizeFrameException(amqFrame, size);
-            }
-
-            buffer.flip();
-
-            if (RAW_LOGGER.isDebugEnabled())
-            {
-                QpidByteBuffer dup = buffer.duplicate();
-                byte[] data = new byte[dup.remaining()];
-                dup.get(data);
-                dup.dispose();
-                Binary bin = new Binary(data);
-                RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
-            }
-
-            getSender().send(buffer);
-
-        }
-        finally
+        int size = _frameWriter.send(amqFrame);
+        if (size > getMaxFrameSize())
         {
-            buffer.dispose();
+            throw new OversizeFrameException(amqFrame, size);
         }
-
     }
 
     public void send(short channel, FrameBody body)

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Sun Apr 17 18:56:45 2016
@@ -190,11 +190,6 @@ public abstract class LinkEndpoint<T ext
         }
     }
 
-    public void receiveTransfer(final Transfer transfer, final Delivery delivery)
-    {
-        // TODO
-    }
-
     public void receiveFlow(final Flow flow)
     {
     }
@@ -222,20 +217,15 @@ public abstract class LinkEndpoint<T ext
 
     public void settle(final Binary deliveryTag)
     {
-        Delivery delivery = _unsettledTransfers.remove(deliveryTag);
-        if(delivery != null)
-        {
-            getSession().settle(getRole(),delivery.getDeliveryId());
-        }
-
+        _unsettledTransfers.remove(deliveryTag);
     }
 
-    public void setLocalHandle(final UnsignedInteger localHandle)
+    void setLocalHandle(final UnsignedInteger localHandle)
     {
         _localHandle = localHandle;
     }
 
-    public void receiveAttach(final Attach attach)
+    void receiveAttach(final Attach attach)
     {
         switch (_state)
         {
@@ -279,12 +269,12 @@ public abstract class LinkEndpoint<T ext
         }
     }
 
-    public boolean isAttached()
+    boolean isAttached()
     {
         return _state == State.ATTACHED;
     }
 
-    public boolean isDetached()
+    boolean isDetached()
     {
         return _state == State.DETACHED || _session.isEnded();
     }
@@ -294,7 +284,7 @@ public abstract class LinkEndpoint<T ext
         return _session;
     }
 
-    public UnsignedInteger getLocalHandle()
+    UnsignedInteger getLocalHandle()
     {
         return _localHandle;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Sun Apr 17 18:56:45 2016
@@ -118,8 +118,7 @@ public class ReceivingLinkEndpoint exten
         return Role.RECEIVER;
     }
 
-    @Override
-    public void receiveTransfer(final Transfer transfer, final Delivery delivery)
+    void receiveTransfer(final Transfer transfer, final Delivery delivery)
     {
         TransientState transientState;
         final Binary deliveryTag = delivery.getDeliveryTag();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sun Apr 17 18:56:45 2016
@@ -343,7 +343,7 @@ public class Session_1_0 implements AMQS
         try
         {
             QpidByteBuffer payload = xfr.getPayload();
-            int payloadSent = _connection.sendFrame(getSendingChannel(), xfr, payload);
+            int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
 
             if(payload != null && payloadSent < payload.remaining() && payloadSent >= 0)
             {
@@ -372,7 +372,6 @@ public class Session_1_0 implements AMQS
         }
         catch(OversizeFrameException e)
         {
-            e.printStackTrace();
             throw new ConnectionScopedRuntimeException(e);
         }
     }
@@ -392,12 +391,12 @@ public class Session_1_0 implements AMQS
             case ACTIVE:
                 detachLinks();
                 remoteEnd(end);
-                short sendChannel = getSendingChannel();
+                short sendChannel = _sendingChannel;
                 _connection.sendEnd(sendChannel, new End(), true);
                 _state = SessionState.ENDED;
                 break;
             default:
-                sendChannel = getSendingChannel();
+                sendChannel = _sendingChannel;
                 End reply = new End();
                 Error error = new Error();
                 error.setCondition(AmqpError.ILLEGAL_STATE);
@@ -466,11 +465,6 @@ public class Session_1_0 implements AMQS
 
     }
 
-    public short getSendingChannel()
-    {
-        return _sendingChannel;
-    }
-
     public void receiveDisposition(final Disposition disposition)
     {
         Role dispositionRole = disposition.getRole();
@@ -574,7 +568,7 @@ public class Session_1_0 implements AMQS
 
     private void send(final FrameBody frameBody)
     {
-        _connection.sendFrame(getSendingChannel(), frameBody);
+        _connection.sendFrame(_sendingChannel, frameBody);
     }
 
     public boolean isSyntheticError(final Error error)
@@ -587,17 +581,17 @@ public class Session_1_0 implements AMQS
         switch (_state)
         {
             case BEGIN_SENT:
-                _connection.sendEnd(getSendingChannel(), end, false);
+                _connection.sendEnd(_sendingChannel, end, false);
                 _state = SessionState.END_PIPE;
                 break;
             case ACTIVE:
                 detachLinks();
-                short sendChannel = getSendingChannel();
+                short sendChannel = _sendingChannel;
                 _connection.sendEnd(sendChannel, end, true);
                 _state = SessionState.END_SENT;
                 break;
             default:
-                sendChannel = getSendingChannel();
+                sendChannel = _sendingChannel;
                 End reply = new End();
                 Error error = new Error();
                 error.setCondition(AmqpError.ILLEGAL_STATE);
@@ -613,127 +607,104 @@ public class Session_1_0 implements AMQS
     public void receiveTransfer(final Transfer transfer)
     {
         _nextIncomingTransferId.incr();
-    /*
-                _availableIncomingCredit--;
-    */
 
         UnsignedInteger handle = transfer.getHandle();
 
 
-        LinkEndpoint endpoint = _remoteLinkEndpoints.get(handle);
+        LinkEndpoint linkEndpoint = _remoteLinkEndpoints.get(handle);
 
-        if (endpoint == null)
+        if (linkEndpoint == null)
         {
-            //TODO - error unknown link
-            System.err.println("Unknown endpoint " + transfer);
+            Error error = new Error();
+            error.setCondition(AmqpError.ILLEGAL_STATE);
+            error.setDescription("TRANSFER called on Session for link handle " + handle + " which is not attached");
+            _connection.close(error);
 
         }
-
-        UnsignedInteger deliveryId = transfer.getDeliveryId();
-        if (deliveryId == null)
+        else if(!(linkEndpoint instanceof ReceivingLinkEndpoint))
         {
-            deliveryId = ((ReceivingLinkEndpoint) endpoint).getLastDeliveryId();
-        }
 
-        Delivery delivery = _incomingUnsettled.get(deliveryId);
-        if (delivery == null)
+            Error error = new Error();
+            error.setCondition(ConnectionError.FRAMING_ERROR);
+            error.setDescription("TRANSFER called on Session for link handle " + handle + " which is a sending ink not a receiving link");
+            _connection.close(error);
+
+        }
+        else
         {
-            delivery = new Delivery(transfer, endpoint);
-            _incomingUnsettled.put(deliveryId, delivery);
-            if (delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted()))
+            ReceivingLinkEndpoint endpoint = ((ReceivingLinkEndpoint) linkEndpoint);
+
+            UnsignedInteger deliveryId = transfer.getDeliveryId();
+            if (deliveryId == null)
             {
-/*
-                    _availableIncomingCredit++;
-*/
+                deliveryId = endpoint.getLastDeliveryId();
             }
 
-            if (Boolean.TRUE.equals(transfer.getMore()))
+            Delivery delivery = _incomingUnsettled.get(deliveryId);
+            if (delivery == null)
             {
-                ((ReceivingLinkEndpoint) endpoint).setLastDeliveryId(transfer.getDeliveryId());
+                delivery = new Delivery(transfer, endpoint);
+                _incomingUnsettled.put(deliveryId, delivery);
+
+                if (Boolean.TRUE.equals(transfer.getMore()))
+                {
+                    endpoint.setLastDeliveryId(transfer.getDeliveryId());
+                }
             }
-        }
-        else
-        {
-            if (delivery.getDeliveryId().equals(deliveryId))
+            else
             {
-                delivery.addTransfer(transfer);
-                if (delivery.isSettled())
+                if (delivery.getDeliveryId().equals(deliveryId))
                 {
-/*
-                        _availableIncomingCredit++;
-*/
+                    delivery.addTransfer(transfer);
+
+                    if (!Boolean.TRUE.equals(transfer.getMore()))
+                    {
+                        endpoint.setLastDeliveryId(null);
+                    }
                 }
-                else if (Boolean.TRUE.equals(transfer.getAborted()))
+                else
                 {
-/*
-                        _availableIncomingCredit += delivery.getTransfers().size();
-*/
-                }
+                    End reply = new End();
+
+                    Error error = new Error();
+                    error.setCondition(AmqpError.ILLEGAL_STATE);
+                    error.setDescription("TRANSFER called on Session for link handle "
+                                         + handle
+                                         + " with incorrect delivery id "
+                                         + transfer.getDeliveryId());
+                    reply.setError(error);
+                    _connection.sendEnd(_sendingChannel, reply, true);
+
+                    return;
 
-                if (!Boolean.TRUE.equals(transfer.getMore()))
-                {
-                    ((ReceivingLinkEndpoint) endpoint).setLastDeliveryId(null);
                 }
             }
-            else
-            {
-                // TODO - error
-                System.err.println("Incorrect transfer id " + transfer);
-            }
-        }
 
-        if (endpoint != null)
-        {
             endpoint.receiveTransfer(transfer, delivery);
-        }
 
-        if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
-        {
-            _incomingUnsettled.remove(deliveryId);
+            if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
+            {
+                _incomingUnsettled.remove(deliveryId);
+            }
         }
-
     }
 
-    public Collection<LinkEndpoint> getLocalLinkEndpoints()
+    private Collection<LinkEndpoint> getLocalLinkEndpoints()
     {
         return new ArrayList<>(_localLinkEndpoints.keySet());
     }
 
-    public boolean isEnded()
+    boolean isEnded()
     {
         return _state == SessionState.ENDED || _connection.isClosed();
     }
 
-    public void settle(final Role role, final UnsignedInteger deliveryId)
-    {
-        if(role == Role.RECEIVER)
-        {
-            Delivery d = _incomingUnsettled.remove(deliveryId);
-            if(d != null)
-            {
-/*
-                _availableIncomingCredit += d.getTransfers().size();
-*/
-            }
-        }
-        else
-        {
-            Delivery d = _outgoingUnsettled.remove(deliveryId);
-/*            if(d != null)
-            {
-                _availableOutgoingCredit += d.getTransfers().size();
-
-            }*/
-        }
-
-    }
-
-    public UnsignedInteger getIncomingWindowSize()
+    UnsignedInteger getIncomingWindowSize()
     {
         return UnsignedInteger.valueOf(_availableIncomingCredit);
     }
 
-    public AccessControlContext getAccessControllerContext()
+    AccessControlContext getAccessControllerContext()
     {
         return _accessControllerContext;
     }
@@ -1118,9 +1089,11 @@ public class Session_1_0 implements AMQS
         }
         catch (AccessControlException e)
         {
-            //TODO
-            _logger.info("Security error", e);
-            throw new ConnectionScopedRuntimeException(e);
+            Error error = new Error();
+            error.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+            error.setDescription(e.getMessage());
+
+            _connection.close(error);
         }
         catch (QueueExistsException e)
         {
@@ -1131,7 +1104,7 @@ public class Session_1_0 implements AMQS
         return queue;
     }
 
-    public ServerTransaction getTransaction(Binary transactionId)
+    ServerTransaction getTransaction(Binary transactionId)
     {
         // TODO should treat invalid id differently to null
         ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
@@ -1146,8 +1119,9 @@ public class Session_1_0 implements AMQS
         return transaction;
     }
 
-    public void remoteEnd(End end)
+    void remoteEnd(End end)
     {
+        // TODO - if the end has a non empty error we should log it
         Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
 
         while(iter.hasNext())
@@ -1217,13 +1191,6 @@ public class Session_1_0 implements AMQS
     }
 
     @Override
-    public String getClientID()
-    {
-        // TODO
-        return "";
-    }
-
-    @Override
     public void close()
     {
         performCloseTasks();
@@ -1234,7 +1201,7 @@ public class Session_1_0 implements AMQS
         }
     }
 
-    protected void performCloseTasks()
+    private void performCloseTasks()
     {
 
         if(_closed.compareAndSet(false, true))
@@ -1362,7 +1329,7 @@ public class Session_1_0 implements AMQS
     @Override
     public int getChannelId()
     {
-        return getSendingChannel();
+        return _sendingChannel;
     }
 
     @Override
@@ -1385,7 +1352,7 @@ public class Session_1_0 implements AMQS
                                     authorizedPrincipal,
                                     remoteAddress,
                                     getVirtualHost().getName(),
-                                    getSendingChannel())  + "] ";
+                                    _sendingChannel) + "] ";
     }
 
     @Override
@@ -1419,7 +1386,7 @@ public class Session_1_0 implements AMQS
         return _subject;
     }
 
-    VirtualHost<?> getVirtualHost()
+    private VirtualHost<?> getVirtualHost()
     {
         return _connection.getVirtualHost();
     }
@@ -1687,7 +1654,7 @@ public class Session_1_0 implements AMQS
     @Override
     public String toString()
     {
-        return "Session_1_0[" + _connection + ": " + getSendingChannel() + ']';
+        return "Session_1_0[" + _connection + ": " + _sendingChannel + ']';
     }
 
 
@@ -1732,7 +1699,8 @@ public class Session_1_0 implements AMQS
             {
                 return UnsignedInteger.valueOf(i);
             }
-        } while(++i != 0);
+        }
+        while(++i != 0);
 
         // TODO
         throw new RuntimeException();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java Sun Apr 17 18:56:45 2016
@@ -23,261 +23,57 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
 
-public class FrameWriter implements ValueWriter<AMQFrame>
+public class FrameWriter
 {
-    private Registry _registry;
-    private AMQFrame _frame;
-    private State _state = State.DONE;
-    private ValueWriter _typeWriter;
-    private int _size = -1;
+    private final ByteBufferSender _sender;
+    private final ValueWriter.Registry _registry;
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
-    private QpidByteBuffer _payload;
 
-    enum State
-    {
-        SIZE_0,
-        SIZE_1,
-        SIZE_2,
-        SIZE_3,
-        DOFF,
-        TYPE,
-        CHANNEL_0,
-        CHANNEL_1,
-        DELEGATE,
-        PAYLOAD,
-        DONE
-    }
-
-    public FrameWriter(final Registry registry)
+    public FrameWriter(final ValueWriter.Registry registry, final ByteBufferSender sender)
     {
         _registry = registry;
+        _sender = sender;
     }
 
-    public boolean isComplete()
-    {
-        return _state == State.DONE;
-    }
-
-    public boolean isCacheable()
+    public <T> int send(AMQFrame<T> frame)
     {
-        return false;
-    }
-
-    public int writeToBuffer(QpidByteBuffer buffer)
-    {
-        int remaining;
-
+        final QpidByteBuffer payload = frame.getPayload() == null ? null : frame.getPayload().duplicate();
 
+        final int payloadLength = payload == null ? 0 : payload.remaining();
+        final T frameBody = frame.getFrameBody();
 
-        while((remaining = buffer.remaining()) != 0 && _state != State.DONE)
+        final ValueWriter<T> typeWriter = frameBody == null ? null : _registry.getValueWriter(frameBody);
+        int bodySize;
+        if (typeWriter == null)
         {
-            switch(_state)
-            {
-                case SIZE_0:
-
-                    int payloadLength = _payload == null ? 0 : _payload.remaining();
-
-                    if(_typeWriter!=null)
-                    {
-
-
-                        QpidByteBuffer qpidByteBuffer = remaining > 8
-                                ? buffer.duplicate().position(buffer.position() + 8)
-                                : QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY);
-
-                        _size = _typeWriter.writeToBuffer(qpidByteBuffer) + 8 + payloadLength;
-                        qpidByteBuffer.dispose();
-                    }
-                    else
-                    {
-                        _size = 8 + payloadLength;
-                    }
-                    if(remaining >= 4)
-                    {
-                        buffer.putInt(_size);
-
-                        if(remaining >= 8)
-                        {
-                            buffer.put((byte)2); // DOFF
-                            buffer.put(_frame.getFrameType()); // AMQP Frame Type
-                            buffer.putShort(_frame.getChannel());
-
-                            if(_size - payloadLength > remaining)
-                            {
-                                buffer.position(buffer.limit());
-                                _state = State.DELEGATE;
-                            }
-                            else if(_size > remaining )
-                            {
-                                buffer.position(buffer.position()+_size-8-payloadLength);
-                                if(payloadLength > 0)
-                                {
-
-                                    QpidByteBuffer dup = _payload.slice();
-                                    int payloadUsed = buffer.remaining();
-                                    dup.limit(payloadUsed);
-                                    buffer.put(dup);
-                                    dup.dispose();
-                                    _payload.position(_payload.position()+payloadUsed);
-                                }
-                                _state = State.PAYLOAD;
-                            }
-                            else
-                            {
-
-                                buffer.position(buffer.position()+_size-8-payloadLength);
-                                if(payloadLength > 0)
-                                {
-                                    buffer.put(_payload);
-
-                                }
-
-                                if (_payload != null)
-                                {
-                                    _payload.dispose();
-                                    _payload = null;
-                                }
-
-                                _frame = null;
-                                _typeWriter = null;
-                                _state = State.DONE;
-                            }
-
-                        }
-                        else
-                        {
-                            _state = State.DOFF;
-                        }
-                        break;
-                    }
-                    else
-                    {
-                        buffer.put((byte)((_size >> 24) & 0xFF));
-                        if(!buffer.hasRemaining())
-                        {
-                            _state = State.SIZE_1;
-                            break;
-                        }
-                    }
-
-                case SIZE_1:
-                    buffer.put((byte)((_size >> 16) & 0xFF));
-                    if(!buffer.hasRemaining())
-                    {
-                        _state = State.SIZE_2;
-                        break;
-                    }
-                case SIZE_2:
-                    buffer.put((byte)((_size >> 8) & 0xFF));
-                    if(!buffer.hasRemaining())
-                    {
-                        _state = State.SIZE_3;
-                        break;
-                    }
-                case SIZE_3:
-                    buffer.put((byte)(_size & 0xFF));
-                    if(!buffer.hasRemaining())
-                    {
-                        _state = State.DOFF;
-                        break;
-                    }
-                case DOFF:
-                    buffer.put((byte)2); // Always 2 (8 bytes)
-                    if(!buffer.hasRemaining())
-                    {
-                        _state = State.TYPE;
-                        break;
-                    }
-                case TYPE:
-                    buffer.put((byte)0);
-                    if(!buffer.hasRemaining())
-                    {
-                        _state = State.CHANNEL_0;
-                        break;
-                    }
-                case CHANNEL_0:
-                    buffer.put((byte)((_frame.getChannel() >> 8) & 0xFF));
-                    if(!buffer.hasRemaining())
-                    {
-                        _state = State.CHANNEL_1;
-                        break;
-                    }
-                case CHANNEL_1:
-                    buffer.put((byte)(_frame.getChannel() & 0xFF));
-                    if(!buffer.hasRemaining())
-                    {
-                        _state = State.DELEGATE;
-                        break;
-                    }
-                case DELEGATE:
-                    _typeWriter.writeToBuffer(buffer);
-                    if(_typeWriter.isComplete())
-                    {
-                        _state = State.PAYLOAD;
-                        _frame = null;
-                        _typeWriter = null;
-                    }
-                    else
-                    {
-                        break;
-                    }
-                case PAYLOAD:
-                    if(_payload == null || _payload.remaining() == 0)
-                    {
-                        _state = State.DONE;
-                        _frame = null;
-                        _typeWriter = null;
-                        if (_payload != null)
-                        {
-                            _payload.dispose();
-                        }
-                        _payload = null;
-
-                    }
-                    else if(buffer.hasRemaining())
-                    {
-                        buffer.put(_payload);
-                        if(_payload.remaining() == 0)
-                        {
-                            _state = State.DONE;
-                            _frame = null;
-                            _typeWriter = null;
-                            _payload.dispose();
-                            _payload = null;
-                        }
-                    }
-
-            }
-        }
-
-        return _size;
-    }
-
-    public void setValue(AMQFrame frame)
-    {
-        _frame = frame;
-        _state = State.SIZE_0;
-        _payload = frame.getPayload() == null ? null : frame.getPayload().duplicate();
-
-        final int payloadLength = _payload == null ? 0 : _payload.remaining();
-        final Object frameBody = frame.getFrameBody();
-
-        _typeWriter = frameBody == null ? null : _registry.getValueWriter(frameBody);
-        if (_typeWriter == null)
-        {
-            _size = 8 + payloadLength;
+            bodySize = 8;
         }
         else
         {
-            _typeWriter.setValue(_frame.getFrameBody());
+            typeWriter.setValue(frame.getFrameBody());
             QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(EMPTY_BYTE_ARRAY);
-            _size = _typeWriter.writeToBuffer(qpidByteBuffer) + 8 + payloadLength;
+            bodySize = typeWriter.writeToBuffer(qpidByteBuffer) + 8;
         }
-    }
 
-    public int getSize()
-    {
-        return _size;
+        QpidByteBuffer body = QpidByteBuffer.allocate(_sender.isDirectBufferPreferred(), bodySize);
+        final int totalSize = bodySize + payloadLength; // frame size field covers header, body and payload
+        body.putInt(totalSize);
+        body.put((byte)2); // DOFF: always 2, i.e. an 8 byte frame header
+        body.put(frame.getFrameType()); // AMQP Frame Type
+        body.putShort(frame.getChannel());
+        // A frame without a body has no type writer (typeWriter == null); only the 8 byte header is written.
+        if (typeWriter != null) { typeWriter.writeToBuffer(body); }
+        body.flip();
+        _sender.send(body);
+        body.dispose();
+        if(payload != null)
+        {
+            _sender.send(payload);
+            payload.dispose();
+        }
+        return totalSize;
     }
+
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1739636&r1=1739635&r2=1739636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Sun Apr 17 18:56:45 2016
@@ -153,7 +153,33 @@ public class ProtocolEngine_1_0_0Test ex
                 .registerTransactionLayer()
                 .registerSecurityLayer();
 
-        _frameWriter = new FrameWriter(registry);
+        _frameWriter = new FrameWriter(registry, new ByteBufferSender()
+                                                {
+
+                                                    @Override
+                                                    public boolean isDirectBufferPreferred()
+                                                    {
+                                                        return false;
+                                                    }
+
+                                                    @Override
+                                                    public void send(final QpidByteBuffer msg)
+                                                    {
+                                                        _protocolEngine_1_0_0.received(msg);
+                                                    }
+
+                                                    @Override
+                                                    public void flush()
+                                                    {
+
+                                                    }
+
+                                                    @Override
+                                                    public void close()
+                                                    {
+
+                                                    }
+                                                });
     }
 
     public void testProtocolEngineWithNoSaslNonTLSandAnon() throws Exception
@@ -167,11 +193,7 @@ public class ProtocolEngine_1_0_0Test ex
                                                                    .getHeaderIdentifier()));
 
         Open open = new Open();
-        _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
-        QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
-        _frameWriter.writeToBuffer(buf);
-        buf.flip();
-        _protocolEngine_1_0_0.received(buf);
+        _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
 
         verify(_virtualHost).registerConnection(any(AMQPConnection.class));
         AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -190,11 +212,7 @@ public class ProtocolEngine_1_0_0Test ex
         _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
 
         Open open = new Open();
-        _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
-        QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
-        _frameWriter.writeToBuffer(buf);
-        buf.flip();
-        _protocolEngine_1_0_0.received(buf);
+        _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
 
         verify(_virtualHost, never()).registerConnection(any(AMQPConnection.class));
         verify(_networkConnection).close();
@@ -220,11 +238,7 @@ public class ProtocolEngine_1_0_0Test ex
         _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
 
         Open open = new Open();
-        _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
-        QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
-        _frameWriter.writeToBuffer(buf);
-        buf.flip();
-        _protocolEngine_1_0_0.received(buf);
+        _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
 
         verify(_virtualHost).registerConnection(any(AMQPConnection.class));
         AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -253,22 +267,13 @@ public class ProtocolEngine_1_0_0Test ex
 
         SaslInit init = new SaslInit();
         init.setMechanism(Symbol.valueOf("ANONYMOUS"));
-        _frameWriter.setValue(new SASLFrame(init));
-        QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
-        _frameWriter.writeToBuffer(buf);
-
-        buf.flip();
-        _protocolEngine_1_0_0.received(buf);
+        _frameWriter.send(new SASLFrame(init));
 
         _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance()
                                                                    .getHeaderIdentifier()));
 
         Open open = new Open();
-        _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
-        buf = QpidByteBuffer.allocate(64*1024);
-        _frameWriter.writeToBuffer(buf);
-        buf.flip();
-        _protocolEngine_1_0_0.received(buf);
+        _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
 
         verify(_virtualHost).registerConnection(any(AMQPConnection.class));
         AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -288,4 +293,32 @@ public class ProtocolEngine_1_0_0Test ex
     {
         when(_subjectCreator.getMechanisms()).thenReturn(Arrays.asList(mechanisms));
     }
+
+    private final ByteBufferSender _sender = new ByteBufferSender()
+    {
+
+        @Override
+        public boolean isDirectBufferPreferred()
+        {
+            return false;
+        }
+
+        @Override
+        public void send(final QpidByteBuffer msg)
+        {
+            _protocolEngine_1_0_0.received(msg);
+        }
+
+        @Override
+        public void flush()
+        {
+
+        }
+
+        @Override
+        public void close()
+        {
+
+        }
+    };
 }




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org