You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/02/16 16:11:27 UTC

svn commit: r1783244 [2/2] - /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java (from r1783242, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java&r1=1783242&r2=1783244&rev=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,8 +15,8 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
+
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.security.AccessControlException;
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.MessageFormatRegistry;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
@@ -59,6 +57,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
@@ -70,40 +69,37 @@ import org.apache.qpid.server.txn.LocalT
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
-public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
+public class StandardReceivingLinkEndpoint extends ReceivingLinkEndpoint
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLink_1_0.class);
-    private NamedAddressSpace _addressSpace;
-
-    private ReceivingDestination _destination;
-    private SectionDecoderImpl _sectionDecoder;
-    private volatile ReceivingLinkAttachment _attachment;
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
 
+    private final SectionDecoderImpl _sectionDecoder;
     private ArrayList<Transfer> _incompleteMessage;
-    private TerminusDurability _durability;
-
-    private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
     private boolean _resumedMessage;
     private Binary _messageDeliveryTag;
-    private ReceiverSettleMode _receivingSettlementMode;
-
+    private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
 
-    public StandardReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, NamedAddressSpace addressSpace,
-                                     ReceivingDestination destination)
+    public StandardReceivingLinkEndpoint(final Session_1_0 session,
+                                         final Attach attach)
     {
-        _addressSpace = addressSpace;
-        _destination = destination;
-        _attachment = receivingLinkAttachment;
-        _receivingSettlementMode = receivingLinkAttachment.getEndpoint().getReceivingSettlementMode();
-        _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
-
-        _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getSectionDecoderRegistry());
+        super(session, attach);
+        _sectionDecoder = new SectionDecoderImpl(getSession().getConnection().getSectionDecoderRegistry());
+    }
 
+    @Override
+    public void start()
+    {
+        setLinkCredit(UnsignedInteger.valueOf(getReceivingDestination().getCredit()));
+        setCreditWindow();
+    }
 
+    private TerminusDurability getDurability()
+    {
+        return ((Target) getTarget()).getDurable();
     }
 
-    public Error messageTransfer(Transfer xfr)
+    @Override
+    protected Error messageTransfer(Transfer xfr)
     {
         List<QpidByteBuffer> fragments = null;
 
@@ -121,7 +117,6 @@ public class StandardReceivingLink_1_0 i
         else if(_incompleteMessage != null)
         {
             _incompleteMessage.add(xfr);
-
             if(Boolean.TRUE.equals(xfr.getMore()))
             {
                 return null;
@@ -157,7 +152,7 @@ public class StandardReceivingLink_1_0 i
             {
                 Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
                 boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
-                getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
+                updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
                 if(settled)
                 {
                     _unsettledMap.remove(_messageDeliveryTag);
@@ -178,7 +173,7 @@ public class StandardReceivingLink_1_0 i
                 List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
 
                 MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
-                MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
+                MessageHandle<MessageMetaData_1_0> handle = getAddressSpace().getMessageStore().addMessage(mmd);
 
                 for (EncodingRetainingSection<?> dataSection : dataSections)
                 {
@@ -190,7 +185,8 @@ public class StandardReceivingLink_1_0 i
                 }
                 final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
                 Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
-                routingAddress = _destination.getRoutingAddress(message);
+
+                routingAddress = getReceivingDestination().getRoutingAddress(message);
                 serverMessage = message;
             }
             else
@@ -198,8 +194,8 @@ public class StandardReceivingLink_1_0 i
                 MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
                 if(format != null)
                 {
-                    serverMessage = format.createMessage(fragments, _addressSpace.getMessageStore(), getSession().getConnection().getReference());
-                    routingAddress = format.getRoutingAddress(serverMessage, _destination.getAddress());
+                    serverMessage = format.createMessage(fragments, getAddressSpace().getMessageStore(), getSession().getConnection().getReference());
+                    routingAddress = format.getRoutingAddress(serverMessage, getReceivingDestination().getAddress());
                 }
                 else
                 {
@@ -235,7 +231,7 @@ public class StandardReceivingLink_1_0 i
                 }
                 else
                 {
-                    transaction = new AutoCommitTransaction(_addressSpace.getMessageStore());
+                    transaction = new AutoCommitTransaction(getAddressSpace().getMessageStore());
                 }
 
                 try
@@ -243,11 +239,11 @@ public class StandardReceivingLink_1_0 i
                     Session_1_0 session = getSession();
 
                     session.getAMQPConnection()
-                            .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
-                    _destination.authorizePublish(session.getSecurityToken(), routingAddress);
+                           .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
+                    getReceivingDestination().authorizePublish(session.getSecurityToken(), routingAddress);
 
-                    Outcome outcome = _destination.send(serverMessage, routingAddress, transaction, session.getCapacityCheckAction());
-                    Source source = (Source) getEndpoint().getSource();
+                    Outcome outcome = getReceivingDestination().send(serverMessage, routingAddress, transaction, session.getCapacityCheckAction());
+                    Source source = (Source) getSource();
 
                     DeliveryState resultantState;
 
@@ -288,10 +284,10 @@ public class StandardReceivingLink_1_0 i
                         _unsettledMap.put(deliveryTag, outcome);
                     }
 
-                    getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
+                    updateDisposition(deliveryTag, resultantState, settled);
 
                     getSession().getAMQPConnection()
-                            .registerMessageReceived(serverMessage.getSize(), serverMessage.getArrivalTime());
+                                .registerMessageReceived(serverMessage.getSize(), serverMessage.getArrivalTime());
 
                     if (!(transaction instanceof AutoCommitTransaction))
                     {
@@ -300,12 +296,12 @@ public class StandardReceivingLink_1_0 i
                         {
                             public void postCommit()
                             {
-                                getEndpoint().updateDisposition(deliveryTag, null, true);
+                                updateDisposition(deliveryTag, null, true);
                             }
 
                             public void onRollback()
                             {
-                                getEndpoint().updateDisposition(deliveryTag, null, true);
+                                updateDisposition(deliveryTag, null, true);
                             }
                         });
                     }
@@ -315,7 +311,7 @@ public class StandardReceivingLink_1_0 i
                     final Error err = new Error();
                     err.setCondition(AmqpError.NOT_ALLOWED);
                     err.setDescription(e.getMessage());
-                    _attachment.getEndpoint().close(err);
+                    close(err);
 
                 }
             }
@@ -327,6 +323,36 @@ public class StandardReceivingLink_1_0 i
         return null;
     }
 
+    @Override
+    protected void remoteDetachedPerformDetach(Detach detach)
+    {
+        if(!TerminusDurability.UNSETTLED_STATE.equals(getDurability()) ||
+           (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+        {
+            close();
+        }
+        else if(detach == null || detach.getError() != null)
+        {
+            getLink().setLinkAttachmentToNull();
+            // TODO do we have to set an error?
+            detach();
+        }
+        else
+        {
+            detach();
+        }
+    }
+
+    @Override
+    protected void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+    {
+        if(Boolean.TRUE.equals(settled))
+        {
+            _unsettledMap.remove(deliveryTag);
+        }
+    }
+
+
     private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
                                                       final List<EncodingRetainingSection<?>> dataSections)
     {
@@ -334,7 +360,7 @@ public class StandardReceivingLink_1_0 i
         List<EncodingRetainingSection<?>> sections;
         try
         {
-             sections = _sectionDecoder.parseAll(fragments);
+            sections = _sectionDecoder.parseAll(fragments);
         }
         catch (AmqpErrorException e)
         {
@@ -396,7 +422,8 @@ public class StandardReceivingLink_1_0 i
                 contentSize += s.getEncodedSize();
                 dataSections.add(s);
                 s = iter.hasNext() ? iter.next() : null;
-            } while (s instanceof DataSection);
+            }
+            while (s instanceof DataSection);
         }
         else if (s instanceof AmqpSequenceSection)
         {
@@ -416,7 +443,7 @@ public class StandardReceivingLink_1_0 i
         }
         if (s != null)
         {
-            // TODO error
+            throw new ConnectionScopedRuntimeException(String.format("Encountered unexpected section '%s'", s.getClass().getSimpleName()));
         }
         return new MessageMetaData_1_0(headerSection,
                                        deliveryAnnotationsSection,
@@ -428,58 +455,9 @@ public class StandardReceivingLink_1_0 i
                                        contentSize);
     }
 
-    private ReceiverSettleMode getReceivingSettlementMode()
-    {
-        return _receivingSettlementMode;
-    }
-
-    public void remoteDetached(LinkEndpoint endpoint, Detach detach)
-    {
-        //TODO
-        // if not durable or close
-        if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
-           (detach != null && Boolean.TRUE.equals(detach.getClosed())))
-        {
-            endpoint.close();
-        }
-        else if(detach == null || detach.getError() != null)
-        {
-            _attachment = null;
-        }
-    }
-
-    public void start()
-    {
-        getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit()));
-        getEndpoint().setCreditWindow();
-    }
-
-    public ReceivingLinkEndpoint getEndpoint()
+    public void doLinkAttachment()
     {
-        return _attachment.getEndpoint();
-    }
-
-
-    public Session_1_0 getSession()
-    {
-        ReceivingLinkAttachment attachment = _attachment;
-        return attachment == null ? null : attachment.getSession();
-    }
-
-    public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
-    {
-        if(Boolean.TRUE.equals(settled))
-        {
-            _unsettledMap.remove(deliveryTag);
-        }
-    }
-
-    public void setLinkAttachment(ReceivingLinkAttachment linkAttachment)
-    {
-        _attachment = linkAttachment;
-        _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode();
-        ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint();
-        Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
+        Map initialUnsettledMap = getInitialUnsettledMap();
 
         Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
         for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
@@ -490,16 +468,12 @@ public class StandardReceivingLink_1_0 i
                 _unsettledMap.remove(deliveryTag);
             }
         }
-
     }
 
-    public Map getUnsettledOutcomeMap()
+    public Map<Binary, Outcome> getUnsettledOutcomeMap()
     {
         return _unsettledMap;
     }
 
-    public ReceivingDestination getDestination()
-    {
-        return _destination;
-    }
+
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Thu Feb 16 16:11:27 2017
@@ -20,486 +20,30 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.security.AccessControlException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.plugin.MessageFormat;
-import org.apache.qpid.server.protocol.MessageFormatRegistry;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.store.MessageHandle;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-
 public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLink_1_0.class);
-    private NamedAddressSpace _addressSpace;
-
-    private ReceivingDestination _destination;
-    private SectionDecoderImpl _sectionDecoder;
-    private volatile ReceivingLinkAttachment _attachment;
-
-
-    private ArrayList<Transfer> _incompleteMessage;
-    private TerminusDurability _durability;
-
-    private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
-    private boolean _resumedMessage;
-    private Binary _messageDeliveryTag;
-    private ReceiverSettleMode _receivingSettlementMode;
-
-
-    public StandardReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, NamedAddressSpace addressSpace,
-                                     ReceivingDestination destination)
-    {
-        _addressSpace = addressSpace;
-        _destination = destination;
-        _attachment = receivingLinkAttachment;
-        _receivingSettlementMode = receivingLinkAttachment.getEndpoint().getReceivingSettlementMode();
-        _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
-
-        _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getSectionDecoderRegistry());
-
-
-    }
-
-    public Error messageTransfer(Transfer xfr)
-    {
-        List<QpidByteBuffer> fragments = null;
-
-        org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
-        final Binary deliveryTag = xfr.getDeliveryTag();
-        UnsignedInteger messageFormat = null;
-        if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
-        {
-            _incompleteMessage = new ArrayList<>();
-            _incompleteMessage.add(xfr);
-            _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
-            _messageDeliveryTag = deliveryTag;
-            return null;
-        }
-        else if(_incompleteMessage != null)
-        {
-            _incompleteMessage.add(xfr);
-
-            if(Boolean.TRUE.equals(xfr.getMore()))
-            {
-                return null;
-            }
-
-            fragments = new ArrayList<>(_incompleteMessage.size());
-
-            for(Transfer t : _incompleteMessage)
-            {
-                if(t.getMessageFormat() != null && messageFormat == null)
-                {
-                    messageFormat = t.getMessageFormat();
-                }
-                fragments.addAll(t.getPayload());
-                t.dispose();
-            }
-            _incompleteMessage=null;
-
-        }
-        else
-        {
-            _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
-            _messageDeliveryTag = deliveryTag;
-            fragments = xfr.getPayload();
-            messageFormat = xfr.getMessageFormat();
-
-            xfr.dispose();
-        }
-
-        if(_resumedMessage)
-        {
-            if(_unsettledMap.containsKey(_messageDeliveryTag))
-            {
-                Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
-                boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
-                getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
-                if(settled)
-                {
-                    _unsettledMap.remove(_messageDeliveryTag);
-                }
-            }
-            else
-            {
-                throw new ConnectionScopedRuntimeException("Unexpected delivery Tag: " + _messageDeliveryTag + "_unsettledMap: " + _unsettledMap);
-            }
-        }
-        else
-        {
-            ServerMessage<?> serverMessage;
-            String routingAddress;
-
-            if(messageFormat == null || UnsignedInteger.ZERO.equals(messageFormat))
-            {
-                List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
-
-                MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
-                MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
-
-                for (EncodingRetainingSection<?> dataSection : dataSections)
-                {
-                    for (QpidByteBuffer buf : dataSection.getEncodedForm())
-                    {
-                        handle.addContent(buf);
-                        buf.dispose();
-                    }
-                }
-                final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
-                Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
-                routingAddress = _destination.getRoutingAddress(message);
-                serverMessage = message;
-            }
-            else
-            {
-                MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
-                if(format != null)
-                {
-                    serverMessage = format.createMessage(fragments, _addressSpace.getMessageStore(), getSession().getConnection().getReference());
-                    routingAddress = format.getRoutingAddress(serverMessage, _destination.getAddress());
-                }
-                else
-                {
-                    final Error err = new Error();
-                    err.setCondition(AmqpError.NOT_IMPLEMENTED);
-                    err.setDescription("Unknown message format: " + messageFormat);
-                    return err;
-                }
-            }
-
-            for(QpidByteBuffer fragment: fragments)
-            {
-                fragment.dispose();
-            }
-            fragments = null;
-
-            MessageReference<?> reference = serverMessage.newReference();
-            try
-            {
-                Binary transactionId = null;
-                if (xfrState != null)
-                {
-                    if (xfrState instanceof TransactionalState)
-                    {
-                        transactionId = ((TransactionalState) xfrState).getTxnId();
-                    }
-                }
-
-                ServerTransaction transaction = null;
-                if (transactionId != null)
-                {
-                    transaction = getSession().getTransaction(transactionId);
-                }
-                else
-                {
-                    transaction = new AutoCommitTransaction(_addressSpace.getMessageStore());
-                }
-
-                try
-                {
-                    Session_1_0 session = getSession();
-
-                    session.getAMQPConnection()
-                            .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
-                    _destination.authorizePublish(session.getSecurityToken(), routingAddress);
-
-                    Outcome outcome = _destination.send(serverMessage, routingAddress, transaction, session.getCapacityCheckAction());
-                    Source source = (Source) getEndpoint().getSource();
-
-                    DeliveryState resultantState;
-
-                    if(source.getOutcomes() == null || Arrays.asList(source.getOutcomes()).contains(outcome.getSymbol()))
-                    {
-                        if (transactionId == null)
-                        {
-                            resultantState = (DeliveryState) outcome;
-                        }
-                        else
-                        {
-                            TransactionalState transactionalState = new TransactionalState();
-                            transactionalState.setOutcome(outcome);
-                            transactionalState.setTxnId(transactionId);
-                            resultantState = transactionalState;
-                        }
-                    }
-                    else if(transactionId != null)
-                    {
-                        // cause the txn to fail
-                        if(transaction instanceof LocalTransaction)
-                        {
-                            ((LocalTransaction) transaction).setRollbackOnly();
-                        }
-                        resultantState = null;
-                    }
-                    else
-                    {
-                        // we should just use the default outcome
-                        resultantState = null;
-                    }
-
-
-                    boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode()                                                             );
+    private volatile ReceivingLinkEndpoint _linkEndpoint;
 
-                    if (!settled)
-                    {
-                        _unsettledMap.put(deliveryTag, outcome);
-                    }
-
-                    getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
-
-                    getSession().getAMQPConnection()
-                            .registerMessageReceived(serverMessage.getSize(), serverMessage.getArrivalTime());
-
-                    if (!(transaction instanceof AutoCommitTransaction))
-                    {
-                        ServerTransaction.Action a;
-                        transaction.addPostTransactionAction(new ServerTransaction.Action()
-                        {
-                            public void postCommit()
-                            {
-                                getEndpoint().updateDisposition(deliveryTag, null, true);
-                            }
-
-                            public void onRollback()
-                            {
-                                getEndpoint().updateDisposition(deliveryTag, null, true);
-                            }
-                        });
-                    }
-                }
-                catch (AccessControlException e)
-                {
-                    final Error err = new Error();
-                    err.setCondition(AmqpError.NOT_ALLOWED);
-                    err.setDescription(e.getMessage());
-                    _attachment.getEndpoint().close(err);
-
-                }
-            }
-            finally
-            {
-                reference.release();
-            }
-        }
-        return null;
-    }
-
-    private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
-                                                      final List<EncodingRetainingSection<?>> dataSections)
-    {
-
-        List<EncodingRetainingSection<?>> sections;
-        try
-        {
-             sections = _sectionDecoder.parseAll(fragments);
-        }
-        catch (AmqpErrorException e)
-        {
-            LOGGER.error("Decoding read section error", e);
-            // TODO - fix error handling
-            throw new IllegalArgumentException(e);
-        }
-
-        long contentSize = 0L;
-
-        HeaderSection headerSection = null;
-        PropertiesSection propertiesSection = null;
-        DeliveryAnnotationsSection deliveryAnnotationsSection = null;
-        MessageAnnotationsSection messageAnnotationsSection = null;
-        ApplicationPropertiesSection applicationPropertiesSection = null;
-        FooterSection footerSection = null;
-        Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
-        EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
-        if (s instanceof HeaderSection)
-        {
-            headerSection = (HeaderSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof DeliveryAnnotationsSection)
-        {
-            deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof MessageAnnotationsSection)
-        {
-            messageAnnotationsSection = (MessageAnnotationsSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof PropertiesSection)
-        {
-            propertiesSection = (PropertiesSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof ApplicationPropertiesSection)
-        {
-            applicationPropertiesSection = (ApplicationPropertiesSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-
-        if (s instanceof AmqpValueSection)
-        {
-            contentSize = s.getEncodedSize();
-            dataSections.add(s);
-            s = iter.hasNext() ? iter.next() : null;
-        }
-        else if (s instanceof DataSection)
-        {
-            do
-            {
-                contentSize += s.getEncodedSize();
-                dataSections.add(s);
-                s = iter.hasNext() ? iter.next() : null;
-            } while (s instanceof DataSection);
-        }
-        else if (s instanceof AmqpSequenceSection)
-        {
-            do
-            {
-                contentSize += s.getEncodedSize();
-                dataSections.add(s);
-                s = iter.hasNext() ? iter.next() : null;
-            }
-            while (s instanceof AmqpSequenceSection);
-        }
-
-        if (s instanceof FooterSection)
-        {
-            footerSection = (FooterSection) s;
-            s = iter.hasNext() ? iter.next() : null;
-        }
-        if (s != null)
-        {
-            // TODO error
-        }
-        return new MessageMetaData_1_0(headerSection,
-                                       deliveryAnnotationsSection,
-                                       messageAnnotationsSection,
-                                       propertiesSection,
-                                       applicationPropertiesSection,
-                                       footerSection,
-                                       System.currentTimeMillis(),
-                                       contentSize);
-    }
-
-    private ReceiverSettleMode getReceivingSettlementMode()
+    public StandardReceivingLink_1_0(final ReceivingLinkEndpoint endpoint)
     {
-        return _receivingSettlementMode;
-    }
-
-    public void remoteDetached(LinkEndpoint endpoint, Detach detach)
-    {
-        //TODO
-        // if not durable or close
-        if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
-           (detach != null && Boolean.TRUE.equals(detach.getClosed())))
-        {
-            endpoint.close();
-        }
-        else if(detach == null || detach.getError() != null)
-        {
-            _attachment = null;
-        }
-    }
-
-    public void start()
-    {
-        getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit()));
-        getEndpoint().setCreditWindow();
+        _linkEndpoint = endpoint;
     }
 
     public ReceivingLinkEndpoint getEndpoint()
     {
-        return _attachment.getEndpoint();
-    }
-
-
-    public Session_1_0 getSession()
-    {
-        ReceivingLinkAttachment attachment = _attachment;
-        return attachment == null ? null : attachment.getSession();
+        return _linkEndpoint;
     }
 
-    public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+    public void setLinkAttachment(final ReceivingLinkEndpoint linkEndpoint)
     {
-        if(Boolean.TRUE.equals(settled))
-        {
-            _unsettledMap.remove(deliveryTag);
-        }
+        _linkEndpoint = linkEndpoint;
+        ((StandardReceivingLinkEndpoint)getEndpoint()).doLinkAttachment();
     }
 
-    public void setLinkAttachment(ReceivingLinkAttachment linkAttachment)
+    @Override
+    public void setLinkAttachmentToNull()
     {
-        _attachment = linkAttachment;
-        _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode();
-        ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint();
-        Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
-
-        Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
-        for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
-        {
-            Binary deliveryTag = entry.getKey();
-            if(!initialUnsettledMap.containsKey(deliveryTag))
-            {
-                _unsettledMap.remove(deliveryTag);
-            }
-        }
-
+        _linkEndpoint = null;
     }
 
-    public Map getUnsettledOutcomeMap()
-    {
-        return _unsettledMap;
-    }
-
-    public ReceivingDestination getDestination()
-    {
-        return _destination;
-    }
 }

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java (from r1783242, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java&r1=1783242&r2=1783244&rev=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,8 +15,8 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
+
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.util.ArrayList;
@@ -25,13 +24,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -40,43 +33,42 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
-public class TxnCoordinatorReceivingLink_1_0 implements ReceivingLink_1_0
+public class TxnCoordinatorReceivingLinkEndpoint extends ReceivingLinkEndpoint
 {
-    private static final Logger _logger = LoggerFactory.getLogger(TxnCoordinatorReceivingLink_1_0.class);
-    private NamedAddressSpace _namedAddressSpace;
-    private ReceivingLinkEndpoint _endpoint;
-
-    private ArrayList<Transfer> _incompleteMessage;
-    private SectionDecoder _sectionDecoder;
+    private final SectionDecoderImpl _sectionDecoder;
     private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
-    private Session_1_0 _session;
+    private ArrayList<Transfer> _incompleteMessage;
+
+    public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session,
+                                               final Attach attach)
+    {
+        super(session, attach);
+        _sectionDecoder = new SectionDecoderImpl(getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
 
+    }
 
-    public TxnCoordinatorReceivingLink_1_0(NamedAddressSpace namedAddressSpace,
-                                           Session_1_0 session_1_0,
-                                           ReceivingLinkEndpoint endpoint)
+    @Override
+    public void start()
     {
-        _namedAddressSpace = namedAddressSpace;
-        _session = session_1_0;
-        _endpoint  = endpoint;
-        _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
-        ((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
+        setLinkCredit(UnsignedInteger.ONE);
+        setCreditWindow();
     }
 
-    public Error messageTransfer(Transfer xfr)
+    @Override
+    protected Error messageTransfer(Transfer xfr)
     {
         List<QpidByteBuffer> payload = new ArrayList<>();
 
@@ -96,13 +88,11 @@ public class TxnCoordinatorReceivingLink
                 return null;
             }
 
-            int size = 0;
             for(Transfer t : _incompleteMessage)
             {
                 final List<QpidByteBuffer> bufs = t.getPayload();
                 if(bufs != null)
                 {
-                    size += QpidByteBufferUtils.remaining(bufs);
                     payload.addAll(bufs);
                 }
                 t.dispose();
@@ -120,44 +110,51 @@ public class TxnCoordinatorReceivingLink
         try
         {
             List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(payload);
+            boolean amqpValueSectionFound = false;
             for(EncodingRetainingSection section : sections)
             {
                 if(section instanceof AmqpValueSection)
                 {
+                    if (amqpValueSectionFound)
+                    {
+                        throw new ConnectionScopedRuntimeException("Received more than one AmqpValue sections");
+                    }
+                    amqpValueSectionFound = true;
                     Object command = section.getValue();
 
-
                     if(command instanceof Declare)
                     {
-                        final IdentifiedTransaction txn = _session.getConnection().createLocalTransaction();
+                        final IdentifiedTransaction txn = getSession().getConnection().createLocalTransaction();
                         _createdTransactions.put(txn.getId(), txn.getServerTransaction());
 
                         Declared state = new Declared();
 
-                        _session.incrementStartedTransactions();
+                        getSession().incrementStartedTransactions();
 
-                        state.setTxnId(_session.integerToBinary(txn.getId()));
-                        _endpoint.updateDisposition(deliveryTag, state, true);
+                        state.setTxnId(getSession().integerToBinary(txn.getId()));
+                        updateDisposition(deliveryTag, state, true);
 
                     }
                     else if(command instanceof Discharge)
                     {
                         Discharge discharge = (Discharge) command;
 
-                        final Error error = discharge(_session.binaryToInteger(discharge.getTxnId()),
+                        final Error error = discharge(getSession().binaryToInteger(discharge.getTxnId()),
                                                       Boolean.TRUE.equals(discharge.getFail()));
-                        _endpoint.updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
+                        updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
                         return error;
                     }
                     else
                     {
-                        // TODO error handling
-
-                        // also should panic if we receive more than one AmqpValue, or no AmqpValue section
+                        throw new ConnectionScopedRuntimeException(String.format("Received unknown command '%s'",
+                                                                                 command.getClass().getSimpleName()));
                     }
                 }
             }
-
+            if (!amqpValueSectionFound)
+            {
+                throw new ConnectionScopedRuntimeException("Received no AmqpValue section");
+            }
         }
         catch (AmqpErrorException e)
         {
@@ -173,25 +170,6 @@ public class TxnCoordinatorReceivingLink
         return null;
     }
 
-    public void remoteDetached(LinkEndpoint endpoint, Detach detach)
-    {
-        // force rollback of open transactions
-        for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
-        {
-            entry.getValue().rollback();
-            _session.incrementRolledBackTransactions();
-            _session.getConnection().removeTransaction(entry.getKey());
-        }
-        endpoint.detach();
-    }
-
-
-    @Override
-    public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
-    {
-
-    }
-
     private Error discharge(Integer transactionId, boolean fail)
     {
         Error error = null;
@@ -201,23 +179,23 @@ public class TxnCoordinatorReceivingLink
             if(fail)
             {
                 txn.rollback();
-                _session.incrementRolledBackTransactions();
+                getSession().incrementRolledBackTransactions();
             }
             else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
             {
                 txn.commit();
-                _session.incrementCommittedTransactions();
+                getSession().incrementCommittedTransactions();
             }
             else
             {
                 txn.rollback();
-                _session.incrementRolledBackTransactions();
+                getSession().incrementRolledBackTransactions();
                 error = new Error();
                 error.setCondition(LinkError.DETACH_FORCED);
                 error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
             }
             _createdTransactions.remove(transactionId);
-            _session.getConnection().removeTransaction(transactionId);
+            getSession().getConnection().removeTransaction(transactionId);
         }
         else
         {
@@ -228,11 +206,24 @@ public class TxnCoordinatorReceivingLink
         return error;
     }
 
+    @Override
+    protected void remoteDetachedPerformDetach(Detach detach)
+    {
+        // force rollback of open transactions
+        for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
+        {
+            entry.getValue().rollback();
+            getSession().incrementRolledBackTransactions();
+            getSession().getConnection().removeTransaction(entry.getKey());
+        }
+        detach();
+    }
 
-
-    public void start()
+    @Override
+    protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
     {
-        _endpoint.setLinkCredit(UnsignedInteger.ONE);
-        _endpoint.setCreditWindow();
+
     }
+
+
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Thu Feb 16 16:11:27 2017
@@ -20,219 +20,20 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
 
 public class TxnCoordinatorReceivingLink_1_0 implements ReceivingLink_1_0
 {
-    private static final Logger _logger = LoggerFactory.getLogger(TxnCoordinatorReceivingLink_1_0.class);
-    private NamedAddressSpace _namedAddressSpace;
-    private ReceivingLinkEndpoint _endpoint;
-
-    private ArrayList<Transfer> _incompleteMessage;
-    private SectionDecoder _sectionDecoder;
-    private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
-    private Session_1_0 _session;
-
-
-    public TxnCoordinatorReceivingLink_1_0(NamedAddressSpace namedAddressSpace,
-                                           Session_1_0 session_1_0,
-                                           ReceivingLinkEndpoint endpoint)
+    public TxnCoordinatorReceivingLink_1_0(ReceivingLinkEndpoint endpoint)
     {
-        _namedAddressSpace = namedAddressSpace;
-        _session = session_1_0;
-        _endpoint  = endpoint;
-        _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
         ((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
     }
 
-    public Error messageTransfer(Transfer xfr)
-    {
-        List<QpidByteBuffer> payload = new ArrayList<>();
-
-        final Binary deliveryTag = xfr.getDeliveryTag();
-
-        if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
-        {
-            _incompleteMessage = new ArrayList<Transfer>();
-            _incompleteMessage.add(xfr);
-            return null;
-        }
-        else if(_incompleteMessage != null)
-        {
-            _incompleteMessage.add(xfr);
-            if(Boolean.TRUE.equals(xfr.getMore()))
-            {
-                return null;
-            }
-
-            int size = 0;
-            for(Transfer t : _incompleteMessage)
-            {
-                final List<QpidByteBuffer> bufs = t.getPayload();
-                if(bufs != null)
-                {
-                    size += QpidByteBufferUtils.remaining(bufs);
-                    payload.addAll(bufs);
-                }
-                t.dispose();
-            }
-            _incompleteMessage=null;
-
-        }
-        else
-        {
-            payload.addAll(xfr.getPayload());
-            xfr.dispose();
-        }
-
-        // Only interested in the amqp-value section that holds the message to the coordinator
-        try
-        {
-            List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(payload);
-            for(EncodingRetainingSection section : sections)
-            {
-                if(section instanceof AmqpValueSection)
-                {
-                    Object command = section.getValue();
-
-
-                    if(command instanceof Declare)
-                    {
-                        final IdentifiedTransaction txn = _session.getConnection().createLocalTransaction();
-                        _createdTransactions.put(txn.getId(), txn.getServerTransaction());
-
-                        Declared state = new Declared();
-
-                        _session.incrementStartedTransactions();
-
-                        state.setTxnId(_session.integerToBinary(txn.getId()));
-                        _endpoint.updateDisposition(deliveryTag, state, true);
-
-                    }
-                    else if(command instanceof Discharge)
-                    {
-                        Discharge discharge = (Discharge) command;
-
-                        final Error error = discharge(_session.binaryToInteger(discharge.getTxnId()),
-                                                      Boolean.TRUE.equals(discharge.getFail()));
-                        _endpoint.updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
-                        return error;
-                    }
-                    else
-                    {
-                        // TODO error handling
-
-                        // also should panic if we receive more than one AmqpValue, or no AmqpValue section
-                    }
-                }
-            }
-
-        }
-        catch (AmqpErrorException e)
-        {
-            return e.getError();
-        }
-        finally
-        {
-            for(QpidByteBuffer buf : payload)
-            {
-                buf.dispose();
-            }
-        }
-        return null;
-    }
-
-    public void remoteDetached(LinkEndpoint endpoint, Detach detach)
-    {
-        // force rollback of open transactions
-        for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
-        {
-            entry.getValue().rollback();
-            _session.incrementRolledBackTransactions();
-            _session.getConnection().removeTransaction(entry.getKey());
-        }
-        endpoint.detach();
-    }
-
 
     @Override
-    public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+    public void setLinkAttachmentToNull()
     {
-
-    }
-
-    private Error discharge(Integer transactionId, boolean fail)
-    {
-        Error error = null;
-        ServerTransaction txn = _createdTransactions.get(transactionId);
-        if(txn != null)
-        {
-            if(fail)
-            {
-                txn.rollback();
-                _session.incrementRolledBackTransactions();
-            }
-            else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
-            {
-                txn.commit();
-                _session.incrementCommittedTransactions();
-            }
-            else
-            {
-                txn.rollback();
-                _session.incrementRolledBackTransactions();
-                error = new Error();
-                error.setCondition(LinkError.DETACH_FORCED);
-                error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
-            }
-            _createdTransactions.remove(transactionId);
-            _session.getConnection().removeTransaction(transactionId);
-        }
-        else
-        {
-            error = new Error();
-            error.setCondition(AmqpError.NOT_FOUND);
-            error.setDescription("Unknown transactionId" + transactionId);
-        }
-        return error;
     }
 
-
-
-    public void start()
-    {
-        _endpoint.setLinkCredit(UnsignedInteger.ONE);
-        _endpoint.setCreditWindow();
-    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org