You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2017/01/05 17:19:34 UTC

svn commit: r1777483 - /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/

Author: rgodfrey
Date: Thu Jan  5 17:19:33 2017
New Revision: 1777483

URL: http://svn.apache.org/viewvc?rev=1777483&view=rev
Log:
QPID-7568 : Address review comments by [~k-wall]

Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Thu Jan  5 17:19:33 2017
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
@@ -134,6 +135,12 @@ public class ExchangeDestination impleme
     }
 
     @Override
+    public MessageDestination getMessageDestination()
+    {
+        return _exchange;
+    }
+
+    @Override
     public void authorizePublish(final SecurityToken securityToken,
                                  final String routingAddress)
     {
@@ -232,9 +239,8 @@ public class ExchangeDestination impleme
     @Override
     public Symbol[] getCapabilities()
     {
-        Symbol[] capabilities = new Symbol[2];
+        Symbol[] capabilities = new Symbol[1];
         capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
-        capabilities[1] = DELAYED_DELIVERY;
         return capabilities;
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Thu Jan  5 17:19:33 2017
@@ -21,15 +21,18 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -52,6 +55,7 @@ public abstract class LinkEndpoint<T ext
     private UnsignedInteger _lastSentCreditLimit;
     private volatile boolean _stopped;
     private volatile boolean _stoppedUpdated;
+    private Symbol[] _capabilities;
 
     private enum State
     {
@@ -303,6 +307,7 @@ public abstract class LinkEndpoint<T ext
         attachToSend.setRcvSettleMode(getReceivingSettlementMode());
         attachToSend.setUnsettled(_localUnsettled);
         attachToSend.setProperties(_properties);
+        attachToSend.setOfferedCapabilities(_capabilities);
 
         if (getRole() == Role.SENDER)
         {
@@ -488,6 +493,16 @@ public abstract class LinkEndpoint<T ext
         _receivingSettlementMode = receivingSettlementMode;
     }
 
+    public List<Symbol> getCapabilities()
+    {
+        return _capabilities == null ? null : Collections.unmodifiableList(Arrays.asList(_capabilities));
+    }
+
+    public void setCapabilities(Collection<Symbol> capabilities)
+    {
+        _capabilities = capabilities == null ? null : capabilities.toArray(new Symbol[capabilities.size()]);
+    }
+
     public Map getInitialUnsettledMap()
     {
         return _initialUnsettledMap;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Thu Jan  5 17:19:33 2017
@@ -131,6 +131,12 @@ public class NodeReceivingDestination im
     }
 
     @Override
+    public MessageDestination getMessageDestination()
+    {
+        return _destination;
+    }
+
+    @Override
     public void authorizePublish(final SecurityToken securityToken,
                                  final String routingAddress)
     {
@@ -203,9 +209,8 @@ public class NodeReceivingDestination im
     @Override
     public Symbol[] getCapabilities()
     {
-        Symbol[] capabilities = new Symbol[2];
+        Symbol[] capabilities = new Symbol[1];
         capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
-        capabilities[1] = DELAYED_DELIVERY;
         return capabilities;
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Thu Jan  5 17:19:33 2017
@@ -20,17 +20,14 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
 
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -124,13 +121,8 @@ public class QueueDestination extends Me
     }
 
     @Override
-    public Symbol[] getCapabilities()
+    public MessageDestination getMessageDestination()
     {
-        Set<Symbol> capabilities = new HashSet<>(Arrays.asList(super.getCapabilities()));
-        if(_queue.isHoldOnPublishEnabled())
-        {
-            capabilities.add(DELAYED_DELIVERY);
-        }
-        return capabilities.toArray(new Symbol[capabilities.size()]);
+        return _queue;
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java Thu Jan  5 17:19:33 2017
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -34,7 +35,6 @@ public interface ReceivingDestination ex
 
     Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
     Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
-    Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
 
 
     Outcome[] getOutcomes();
@@ -51,4 +51,6 @@ public interface ReceivingDestination ex
     String getAddress();
 
     void authorizePublish(SecurityToken securityToken, final String routingAddress);
+
+    MessageDestination getMessageDestination();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Jan  5 17:19:33 2017
@@ -32,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -114,6 +115,7 @@ import org.apache.qpid.transport.network
 
 public class Session_1_0 implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
 {
+    public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
     private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
     private static final EnumSet<SessionState> END_STATES =
@@ -776,7 +778,7 @@ public class Session_1_0 implements AMQS
         Error error = null;
 
         final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
-
+        Set<Symbol> capabilities = new HashSet<>();
 
         if (endpoint.getRole() == Role.SENDER)
         {
@@ -910,175 +912,176 @@ public class Session_1_0 implements AMQS
 
             }
         }
-        else
+        else if (endpoint.getTarget() instanceof Coordinator)
         {
-            if (endpoint.getTarget() instanceof Coordinator)
+            Coordinator coordinator = (Coordinator) endpoint.getTarget();
+            TxnCapability[] coordinatorCapabilities = coordinator.getCapabilities();
+            boolean localTxn = false;
+            boolean multiplePerSession = false;
+            if (coordinatorCapabilities != null)
             {
-                Coordinator coordinator = (Coordinator) endpoint.getTarget();
-                TxnCapability[] capabilities = coordinator.getCapabilities();
-                boolean localTxn = false;
-                boolean multiplePerSession = false;
-                if (capabilities != null)
+                for (TxnCapability capability : coordinatorCapabilities)
                 {
-                    for (TxnCapability capability : capabilities)
+                    if (capability.equals(TxnCapability.LOCAL_TXN))
                     {
-                        if (capability.equals(TxnCapability.LOCAL_TXN))
-                        {
-                            localTxn = true;
-                        }
-                        else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
-                        {
-                            multiplePerSession = true;
-                        }
-                        else
-                        {
-                            error = new Error();
-                            error.setCondition(AmqpError.NOT_IMPLEMENTED);
-                            error.setDescription("Unsupported capability: " + capability);
-                            break;
-                        }
+                        localTxn = true;
+                    }
+                    else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
+                    {
+                        multiplePerSession = true;
+                    }
+                    else
+                    {
+                        error = new Error();
+                        error.setCondition(AmqpError.NOT_IMPLEMENTED);
+                        error.setDescription("Unsupported capability: " + capability);
+                        break;
                     }
                 }
+            }
 
        /*         if(!localTxn)
                 {
-                    capabilities.add(TxnCapabilities.LOCAL_TXN);
+                    coordinatorCapabilities.add(TxnCapabilities.LOCAL_TXN);
                 }*/
 
-                final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-                final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
-                        new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
-                                                            this,
-                                                            receivingLinkEndpoint,
-                                                            _openTransactions);
-                receivingLinkEndpoint.setLink(coordinatorLink);
-                link = coordinatorLink;
+            final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+            final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
+                    new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
+                                                        this,
+                                                        receivingLinkEndpoint,
+                                                        _openTransactions);
+            receivingLinkEndpoint.setLink(coordinatorLink);
+            link = coordinatorLink;
+        }
+        else // standard  (non-Coordinator) receiver
+        {
 
+            StandardReceivingLink_1_0 previousLink =
+                    (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
 
-            }
-            else
+            if (previousLink == null)
             {
 
-                StandardReceivingLink_1_0 previousLink =
-                        (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
+                Target target = (Target) endpoint.getTarget();
 
-                if (previousLink == null)
+                if (target != null)
                 {
+                    if (Boolean.TRUE.equals(target.getDynamic()))
+                    {
 
-                    Target target = (Target) endpoint.getTarget();
+                        MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
+                        target.setAddress(tempQueue.getName());
+                    }
 
-                    if (target != null)
+                    String addr = target.getAddress();
+                    if (addr == null || "".equals(addr.trim()))
                     {
-                        if (Boolean.TRUE.equals(target.getDynamic()))
-                        {
+                        MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
+                        destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+                                                                   target.getExpiryPolicy(), "",
+                                                                   target.getCapabilities(),
+                                                                   _connection.getEventLogger());
+                        target.setCapabilities(destination.getCapabilities());
 
-                            MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
-                            target.setAddress(tempQueue.getName());
+                        if (_blockingEntities.contains(messageDestination))
+                        {
+                            endpoint.setStopped(true);
                         }
-
-                        String addr = target.getAddress();
-                        if (addr == null || "".equals(addr.trim()))
+                    }
+                    else if (!addr.startsWith("/") && addr.contains("/"))
+                    {
+                        String[] parts = addr.split("/", 2);
+                        Exchange<?> exchange = getExchange(parts[0]);
+                        if (exchange != null)
                         {
-                            MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
-                            destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                       target.getExpiryPolicy(), "",
-                                                                       target.getCapabilities(),
-                                                                       _connection.getEventLogger());
-                            target.setCapabilities(destination.getCapabilities());
-
-                            if(_blockingEntities.contains(messageDestination))
-                            {
-                                endpoint.setStopped(true);
-                            }
+                            ExchangeDestination exchangeDestination =
+                                    new ExchangeDestination(exchange,
+                                                            target.getDurable(),
+                                                            target.getExpiryPolicy(),
+                                                            parts[0],
+                                                            target.getCapabilities());
 
+                            exchangeDestination.setInitialRoutingAddress(parts[1]);
+                            target.setCapabilities(exchangeDestination.getCapabilities());
+                            destination = exchangeDestination;
                         }
-                        else if (!addr.startsWith("/") && addr.contains("/"))
+                        else
                         {
-                            String[] parts = addr.split("/", 2);
-                            Exchange<?> exchange =getExchange(parts[0]);
-                            if (exchange != null)
-                            {
-                                ExchangeDestination exchangeDestination =
-                                        new ExchangeDestination(exchange,
-                                                                target.getDurable(),
-                                                                target.getExpiryPolicy(),
-                                                                parts[0],
-                                                                target.getCapabilities());
-
-                                exchangeDestination.setInitialRoutingAddress(parts[1]);
-                                target.setCapabilities(exchangeDestination.getCapabilities());
-                                destination = exchangeDestination;
-
-                            }
-                            else
-                            {
-                                endpoint.setTarget(null);
-                                destination = null;
-                            }
+                            endpoint.setTarget(null);
+                            destination = null;
+                        }
+                    }
+                    else
+                    {
+                        MessageDestination messageDestination =
+                                getAddressSpace().getAttainedMessageDestination(addr);
+                        if (messageDestination != null)
+                        {
+                            destination =
+                                    new NodeReceivingDestination(messageDestination,
+                                                                 target.getDurable(),
+                                                                 target.getExpiryPolicy(),
+                                                                 addr,
+                                                                 target.getCapabilities(),
+                                                                 _connection.getEventLogger());
+                            target.setCapabilities(destination.getCapabilities());
                         }
                         else
                         {
-                            MessageDestination messageDestination =
-                                    getAddressSpace().getAttainedMessageDestination(addr);
-                            if (messageDestination != null)
+                            Queue<?> queue = getQueue(addr);
+                            if (queue != null)
                             {
-                                destination =
-                                        new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                     target.getExpiryPolicy(), addr, target.getCapabilities(), _connection.getEventLogger());
-                                target.setCapabilities(destination.getCapabilities());
 
+                                destination = new QueueDestination(queue, addr);
                             }
                             else
                             {
-                                Queue<?> queue = getQueue(addr);
-                                if (queue != null)
-                                {
-
-                                    destination = new QueueDestination(queue, addr);
-                                }
-                                else
-                                {
-                                    endpoint.setTarget(null);
-                                    destination = null;
-                                }
-
+                                endpoint.setTarget(null);
+                                destination = null;
                             }
                         }
-
                     }
-                    else
+                }
+                else
+                {
+                    destination = null;
+                }
+                if (destination != null)
+                {
+                    final ReceivingDestination receivingDestination = (ReceivingDestination) destination;
+                    MessageDestination messageDestination = receivingDestination.getMessageDestination();
+                    if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled())
                     {
-                        destination = null;
+                        capabilities.add(DELAYED_DELIVERY);
                     }
-                    if (destination != null)
+                    final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+                    final StandardReceivingLink_1_0 receivingLink =
+                            new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
+                                                          getAddressSpace(),
+                                                          receivingDestination);
+
+                    receivingLinkEndpoint.setLink(receivingLink);
+                    link = receivingLink;
+                    if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
+                        || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
                     {
-                        final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-                        final StandardReceivingLink_1_0 receivingLink =
-                                new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
-                                                              getAddressSpace(),
-                                                              (ReceivingDestination) destination);
-
-                        receivingLinkEndpoint.setLink(receivingLink);
-                        link = receivingLink;
-                        if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
-                            || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
-                        {
-                            linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
-                        }
+                        linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
                     }
                 }
-                else
-                {
-                    ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-                    previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
-                    receivingLinkEndpoint.setLink(previousLink);
-                    link = previousLink;
-                    endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
-
-                }
+            }
+            else
+            {
+                ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+                previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
+                receivingLinkEndpoint.setLink(previousLink);
+                link = previousLink;
+                endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
             }
         }
 
+
+        endpoint.setCapabilities(capabilities);
         endpoint.attach();
 
         if (link == null)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org