You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/01/24 10:14:48 UTC

svn commit: r1780052 [2/3] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-core/src/main/java/org/...

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1780052&r1=1780051&r2=1780052&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Jan 24 10:14:48 2017
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.server.protocol.v1_0.ExchangeDestination.SHARED_CAPABILITY;
 
 import java.security.AccessControlContext;
 import java.security.AccessControlException;
@@ -28,6 +29,7 @@ import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -49,6 +51,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.EventLogger;
@@ -62,13 +66,17 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.ConsumerListener;
@@ -87,9 +95,16 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinks;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinksOrMessages;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ExactSubjectFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MatchingSubjectFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
@@ -111,6 +126,7 @@ import org.apache.qpid.server.txn.AutoCo
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.transport.network.Ticker;
 
 public class Session_1_0 implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
@@ -291,8 +307,7 @@ public class Session_1_0 implements AMQS
                     endpoint.setLocalHandle(localHandle);
                     _localLinkEndpoints.put(endpoint, localHandle);
 
-                    remoteLinkCreation(endpoint);
-
+                    remoteLinkCreation(endpoint, attach);
                 }
                 else
                 {
@@ -771,332 +786,647 @@ public class Session_1_0 implements AMQS
         return _accessControllerContext;
     }
 
-    public void remoteLinkCreation(final LinkEndpoint endpoint)
+    public void remoteLinkCreation(final LinkEndpoint endpoint, Attach attach)
     {
-        Destination destination;
         Link_1_0 link = null;
         Error error = null;
+        Set<Symbol> capabilities = new HashSet<>();
+        try
+        {
+            if (endpoint.getRole() == Role.SENDER)
+            {
+                link = createSendingLink(endpoint, attach);
+                if (link != null)
+                {
+                    capabilities.add(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS);
+                }
+            }
+            else if (endpoint.getTarget() instanceof Coordinator)
+            {
+                link = createCoordinatorLink(endpoint);
+            }
+            else // standard  (non-Coordinator) receiver
+            {
+                link = createReceivingLink(endpoint, capabilities);
+            }
+        }
+        catch (AmqpErrorException e)
+        {
+
+            if (e.getError() == null || e.getError().getCondition() == AmqpError.INTERNAL_ERROR)
+            {
+                _logger.error("Could not create link", e);
+            }
+            else
+            {
+                _logger.debug("Could not create link", e);
+            }
+            if (endpoint.getRole() == Role.SENDER)
+            {
+                endpoint.setSource(null);
+            }
+            else
+            {
+                endpoint.setTarget(null);
+            }
+            error = e.getError();
+        }
+
+        endpoint.setCapabilities(capabilities);
+        endpoint.attach();
 
+        if (link == null)
+        {
+            if (error == null)
+            {
+                error = new Error();
+                error.setCondition(AmqpError.NOT_FOUND);
+            }
+            endpoint.close(error);
+        }
+        else
+        {
+            link.start();
+        }
+    }
+
+    private Link_1_0 createReceivingLink(final LinkEndpoint endpoint,
+                                         final Set<Symbol> capabilities)
+    {
+        Link_1_0 link = null;
+        Destination destination;
         final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
-        Set<Symbol> capabilities = new HashSet<>();
+        StandardReceivingLink_1_0 previousLink =
+                (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
 
-        if (endpoint.getRole() == Role.SENDER)
+        if (previousLink == null)
         {
 
-            final SendingLink_1_0 previousLink =
-                    (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
+            Target target = (Target) endpoint.getTarget();
 
-            if (previousLink == null)
+            if (target != null)
             {
+                if (Boolean.TRUE.equals(target.getDynamic()))
+                {
 
-                Target target = (Target) endpoint.getTarget();
-                Source source = (Source) endpoint.getSource();
-
+                    MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
+                    target.setAddress(tempQueue.getName());
+                }
 
-                if (source != null)
+                String addr = target.getAddress();
+                if (addr == null || "".equals(addr.trim()))
                 {
-                    if (Boolean.TRUE.equals(source.getDynamic()))
+                    MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
+                    destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+                                                               target.getExpiryPolicy(), "",
+                                                               target.getCapabilities(),
+                                                               _connection.getEventLogger());
+                    target.setCapabilities(destination.getCapabilities());
+
+                    if (_blockingEntities.contains(messageDestination))
                     {
-                        MessageSource tempQueue = createDynamicSource(source.getDynamicNodeProperties());
-                        source.setAddress(tempQueue.getName());
+                        endpoint.setStopped(true);
+                    }
+                }
+                else if (!addr.startsWith("/") && addr.contains("/"))
+                {
+                    String[] parts = addr.split("/", 2);
+                    Exchange<?> exchange = getExchange(parts[0]);
+                    if (exchange != null)
+                    {
+                        Symbol[] capabilities1 = target.getCapabilities();
+                        ExchangeDestination exchangeDestination = new ExchangeDestination(exchange,
+                                                                                          null,
+                                                                                          target.getDurable(),
+                                                                                          target.getExpiryPolicy(),
+                                                                                          parts[0],
+                                                                                          parts[1],
+                                                                                          capabilities1 != null ? Arrays.asList(capabilities1) : Collections.<Symbol>emptyList());
+                        target.setCapabilities(exchangeDestination.getCapabilities());
+                        destination = exchangeDestination;
                     }
-                    String addr = source.getAddress();
-                    if (!addr.startsWith("/") && addr.contains("/"))
+                    else
                     {
-                        String[] parts = addr.split("/", 2);
-                        Exchange<?> exchg = getExchange(parts[0]);
-                        if (exchg != null)
-                        {
-                            ExchangeDestination exchangeDestination =
-                                    new ExchangeDestination(exchg,
-                                                            source.getDurable(),
-                                                            source.getExpiryPolicy(),
-                                                            parts[0],
-                                                            target.getCapabilities());
-                            exchangeDestination.setInitialRoutingAddress(parts[1]);
-                            destination = exchangeDestination;
-                            target.setCapabilities(exchangeDestination.getCapabilities());
-                        }
-                        else
-                        {
-                            endpoint.setSource(null);
-                            destination = null;
-                        }
+                        endpoint.setTarget(null);
+                        destination = null;
+                    }
+                }
+                else
+                {
+                    MessageDestination messageDestination =
+                            getAddressSpace().getAttainedMessageDestination(addr);
+                    if (messageDestination != null)
+                    {
+                        destination =
+                                new NodeReceivingDestination(messageDestination,
+                                                             target.getDurable(),
+                                                             target.getExpiryPolicy(),
+                                                             addr,
+                                                             target.getCapabilities(),
+                                                             _connection.getEventLogger());
+                        target.setCapabilities(destination.getCapabilities());
                     }
                     else
                     {
-                        MessageSource queue = getAddressSpace().getAttainedMessageSource(addr);
+                        Queue<?> queue = getQueue(addr);
                         if (queue != null)
                         {
-                            destination = new MessageSourceDestination(queue);
+
+                            destination = new QueueDestination(queue, addr);
                         }
                         else
                         {
-                            Exchange<?> exchg = getExchange(addr);
-                            if (exchg != null)
-                            {
-                                ExchangeDestination exchangeDestination =
-                                              new ExchangeDestination(exchg,
-                                                                      source.getDurable(),
-                                                                      source.getExpiryPolicy(),
-                                                                      addr,
-                                                                      target.getCapabilities());
-                                destination = exchangeDestination;
-                                target.setCapabilities(exchangeDestination.getCapabilities());
-                            }
-                            else
-                            {
-                                endpoint.setSource(null);
-                                destination = null;
-                            }
+                            endpoint.setTarget(null);
+                            destination = null;
                         }
                     }
+                }
+            }
+            else
+            {
+                destination = null;
+            }
+            if (destination != null)
+            {
+                final ReceivingDestination receivingDestination = (ReceivingDestination) destination;
+                MessageDestination messageDestination = receivingDestination.getMessageDestination();
+                if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled())
+                {
+                    capabilities.add(DELAYED_DELIVERY);
+                }
+                final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+                final StandardReceivingLink_1_0 receivingLink =
+                        new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
+                                                      getAddressSpace(),
+                                                      receivingDestination);
+
+                receivingLinkEndpoint.setLink(receivingLink);
+                link = receivingLink;
+                if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
+                    || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
+                {
+                    linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
+                }
+            }
+        }
+        else
+        {
+            ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+            previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
+            receivingLinkEndpoint.setLink(previousLink);
+            link = previousLink;
+            endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+        }
+        return link;
+    }
 
+    private TxnCoordinatorReceivingLink_1_0 createCoordinatorLink(final LinkEndpoint endpoint) throws AmqpErrorException
+    {
+        Coordinator coordinator = (Coordinator) endpoint.getTarget();
+        TxnCapability[] coordinatorCapabilities = coordinator.getCapabilities();
+        boolean localTxn = false;
+        boolean multiplePerSession = false;
+        if (coordinatorCapabilities != null)
+        {
+            for (TxnCapability capability : coordinatorCapabilities)
+            {
+                if (capability.equals(TxnCapability.LOCAL_TXN))
+                {
+                    localTxn = true;
+                }
+                else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
+                {
+                    multiplePerSession = true;
                 }
                 else
                 {
-                    destination = null;
+                    Error error = new Error();
+                    error.setCondition(AmqpError.NOT_IMPLEMENTED);
+                    error.setDescription("Unsupported capability: " + capability);
+                    throw new AmqpErrorException(error);
                 }
+            }
+        }
+
+        final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+        final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
+                new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
+                                                    this,
+                                                    receivingLinkEndpoint,
+                                                    _openTransactions);
+        receivingLinkEndpoint.setLink(coordinatorLink);
+        return coordinatorLink;
+    }
 
-                if (destination != null)
+    private SendingLink_1_0 createSendingLink(final LinkEndpoint endpoint, Attach attach) throws AmqpErrorException
+    {
+        final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
+        SendingLink_1_0 link = null;
+        final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
+        final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
+
+        if (previousLink == null)
+        {
+            Source source = (Source) sendingLinkEndpoint.getSource();
+            SendingDestination destination = null;
+            if (source != null)
+            {
+                destination = getSendingDestination(sendingLinkEndpoint.getName(), source);
+                if (destination == null)
+                {
+                    sendingLinkEndpoint.setSource(null);
+                }
+                else
+                {
+                    source.setCapabilities(destination.getCapabilities());
+                }
+            }
+            else
+            {
+                final Symbol[] linkCapabilities = attach.getDesiredCapabilities();
+                boolean isGlobal = hasCapability(linkCapabilities, ExchangeDestination.GLOBAL_CAPABILITY);
+                final String queueName = getMangledSubscriptionName(endpoint.getName(), true, true, isGlobal);
+                final MessageSource messageSource = getAddressSpace().getAttainedMessageSource(queueName);
+                // TODO START The Source should be persisted on the LinkEndpoint
+                if (messageSource instanceof Queue)
                 {
-                    final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
-                    try
+                    Queue<?> queue = (Queue<?>) messageSource;
+                    source = new Source();
+                    List<Symbol> capabilities = new ArrayList<>();
+                    if (queue.getExclusive() == ExclusivityPolicy.SHARED_SUBSCRIPTION)
                     {
-                        final SendingLink_1_0 sendingLink =
-                                new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
-                                                    getAddressSpace(),
-                                                    (SendingDestination) destination
-                                );
-
-                        sendingLinkEndpoint.setLink(sendingLink);
-                        registerConsumer(sendingLink);
+                        capabilities.add(ExchangeDestination.SHARED_CAPABILITY);
+                    }
+                    if (isGlobal)
+                    {
+                        capabilities.add(ExchangeDestination.GLOBAL_CAPABILITY);
+                    }
+                    capabilities.add(ExchangeDestination.TOPIC_CAPABILITY);
+                    source.setCapabilities(capabilities.toArray(new Symbol[capabilities.size()]));
 
-                        link = sendingLink;
-                        if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())
-                            || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
+                    final Collection<Exchange> exchanges = queue.getVirtualHost().getChildren(Exchange.class);
+                    String bindingKey = null;
+                    Exchange<?> foundExchange = null;
+                    for (Exchange<?> exchange : exchanges)
+                    {
+                        for (Binding binding : exchange.getPublishingLinks(queue))
+                        {
+                            String exchangeName = exchange.getName();
+                            bindingKey = binding.getName();
+                            final Map<String, Object> bindingArguments = binding.getArguments();
+                            Map<Symbol, Filter> filter = new HashMap<>();
+                            if (bindingArguments.containsKey(AMQPFilterTypes.JMS_SELECTOR))
+                            {
+                                filter.put(Symbol.getSymbol("jms-selector"), new JMSSelectorFilter((String) bindingArguments.get(AMQPFilterTypes.JMS_SELECTOR)));
+                            }
+                            if (bindingArguments.containsKey(AMQPFilterTypes.NO_LOCAL))
+                            {
+                                filter.put(Symbol.getSymbol("no-local"), NoLocalFilter.INSTANCE);
+                            }
+                            foundExchange = exchange;
+                            source.setAddress(exchangeName + "/" + bindingKey);
+                            source.setFilter(filter);
+                            break;
+                        }
+                        if (foundExchange != null)
                         {
-                            linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
+                            break;
                         }
                     }
-                    catch (AmqpErrorException e)
+                    if (foundExchange != null)
                     {
-                        _logger.error("Error creating sending link", e);
-                        destination = null;
-                        sendingLinkEndpoint.setSource(null);
-                        error = e.getError();
+                        source.setDurable(TerminusDurability.CONFIGURATION);
+                        TerminusExpiryPolicy terminusExpiryPolicy;
+                        switch (queue.getLifetimePolicy())
+                        {
+                            case PERMANENT:
+                                terminusExpiryPolicy = TerminusExpiryPolicy.NEVER;
+                                break;
+                            case DELETE_ON_NO_LINKS:
+                            case DELETE_ON_NO_OUTBOUND_LINKS:
+                                terminusExpiryPolicy = TerminusExpiryPolicy.LINK_DETACH;
+                                break;
+                            case DELETE_ON_CONNECTION_CLOSE:
+                                terminusExpiryPolicy = TerminusExpiryPolicy.CONNECTION_CLOSE;
+                                break;
+                            case DELETE_ON_SESSION_END:
+                                terminusExpiryPolicy = TerminusExpiryPolicy.SESSION_END;
+                                break;
+                            default:
+                                throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "unexpected liftetime policy " + queue.getLifetimePolicy()));
+                        }
+                        sendingLinkEndpoint.setSource(source);
+                        destination = new ExchangeDestination(foundExchange,
+                                                              queue,
+                                                              TerminusDurability.CONFIGURATION,
+                                                              terminusExpiryPolicy,
+                                                              foundExchange.getName(),
+                                                              bindingKey,
+                                                              capabilities);
                     }
+
                 }
+                // TODO END
             }
-            else
+
+            if (destination != null)
             {
-                Source newSource = (Source) endpoint.getSource();
+                final SendingLink_1_0 sendingLink =
+                        new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
+                                            getAddressSpace(),
+                                            destination);
+                sendingLink.createConsumerTarget();
+
+                sendingLinkEndpoint.setLink(sendingLink);
+                registerConsumer(sendingLink);
 
-                Source oldSource = (Source) previousLink.getEndpoint().getSource();
-                final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable();
-                if (newSourceDurable != null)
+                if (destination instanceof ExchangeDestination)
                 {
-                    oldSource.setDurable(newSourceDurable);
-                    if (newSourceDurable.equals(TerminusDurability.NONE))
-                    {
-                        linkRegistry.unregisterSendingLink(endpoint.getName());
-                    }
+                    ExchangeDestination exchangeDestination = (ExchangeDestination) destination;
+                    exchangeDestination.getQueue().setAttributes(Collections.<String, Object>singletonMap(Queue.DESIRED_STATE, State.ACTIVE));
+                }
+
+                link = sendingLink;
+                if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())
+                    || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
+                {
+                    linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
                 }
-                endpoint.setSource(oldSource);
-                SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
-                previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
-                sendingLinkEndpoint.setLink(previousLink);
-                link = previousLink;
-                endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
-                registerConsumer(previousLink);
 
             }
         }
-        else if (endpoint.getTarget() instanceof Coordinator)
+        else
         {
-            Coordinator coordinator = (Coordinator) endpoint.getTarget();
-            TxnCapability[] coordinatorCapabilities = coordinator.getCapabilities();
-            boolean localTxn = false;
-            boolean multiplePerSession = false;
-            if (coordinatorCapabilities != null)
+            Source newSource = (Source) attach.getSource();
+            Source oldSource = (Source) previousLink.getEndpoint().getSource();
+
+            if (previousLink.getDestination() instanceof ExchangeDestination && newSource != null && !Boolean.TRUE.equals(newSource.getDynamic()))
             {
-                for (TxnCapability capability : coordinatorCapabilities)
+                final SendingDestination newDestination = getSendingDestination(previousLink.getEndpoint().getName(), newSource);
+                if (updateSourceForSubscription(previousLink, newSource, newDestination))
                 {
-                    if (capability.equals(TxnCapability.LOCAL_TXN))
-                    {
-                        localTxn = true;
-                    }
-                    else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
-                    {
-                        multiplePerSession = true;
-                    }
-                    else
-                    {
-                        error = new Error();
-                        error.setCondition(AmqpError.NOT_IMPLEMENTED);
-                        error.setDescription("Unsupported capability: " + capability);
-                        break;
-                    }
+                    previousLink.setDestination(newDestination);
                 }
             }
 
-       /*         if(!localTxn)
+            sendingLinkEndpoint.setSource(oldSource);
+            previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
+            sendingLinkEndpoint.setLink(previousLink);
+            link = previousLink;
+            sendingLinkEndpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+            registerConsumer(previousLink);
+
+        }
+        return link;
+    }
+
+    private boolean updateSourceForSubscription(final SendingLink_1_0 previousLink,
+                                                final Source newSource,
+                                                final SendingDestination newDestination)
+    {
+        SendingDestination oldDestination = previousLink.getDestination();
+        if (oldDestination instanceof ExchangeDestination)
+        {
+            ExchangeDestination oldExchangeDestination = (ExchangeDestination) oldDestination;
+            String newAddress = newSource.getAddress();
+            if (newDestination instanceof ExchangeDestination)
+            {
+                ExchangeDestination newExchangeDestination = (ExchangeDestination) newDestination;
+                if (oldExchangeDestination.getQueue() != newExchangeDestination.getQueue())
                 {
-                    coordinatorCapabilities.add(TxnCapabilities.LOCAL_TXN);
-                }*/
+                    Source oldSource = (Source) previousLink.getEndpoint().getSource();
+                    oldSource.setAddress(newAddress);
+                    oldSource.setFilter(newSource.getFilter());
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Resolves the destination a sending link should read from, based on the link's source.
+     *
+     * If the source is flagged dynamic, a dynamic source is created first and the source
+     * address is overwritten with its name. The (possibly updated) address is then resolved:
+     * an address that contains a "/" but does not start with one is handled as an
+     * "exchange/bindingKey" pair; any other address is looked up as a message source and,
+     * if none is found, handled as an exchange name with no binding key.
+     *
+     * @param linkName the link name, passed on when an exchange destination is created
+     * @param source   the source terminus; its address may be modified as described above
+     * @return the resolved destination, or {@code null} if the source has no address or
+     *         the exchange destination helper returned {@code null}
+     * @throws AmqpErrorException propagated from the exchange destination helpers
+     */
+    private SendingDestination getSendingDestination(final String linkName, final Source source) throws AmqpErrorException
+    {
+        SendingDestination destination = null;
 
-            final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-            final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
-                    new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
-                                                        this,
-                                                        receivingLinkEndpoint,
-                                                        _openTransactions);
-            receivingLinkEndpoint.setLink(coordinatorLink);
-            link = coordinatorLink;
+        if (Boolean.TRUE.equals(source.getDynamic()))
+        {
+            MessageSource tempQueue = createDynamicSource(source.getDynamicNodeProperties());
+            source.setAddress(tempQueue.getName()); // todo : temporary topic
         }
-        else // standard  (non-Coordinator) receiver
+
+        String address = source.getAddress();
+        if (address != null)
         {
+            // "exchange/bindingKey" form: has a separator, but not as the first character.
+            if (!address.startsWith("/") && address.contains("/"))
+            {
+                destination = createExchangeDestination(address, linkName, source);
+            }
+            else
+            {
+                MessageSource queue = getAddressSpace().getAttainedMessageSource(address);
+                if (queue != null)
+                {
+                    destination = new MessageSourceDestination(queue);
+                }
+                else
+                {
+                    // No message source with that name: treat the address as an exchange name without binding key.
+                    destination = createExchangeDestination(address, null, linkName, source);
+                }
+            }
+        }
+
+        return destination;
+    }
 
-            StandardReceivingLink_1_0 previousLink =
-                    (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
+    /**
+     * Splits an address at its first "/" into exchange name and binding key and delegates
+     * to the four-argument overload.
+     *
+     * @param address  address expected to contain at least one "/"
+     * @param linkName link name forwarded to the overload
+     * @param source   source terminus forwarded to the overload
+     * @return whatever the four-argument overload returns
+     * @throws AmqpErrorException propagated from the four-argument overload
+     */
+    private ExchangeDestination createExchangeDestination(String address, final String linkName, final Source source)
+            throws AmqpErrorException
+    {
+        // Limit of 2 keeps any further "/" characters inside the binding key.
+        final String[] exchangeAndKey = address.split("/", 2);
+        return createExchangeDestination(exchangeAndKey[0], exchangeAndKey[1], linkName, source);
+    }
 
-            if (previousLink == null)
+    /**
+     * Creates the destination for a subscription on the named exchange, obtaining the backing
+     * subscription queue from the address space.
+     *
+     * For a non-dynamic source the queue name is derived from the link name and the
+     * durable / shared / global properties of the source, the queue attributes and bindings
+     * are assembled, and the queue is requested from the {@link QueueManagingVirtualHost}.
+     * On success the source's filter is replaced by the filters actually applied (or
+     * {@code null} if there are none) and its distribution mode is set to COPY.
+     *
+     * @param exchangeName name used to look up the exchange
+     * @param bindingKey   binding key for the subscription queue, may be {@code null}
+     * @param linkName     link name used to derive the subscription queue name
+     * @param source       source terminus; its filter and distribution mode are updated
+     * @return the new destination, or {@code null} if the exchange lookup returned {@code null}
+     * @throws AmqpErrorException NOT_FOUND if the queue lookup reports {@link NotFoundException},
+     *                            INTERNAL_ERROR if the address space cannot manage queues,
+     *                            RESOURCE_LOCKED on {@link IllegalStateException},
+     *                            NOT_IMPLEMENTED for dynamic sources; the original exception is
+     *                            attached as the cause where one exists
+     */
+    private ExchangeDestination createExchangeDestination(final String exchangeName,
+                                                          final String bindingKey,
+                                                          final String linkName,
+                                                          final Source source) throws AmqpErrorException
+    {
+        ExchangeDestination exchangeDestination = null;
+        Exchange<?> exchange = getExchange(exchangeName);
+        List<Symbol> sourceCapabilities = new ArrayList<>();
+        if (exchange != null)
+        {
+            Queue queue = null;
+            if (!Boolean.TRUE.equals(source.getDynamic()))
             {
+                final Map<String, Object> attributes = new HashMap<>();
+                boolean isDurable = source.getDurable() != TerminusDurability.NONE;
+                boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY);
+                boolean isGlobal = hasCapability(source.getCapabilities(), ExchangeDestination.GLOBAL_CAPABILITY);
 
-                Target target = (Target) endpoint.getTarget();
+                final String name = getMangledSubscriptionName(linkName, isDurable, isShared, isGlobal);
 
-                if (target != null)
+                if (isGlobal)
                 {
-                    if (Boolean.TRUE.equals(target.getDynamic()))
-                    {
+                    sourceCapabilities.add(ExchangeDestination.GLOBAL_CAPABILITY);
+                }
 
-                        MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
-                        target.setAddress(tempQueue.getName());
-                    }
+                ExclusivityPolicy exclusivityPolicy;
+                if (isShared)
+                {
+                    exclusivityPolicy = ExclusivityPolicy.SHARED_SUBSCRIPTION;
+                    sourceCapabilities.add(SHARED_CAPABILITY);
+                }
+                else
+                {
+                    exclusivityPolicy = ExclusivityPolicy.LINK;
+                }
 
-                    String addr = target.getAddress();
-                    if (addr == null || "".equals(addr.trim()))
-                    {
-                        MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
-                        destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
-                                                                   target.getExpiryPolicy(), "",
-                                                                   target.getCapabilities(),
-                                                                   _connection.getEventLogger());
-                        target.setCapabilities(destination.getCapabilities());
+                org.apache.qpid.server.model.LifetimePolicy lifetimePolicy = getLifetimePolicy(source.getExpiryPolicy());
 
-                        if (_blockingEntities.contains(messageDestination))
-                        {
-                            endpoint.setStopped(true);
-                        }
-                    }
-                    else if (!addr.startsWith("/") && addr.contains("/"))
+                attributes.put(Queue.ID, UUID.randomUUID());
+                attributes.put(Queue.NAME, name);
+                attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+                attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+                attributes.put(Queue.DURABLE, isDurable);
+
+                BindingInfo bindingInfo = new BindingInfo(exchange, name,
+                                                          bindingKey, source.getFilter());
+                Map<String, Map<String, Object>> bindings = bindingInfo.getBindings();
+                try
+                {
+                    if (getAddressSpace() instanceof QueueManagingVirtualHost)
                     {
-                        String[] parts = addr.split("/", 2);
-                        Exchange<?> exchange = getExchange(parts[0]);
-                        if (exchange != null)
+                        try
                         {
-                            ExchangeDestination exchangeDestination =
-                                    new ExchangeDestination(exchange,
-                                                            target.getDurable(),
-                                                            target.getExpiryPolicy(),
-                                                            parts[0],
-                                                            target.getCapabilities());
-
-                            exchangeDestination.setInitialRoutingAddress(parts[1]);
-                            target.setCapabilities(exchangeDestination.getCapabilities());
-                            destination = exchangeDestination;
+                            queue = ((QueueManagingVirtualHost) getAddressSpace()).getSubscriptionQueue(exchangeName, attributes, bindings);
                         }
-                        else
+                        catch (NotFoundException e)
                         {
-                            endpoint.setTarget(null);
-                            destination = null;
+                            // Keep the original exception as the cause so its stack trace is not lost.
+                            throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, e.getMessage()), e);
                         }
                     }
                     else
                     {
-                        MessageDestination messageDestination =
-                                getAddressSpace().getAttainedMessageDestination(addr);
-                        if (messageDestination != null)
-                        {
-                            destination =
-                                    new NodeReceivingDestination(messageDestination,
-                                                                 target.getDurable(),
-                                                                 target.getExpiryPolicy(),
-                                                                 addr,
-                                                                 target.getCapabilities(),
-                                                                 _connection.getEventLogger());
-                            target.setCapabilities(destination.getCapabilities());
-                        }
-                        else
-                        {
-                            Queue<?> queue = getQueue(addr);
-                            if (queue != null)
-                            {
-
-                                destination = new QueueDestination(queue, addr);
-                            }
-                            else
-                            {
-                                endpoint.setTarget(null);
-                                destination = null;
-                            }
-                        }
+                        throw new AmqpErrorException(new Error(AmqpError.INTERNAL_ERROR,
+                                                               "Address space of unexpected type"));
                     }
                 }
-                else
-                {
-                    destination = null;
-                }
-                if (destination != null)
+                catch(IllegalStateException e)
                 {
-                    final ReceivingDestination receivingDestination = (ReceivingDestination) destination;
-                    MessageDestination messageDestination = receivingDestination.getMessageDestination();
-                    if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled())
-                    {
-                        capabilities.add(DELAYED_DELIVERY);
-                    }
-                    final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-                    final StandardReceivingLink_1_0 receivingLink =
-                            new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
-                                                          getAddressSpace(),
-                                                          receivingDestination);
-
-                    receivingLinkEndpoint.setLink(receivingLink);
-                    link = receivingLink;
-                    if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
-                        || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
-                    {
-                        linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
-                    }
+                    // The client-visible description stays generic; the cause records what actually failed.
+                    throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED,
+                                                           "Subscription is already in use"), e);
                 }
+                source.setFilter(bindingInfo.getActualFilters().isEmpty() ? null : bindingInfo.getActualFilters());
+                source.setDistributionMode(StdDistMode.COPY);
+                exchangeDestination = new ExchangeDestination(exchange,
+                                                              queue,
+                                                              source.getDurable(),
+                                                              source.getExpiryPolicy(),
+                                                              exchangeName,
+                                                              bindingKey,
+                                                              sourceCapabilities);
             }
             else
             {
-                ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-                previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
-                receivingLinkEndpoint.setLink(previousLink);
-                link = previousLink;
-                endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+                // TODO
+                throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Temporary subscription is not implemented"));
             }
         }
+        return exchangeDestination;
+    }
 
+    /**
+     * Translates an AMQP terminus expiry policy into the broker's queue lifetime policy.
+     *
+     * @param expiryPolicy the expiry policy from the terminus; {@code null} is handled like SESSION_END
+     * @return the matching lifetime policy
+     * @throws AmqpErrorException with NOT_IMPLEMENTED if the expiry policy is not one of the four known values
+     */
+    private org.apache.qpid.server.model.LifetimePolicy getLifetimePolicy(final TerminusExpiryPolicy expiryPolicy) throws AmqpErrorException
+    {
+        if (expiryPolicy == null || expiryPolicy == TerminusExpiryPolicy.SESSION_END)
+        {
+            return org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_SESSION_END;
+        }
+        if (expiryPolicy == TerminusExpiryPolicy.LINK_DETACH)
+        {
+            return org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
+        }
+        if (expiryPolicy == TerminusExpiryPolicy.CONNECTION_CLOSE)
+        {
+            return org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+        }
+        if (expiryPolicy == TerminusExpiryPolicy.NEVER)
+        {
+            return org.apache.qpid.server.model.LifetimePolicy.PERMANENT;
+        }
+
+        // None of the known policies matched.
+        final String description = String.format("unknown ExpiryPolicy '%s'", expiryPolicy.getValue());
+        throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, description));
+    }
 
-        endpoint.setCapabilities(capabilities);
-        endpoint.attach();
+    /**
+     * Builds the name of the queue backing a subscription.
+     *
+     * The result has the form
+     * {@code "qpidsub_/" + container + "_/" + subscription + "_/" + ("durable" | "nondurable")}.
+     * The container part is the literal {@code "_global_"} for global subscriptions, otherwise
+     * the sanitized remote container id of the connection. The subscription part is a random
+     * UUID when the subscription is neither durable nor shared, otherwise the sanitized link name.
+     *
+     * @param linkName  link name the subscription part is derived from
+     * @param isDurable whether the subscription is durable
+     * @param isShared  whether the subscription is shared
+     * @param isGlobal  whether the subscription is global (not scoped to the container id)
+     * @return the mangled subscription queue name
+     */
+    private String getMangledSubscriptionName(final String linkName,
+                                              final boolean isDurable,
+                                              final boolean isShared,
+                                              final boolean isGlobal)
+    {
+        String remoteContainerId = getConnection().getRemoteContainerId();
+        if (isGlobal)
+        {
+            remoteContainerId = "_global_";
+        }
+        else
+        {
+            remoteContainerId = sanitizeName(remoteContainerId);
+        }
 
-        if (link == null)
+        String subscriptionName;
+        if (!isDurable && !isShared)
         {
-            if (error == null)
+            subscriptionName = UUID.randomUUID().toString();
+        }
+        else
+        {
+            subscriptionName = linkName;
+            if (isShared)
             {
-                error = new Error();
-                error.setCondition(AmqpError.NOT_FOUND);
+                // For shared subscriptions, drop everything from the first "|" onwards
+                // (only when the "|" is not the first character).
+                int separator = subscriptionName.indexOf("|");
+                if (separator > 0)
+                {
+                    subscriptionName = subscriptionName.substring(0, separator);
+                }
             }
-            endpoint.close(error);
+            subscriptionName = sanitizeName(subscriptionName);
         }
-        else
+        return "qpidsub_/" + remoteContainerId + "_/" + subscriptionName + "_/" + (isDurable
+                ? "durable"
+                : "nondurable");
+    }
+
+    /**
+     * Escapes characters in a name by replacing them with two-character sequences that
+     * start with an underscore. Underscores themselves are doubled first, so the later
+     * replacements cannot be confused with underscores present in the input.
+     *
+     * @param name the raw name; must not be {@code null}
+     * @return the escaped name
+     */
+    private String sanitizeName(String name)
+    {
+        // The order matters: "_" must be escaped before the other sequences introduce new underscores.
+        String escaped = name.replace("_", "__");
+        escaped = escaped.replace(".", "_:");
+        escaped = escaped.replace("(", "_O");
+        escaped = escaped.replace(")", "_C");
+        escaped = escaped.replace("<", "_L");
+        escaped = escaped.replace(">", "_R");
+        return escaped;
+    }
+
+    /**
+     * Tells whether an array of capabilities contains the expected capability.
+     *
+     * @param capabilities       the capabilities to search; may be {@code null}
+     * @param expectedCapability the capability to look for; must not be {@code null}
+     * @return {@code true} if an element equal to {@code expectedCapability} is present,
+     *         {@code false} if not or if {@code capabilities} is {@code null}
+     */
+    private boolean hasCapability(final Symbol[] capabilities,
+                                  final Symbol expectedCapability)
+    {
+        if (capabilities != null)
         {
-            link.start();
+            for (Symbol capability : capabilities)
+            {
+                // equals is invoked on the expected value, so null array elements are tolerated.
+                if (expectedCapability.equals(capability))
+                {
+                    return true;
+                }
+            }
         }
+        return false;
     }
 
 
@@ -1822,7 +2152,14 @@ public class Session_1_0 implements AMQS
                 {
                     if (link.getEndpoint() == linkEndpoint)
                     {
-                        link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
+                        try
+                        {
+                            link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
+                        }
+                        catch (AmqpErrorException e)
+                        {
+                            throw new ConnectionScopedRuntimeException(e);
+                        }
                     }
                 }
             }
@@ -1907,4 +2244,118 @@ public class Session_1_0 implements AMQS
             }
         }
     }
+
+    /**
+     * Derives, from a source's filters and an optional binding key, the bindings to create
+     * for a subscription queue and the subset of filters that were actually applied.
+     *
+     * Declared static because it never touches the enclosing session instance.
+     */
+    private static final class BindingInfo
+    {
+        /** Filters that were recognised and applied, keyed as in the supplied filter map. */
+        private final Map<Symbol, Filter> _actualFilters = new HashMap<>();
+        /** Binding key to binding arguments; all keys share the same arguments map. */
+        private final Map<String, Map<String, Object>> _bindings = new HashMap<>();
+
+        /**
+         * @param exchange   exchange the queue will be bound to; its type decides which filters
+         *                   act as binding keys and which default binding is used
+         * @param queueName  queue name, used as binding key for fanout exchanges when no other key exists
+         * @param bindingKey explicit binding key, may be {@code null}
+         * @param filters    filters requested by the source, may be {@code null} or empty
+         */
+        private BindingInfo(Exchange<?> exchange,
+                            final String queueName,
+                            String bindingKey,
+                            Map<Symbol, Filter> filters)
+        {
+            String binding = null;
+            final Map<String, Object> arguments = new HashMap<>();
+            if (filters != null && !filters.isEmpty())
+            {
+                boolean hasBindingFilter = false;
+                boolean hasMessageFilter = false;
+                for (Map.Entry<Symbol, Filter> entry : filters.entrySet())
+                {
+                    final Filter filter = entry.getValue();
+                    // At most one subject filter becomes a binding key, and only if it suits the exchange type.
+                    if (!hasBindingFilter
+                        && filter instanceof ExactSubjectFilter
+                        && exchange.getType().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+                    {
+                        binding = ((ExactSubjectFilter) filter).getValue();
+                        _actualFilters.put(entry.getKey(), filter);
+                        hasBindingFilter = true;
+                    }
+                    else if (!hasBindingFilter
+                             && filter instanceof MatchingSubjectFilter
+                             && exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+                    {
+                        binding = ((MatchingSubjectFilter) filter).getValue();
+                        _actualFilters.put(entry.getKey(), filter);
+                        hasBindingFilter = true;
+                    }
+                    else if (filter instanceof NoLocalFilter)
+                    {
+                        _actualFilters.put(entry.getKey(), filter);
+                        arguments.put(AMQPFilterTypes.NO_LOCAL.toString(), true);
+                    }
+                    // Only the first selector filter encountered is applied.
+                    else if (!hasMessageFilter
+                             && filter instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
+                    {
+                        org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter =
+                                (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) filter;
+                        arguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), selectorFilter.getValue());
+                        _actualFilters.put(entry.getKey(), selectorFilter);
+                        hasMessageFilter = true;
+                    }
+                }
+            }
+
+            if (binding != null)
+            {
+                _bindings.put(binding, arguments);
+            }
+            if (bindingKey != null)
+            {
+                _bindings.put(bindingKey, arguments);
+            }
+            // With no key from either origin, fall back to a default binding that depends on the exchange type.
+            if (binding == null && bindingKey == null)
+            {
+                if (exchange.getType().equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS))
+                {
+                    _bindings.put(queueName, arguments);
+                }
+                else if (exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+                {
+                    _bindings.put("#", arguments);
+                }
+            }
+        }
+
+        /** @return the filters that were applied (live internal map, possibly empty) */
+        private Map<Symbol, Filter> getActualFilters()
+        {
+            return _actualFilters;
+        }
+
+        /** @return binding key to binding arguments (live internal map, possibly empty) */
+        private Map<String, Map<String, Object>> getBindings()
+        {
+            return _bindings;
+        }
+
+
+        @Override
+        public boolean equals(final Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            final BindingInfo that = (BindingInfo) o;
+            return _actualFilters.equals(that._actualFilters)
+                   && _bindings.equals(that._bindings);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            int result = _actualFilters.hashCode();
+            result = 31 * result + _bindings.hashCode();
+            return result;
+        }
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorException.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorException.java?rev=1780052&r1=1780051&r2=1780052&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorException.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorException.java Tue Jan 24 10:14:48 2017
@@ -34,6 +34,12 @@ public class AmqpErrorException extends
         _error = error;
     }
 
+    /**
+     * Creates an exception that carries an AMQP error together with an underlying cause.
+     *
+     * @param error the AMQP error stored on this exception
+     * @param cause the throwable forwarded to the superclass constructor as the cause
+     */
+    public AmqpErrorException(final Error error, final Throwable cause)
+    {
+        super(cause);
+        _error = error;
+    }
+
     public AmqpErrorException(ErrorCondition condition)
     {
         _error = new Error();

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java?rev=1780052&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java Tue Jan 24 10:14:48 2017
@@ -0,0 +1,605 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import javax.security.auth.Subject;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.PublishingLink;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class Session_1_0Test extends QpidTestCase
+{
+    private static final String TOPIC_NAME = "testTopic";
+    private static final String QUEUE_NAME = "testQueue";
+    private static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
+    private static final Symbol QUEUE_CAPABILITY = Symbol.getSymbol("queue");
+    private AMQPConnection_1_0 _connection;
+    private VirtualHost<?> _virtualHost;
+    private Session_1_0 _session;
+    private int _handle;
+
+    /**
+     * Creates the fixture shared by all tests: a virtual host, a connection and a session
+     * on that connection.
+     */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        final String virtualHostName = "testVH";
+        final String containerId = "testContainerId";
+
+        _virtualHost = BrokerTestHelper.createVirtualHost(virtualHostName);
+        _connection = createAmqpConnection_1_0(containerId);
+        _session = createSession_1_0(_connection, 0);
+    }
+
+    /**
+     * Attaches to "amq.direct/testTopic" with the arguments (false, ..., true) and expects an
+     * attach response plus a subscription queue with DELETE_ON_NO_OUTBOUND_LINKS lifetime.
+     */
+    public void testReceiveAttachTopicNonDurableNoContainer() throws Exception
+    {
+        final Attach topicAttach = createTopicAttach(false, "testLink", "amq.direct/" + TOPIC_NAME, true);
+
+        _session.receiveAttach(topicAttach);
+
+        assertAttachSent(_connection, _session, topicAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+    }
+
+    /**
+     * Attaches to "amq.direct/testTopic" with the arguments (false, ..., false) and expects an
+     * attach response plus a subscription queue with DELETE_ON_NO_OUTBOUND_LINKS lifetime.
+     */
+    public void testReceiveAttachTopicNonDurableWithContainer() throws Exception
+    {
+        final Attach topicAttach = createTopicAttach(false, "testLink", "amq.direct/" + TOPIC_NAME, false);
+
+        _session.receiveAttach(topicAttach);
+
+        assertAttachSent(_connection, _session, topicAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+    }
+
+    /**
+     * Attaches to "amq.direct/testTopic" with the arguments (true, ..., true) and expects an
+     * attach response plus a subscription queue with PERMANENT lifetime.
+     */
+    public void testReceiveAttachTopicDurableNoContainer() throws Exception
+    {
+        final Attach topicAttach = createTopicAttach(true, "testLink", "amq.direct/" + TOPIC_NAME, true);
+
+        _session.receiveAttach(topicAttach);
+
+        assertAttachSent(_connection, _session, topicAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+    }
+
+    /**
+     * Attaches twice with link names "testLink|1" and "testLink|2", the second time from a
+     * connection with a different container id, and expects two queues to exist afterwards.
+     */
+    public void testReceiveAttachTopicDurableWithContainer() throws Exception
+    {
+        final String linkPrefix = "testLink";
+        final String topicAddress = "amq.direct/" + TOPIC_NAME;
+
+        // First attach, on the session created in setUp.
+        final Attach firstAttach = createTopicAttach(true, linkPrefix + "|1", topicAddress, false);
+        _session.receiveAttach(firstAttach);
+
+        assertAttachSent(_connection, _session, firstAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+        // Second attach, on a session of a connection with another container id.
+        final AMQPConnection_1_0 otherConnection = createAmqpConnection_1_0("testContainerId2");
+        final Session_1_0 otherSession = createSession_1_0(otherConnection, 0);
+        final Attach secondAttach = createTopicAttach(true, linkPrefix + "|2", topicAddress, false);
+        otherSession.receiveAttach(secondAttach);
+
+        assertAttachSent(otherConnection, otherSession, secondAttach);
+        final Collection<Queue> subscriptionQueues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after second subscription with the same subscription name but different container id ",
+                     2,
+                     subscriptionQueues.size());
+    }
+
+    /**
+     * Sends two identical shared-topic attaches over two different connections and expects
+     * them to end up as two consumers on a single queue.
+     */
+    public void testReceiveAttachSharedTopicNonDurableNoContainer() throws Exception
+    {
+        final String sharedLinkName = "testLink";
+        final String topicAddress = "amq.direct/" + TOPIC_NAME;
+        // Both attach frames are built before the first one is processed.
+        final Attach firstAttach = createSharedTopicAttach(false, sharedLinkName, topicAddress, true);
+        final Attach secondAttach = createSharedTopicAttach(false, sharedLinkName, topicAddress, true);
+
+        _session.receiveAttach(firstAttach);
+
+        assertAttachSent(_connection, _session, firstAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
+        final AMQPConnection_1_0 otherConnection = createAmqpConnection_1_0();
+        final Session_1_0 otherSession = createSession_1_0(otherConnection, 0);
+
+        otherSession.receiveAttach(secondAttach);
+
+        assertAttachSent(otherConnection, otherSession, secondAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
+        final Collection<Queue> subscriptionQueues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after attach", 1, subscriptionQueues.size());
+
+        final Queue sharedQueue = subscriptionQueues.iterator().next();
+        final Collection<Consumer<?,?>> attachedConsumers = sharedQueue.getConsumers();
+        assertEquals("Unexpected number of consumers", 2, attachedConsumers.size());
+    }
+
+    /**
+     * Sends two identical shared-topic attaches over connections with different container ids
+     * and expects two queues, each with exactly one consumer.
+     */
+    public void testReceiveAttachSharedTopicNonDurableWithContainer() throws Exception
+    {
+        final String sharedLinkName = "testLink";
+        final String topicAddress = "amq.direct/" + TOPIC_NAME;
+        // Both attach frames are built before the first one is processed.
+        final Attach firstAttach = createSharedTopicAttach(false, sharedLinkName, topicAddress, false);
+        final Attach secondAttach = createSharedTopicAttach(false, sharedLinkName, topicAddress, false);
+
+        _session.receiveAttach(firstAttach);
+
+        assertAttachSent(_connection, _session, firstAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
+        final AMQPConnection_1_0 otherConnection = createAmqpConnection_1_0("testContainerId2");
+        final Session_1_0 otherSession = createSession_1_0(otherConnection, 0);
+
+        otherSession.receiveAttach(secondAttach);
+
+        assertAttachSent(otherConnection, otherSession, secondAttach);
+
+        final Collection<Queue> subscriptionQueues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after attach", 2, subscriptionQueues.size());
+
+        for (Queue subscriptionQueue : subscriptionQueues)
+        {
+            final Collection<Consumer<?,?>> attachedConsumers = subscriptionQueue.getConsumers();
+            assertEquals("Unexpected number of consumers on queue " + subscriptionQueue.getName(),  1, attachedConsumers.size());
+        }
+    }
+
+    /**
+     * Sends a series of topic attaches that differ in durability, the "shared" capability and the
+     * "global" capability over two connections, and checks the number of queues on the virtual host
+     * after every step.
+     *
+     * Attaches named "...WithContainerId" are created without the global capability; those named
+     * "...WithoutContainerId" are created with it. The last argument of assertAttachSent/assertAttachFailed
+     * is the index of the expected Attach among the frames already sent on that connection.
+     */
+    public void testSeparateSubscriptionNameSpaces() throws Exception
+    {
+        AMQPConnection_1_0 secondConnection = createAmqpConnection_1_0();
+        Session_1_0 secondSession = createSession_1_0(secondConnection, 0);
+
+        final String linkName = "testLink";
+        final String address = "amq.direct/" + TOPIC_NAME;
+
+        // durable, shared, not global -> first queue
+        Attach durableSharedWithContainerId = createSharedTopicAttach(true, linkName + "|1", address, false);
+        _session.receiveAttach(durableSharedWithContainerId);
+        assertAttachSent(_connection, _session, durableSharedWithContainerId, 0);
+
+        Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after durable shared with containerId", 1, queues.size());
+
+        // durable, non shared, not global -> this attach is expected to be refused (Attach + closing Detach),
+        // so the queue count must not change
+        Attach durableNonSharedWithContainerId = createTopicAttach(true, linkName, address, false);
+        _session.receiveAttach(durableNonSharedWithContainerId);
+        assertAttachFailed(_connection, _session, durableNonSharedWithContainerId, 1);
+
+        queues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after durable non shared with containerId", 1, queues.size());
+
+        // non durable, shared, not global -> second queue (frame index 3: attach, attach, detach precede it)
+        Attach nonDurableSharedWithContainerId = createSharedTopicAttach(false, linkName + "|3", address, false);
+        _session.receiveAttach(nonDurableSharedWithContainerId);
+        assertAttachSent(_connection, _session, nonDurableSharedWithContainerId, 3);
+
+        queues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after non durable shared with containerId", 2, queues.size());
+
+        // durable, shared, global -> third queue
+        Attach durableSharedWithoutContainerId = createSharedTopicAttach(true, linkName + "|4", address, true);
+        secondSession.receiveAttach(durableSharedWithoutContainerId);
+        assertAttachSent(secondConnection, secondSession, durableSharedWithoutContainerId, 0);
+
+        queues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after durable shared without containerId", 3, queues.size());
+
+        // non durable, shared, global -> fourth queue
+        Attach nonDurableSharedWithoutContainerId = createSharedTopicAttach(false, linkName + "|5", address, true);
+        secondSession.receiveAttach(nonDurableSharedWithoutContainerId);
+        assertAttachSent(secondConnection, secondSession, nonDurableSharedWithoutContainerId, 1);
+
+        queues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after non durable shared without containerId", 4, queues.size());
+
+        // non durable, non shared, global -> fifth queue
+        Attach nonDurableNonSharedWithoutContainerId = createTopicAttach(false, linkName + "|6", address, true);
+        secondSession.receiveAttach(nonDurableNonSharedWithoutContainerId);
+        assertAttachSent(secondConnection, secondSession, nonDurableNonSharedWithoutContainerId, 2);
+
+        queues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after non durable non shared without containerId", 5, queues.size());
+
+        // non durable, non shared, not global, same link name as the previous attach -> sixth queue
+        Attach nonDurableNonSharedWithContainerId = createTopicAttach(false, linkName + "|6", address, false);
+        _session.receiveAttach(nonDurableNonSharedWithContainerId);
+        assertAttachSent(_connection, _session, nonDurableNonSharedWithContainerId, 4);
+
+        queues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after non durable non shared with containerId", 6, queues.size());
+
+    }
+
+    /**
+     * Sends a durable topic attach whose source has been set to null while no matching link exists yet.
+     * The attach is expected to fail and no queue may exist afterwards.
+     */
+    public void testReceiveAttachForInvalidUnsubscribe() throws Exception
+    {
+        final String topicAddress = "amq.direct/" + TOPIC_NAME;
+
+        Attach nullSourceAttach = createTopicAttach(true, "testLink", topicAddress, false);
+        nullSourceAttach.setSource(null);
+
+        _session.receiveAttach(nullSourceAttach);
+        assertAttachFailed(_connection, _session, nullSourceAttach);
+
+        Collection<Queue> remainingQueues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after unsubscribe", 0, remainingQueues.size());
+    }
+
+    /**
+     * Attaches a durable topic link, detaches it without closing, then re-attaches with the same link name
+     * but a null source. The Attach sent in response is expected to carry the original source address
+     * and capabilities, and the queue must still exist.
+     */
+    public void testNullSourceLookup() throws Exception
+    {
+        final String durableLinkName = "testLink";
+        final String topicAddress = "amq.direct/" + TOPIC_NAME;
+        Attach originalAttach = createTopicAttach(true, durableLinkName, topicAddress, false);
+
+        _session.receiveAttach(originalAttach);
+
+        assertAttachSent(_connection, _session, originalAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+        // detach without closing the link
+        sendDetach(_session, originalAttach.getHandle(), false);
+
+        Attach lookupAttach = createTopicAttach(true, durableLinkName, topicAddress, false);
+        lookupAttach.setSource(null);
+
+        _session.receiveAttach(lookupAttach);
+
+        // three frames are expected on the channel; the third one (index 2) is cast to Attach below
+        ArgumentCaptor<FrameBody> sentFrames = ArgumentCaptor.forClass(FrameBody.class);
+        verify(_connection, times(3)).sendFrame(eq((short) _session.getChannelId()), sentFrames.capture());
+        Attach responseAttach = (Attach) sentFrames.getAllValues().get(2);
+
+        assertEquals("Unexpected name", lookupAttach.getName(), responseAttach.getName());
+        assertEquals("Unexpected role", Role.SENDER, responseAttach.getRole());
+        assertNotNull("Unexpected source", responseAttach.getSource());
+
+        Source responseSource = (Source) responseAttach.getSource();
+        Source originalSource = (Source) originalAttach.getSource();
+        assertEquals("Unexpected address", topicAddress, responseSource.getAddress());
+        // NOTE(review): if getCapabilities() returns an array this compares by identity - confirm that is intended
+        assertEquals("Unexpected capabilities", originalSource.getCapabilities(), responseSource.getCapabilities());
+
+        Collection<Queue> remainingQueues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after unsubscribe", 1, remainingQueues.size());
+    }
+
+    /**
+     * Attaches a durable topic link and then sends a Detach with closed=true.
+     * The queue created for the link must be gone afterwards.
+     */
+    public void testReceiveDetachClosed() throws Exception
+    {
+        final String topicAddress = "amq.direct/" + TOPIC_NAME;
+        Attach durableAttach = createTopicAttach(true, "testLink", topicAddress, false);
+
+        _session.receiveAttach(durableAttach);
+
+        assertAttachSent(_connection, _session, durableAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+        // closing detach
+        sendDetach(_session, durableAttach.getHandle(), true);
+
+        Collection<Queue> remainingQueues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after unsubscribe", 0, remainingQueues.size());
+    }
+
+    /**
+     * Creates a queue named QUEUE_NAME, binds it to amq.direct under its own name and then attaches
+     * a non-durable queue link to it. The attach outcome is checked by assertAttachActions.
+     */
+    public void testReceiveAttachToExistingQueue() throws Exception
+    {
+        Attach queueAttach = createQueueAttach(false, "testLink", QUEUE_NAME);
+
+        Queue<?> existingQueue =
+                _virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, QUEUE_NAME));
+        Exchange<?> directExchange = _virtualHost.getChildByName(Exchange.class, "amq.direct");
+        directExchange.bind(QUEUE_NAME, QUEUE_NAME, Collections.<String, Object>emptyMap(), false);
+
+        _session.receiveAttach(queueAttach);
+
+        assertAttachActions(existingQueue, queueAttach);
+    }
+
+    /**
+     * Attaches a queue link to QUEUE_NAME without creating the queue first; the attach is expected to fail.
+     */
+    public void testReceiveAttachToNonExistingQueue() throws Exception
+    {
+        Attach queueAttach = createQueueAttach(false, "testLink", QUEUE_NAME);
+
+        _session.receiveAttach(queueAttach);
+
+        assertAttachFailed(_connection, _session, queueAttach);
+    }
+
+    /**
+     * Attaches a durable, shared, global topic link, detaches it without closing, and then attaches a
+     * link with a different name ("testLink|2") to a second topic. assertQueues then requires exactly
+     * one queue, whose single publishing link and amq.direct binding use the second topic name.
+     */
+    public void testReceiveAttachRebindingQueueNoActiveLinks()
+    {
+        final String baseLinkName = "testLink";
+        final String firstAddress = "amq.direct/" + TOPIC_NAME;
+        Attach firstAttach = createSharedTopicAttach(true, baseLinkName, firstAddress, true);
+        _session.receiveAttach(firstAttach);
+
+        assertAttachSent(_connection, _session, firstAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+        // detach without closing; the second frame sent on the channel must be the Detach
+        sendDetach(_session, firstAttach.getHandle(), false);
+
+        ArgumentCaptor<FrameBody> sentFrames = ArgumentCaptor.forClass(FrameBody.class);
+        verify(_connection, times(2)).sendFrame(eq((short) _session.getChannelId()), sentFrames.capture());
+        assertTrue(sentFrames.getAllValues().get(1) instanceof Detach);
+
+        // the queue is still present after the non-closing detach
+        assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+        final String secondTopic = TOPIC_NAME + "2";
+        final String secondAddress = "amq.direct/" + secondTopic;
+        Attach secondAttach = createSharedTopicAttach(true, baseLinkName + "|2", secondAddress, true);
+
+        _session.receiveAttach(secondAttach);
+        assertAttachSent(_connection, _session, secondAttach, 2);
+
+        assertQueues(secondTopic, LifetimePolicy.PERMANENT);
+    }
+
+    /**
+     * Attaches a durable, shared, global topic link, detaches it without closing, and then re-attaches
+     * with the same link name to a second topic. assertQueues then requires exactly one queue, whose
+     * single publishing link and amq.direct binding use the second topic name.
+     */
+    public void testReceiveReattachRebindingQueueNoActiveLinks()
+    {
+        final String reusedLinkName = "testLink";
+        final String firstAddress = "amq.direct/" + TOPIC_NAME;
+        Attach firstAttach = createSharedTopicAttach(true, reusedLinkName, firstAddress, true);
+        _session.receiveAttach(firstAttach);
+
+        assertAttachSent(_connection, _session, firstAttach);
+        assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+        // detach without closing; the second frame sent on the channel must be the Detach
+        sendDetach(_session, firstAttach.getHandle(), false);
+
+        ArgumentCaptor<FrameBody> sentFrames = ArgumentCaptor.forClass(FrameBody.class);
+        verify(_connection, times(2)).sendFrame(eq((short) _session.getChannelId()), sentFrames.capture());
+        assertTrue(sentFrames.getAllValues().get(1) instanceof Detach);
+
+        // the queue is still present after the non-closing detach
+        assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+        final String secondTopic = TOPIC_NAME + "2";
+        final String secondAddress = "amq.direct/" + secondTopic;
+        Attach reattach = createSharedTopicAttach(true, reusedLinkName, secondAddress, true);
+
+        _session.receiveAttach(reattach);
+        assertAttachSent(_connection, _session, reattach, 2);
+
+        assertQueues(secondTopic, LifetimePolicy.PERMANENT);
+    }
+
+
+    /**
+     * Verifies the outcome of a successful attach to an existing queue: the queue has exactly one
+     * consumer, exactly one frame was sent on the fixture session's channel, that frame is an Attach
+     * echoing the name, source and target of the received one with role SENDER, and the virtual host
+     * holds exactly one queue.
+     *
+     * @param queue          the queue the link was attached to
+     * @param receivedAttach the Attach that was passed to the session
+     */
+    private void assertAttachActions(final Queue<?> queue, final Attach receivedAttach)
+    {
+        Collection<QueueConsumer<?,?>> attachedConsumers = queue.getConsumers();
+        assertEquals("Unexpected consumers size", 1, attachedConsumers.size());
+
+        // verify(...) without times(...) requires exactly one sendFrame invocation
+        ArgumentCaptor<FrameBody> sentFrame = ArgumentCaptor.forClass(FrameBody.class);
+        verify(_connection).sendFrame(eq((short) _session.getChannelId()), sentFrame.capture());
+        Attach responseAttach = (Attach) sentFrame.getValue();
+
+        assertEquals("Unexpected name", receivedAttach.getName(), responseAttach.getName());
+        assertEquals("Unexpected role", Role.SENDER, responseAttach.getRole());
+        assertEquals("Unexpected source", receivedAttach.getSource(), responseAttach.getSource());
+        assertEquals("Unexpected target", receivedAttach.getTarget(), responseAttach.getTarget());
+
+        final Collection<Queue> allQueues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after attach", 1, allQueues.size());
+    }
+
+    /**
+     * Asserts that the first frame sent on the session's channel is the Attach answering
+     * {@code receivedAttach}; shorthand for the four-argument overload with an offset of 0.
+     */
+    private void assertAttachSent(final AMQPConnection_1_0 connection,
+                                  final Session_1_0 session,
+                                  final Attach receivedAttach)
+    {
+        final int firstFrameIndex = 0;
+        assertAttachSent(connection, session, receivedAttach, firstFrameIndex);
+    }
+
+    /**
+     * Asserts that exactly {@code invocationOffset + 1} frames have been sent on the session's channel
+     * and that the frame at index {@code invocationOffset} is an Attach with the same name as
+     * {@code receivedAttach} and role SENDER.
+     *
+     * @param connection       the mocked connection whose sendFrame invocations are verified
+     * @param session          the session whose channel id the frames must have been sent on
+     * @param receivedAttach   the Attach that was passed to the session
+     * @param invocationOffset zero-based index of the expected Attach among the frames sent so far
+     */
+    private void assertAttachSent(final AMQPConnection_1_0 connection,
+                                  final Session_1_0 session,
+                                  final Attach receivedAttach,
+                                  final int invocationOffset)
+    {
+        ArgumentCaptor<FrameBody> frameCapture = ArgumentCaptor.forClass(FrameBody.class);
+        verify(connection, times(invocationOffset + 1)).sendFrame(eq((short) session.getChannelId()), frameCapture.capture());
+        List<FrameBody> sentFrames = frameCapture.getAllValues();
+
+        // Check the frame type explicitly (as assertAttachFailed does) so that an unexpected frame
+        // is reported as an assertion failure rather than a ClassCastException.
+        assertTrue("unexpected Frame sent", sentFrames.get(invocationOffset) instanceof Attach);
+        Attach sentAttach = (Attach) sentFrames.get(invocationOffset);
+
+        assertEquals("Unexpected name", receivedAttach.getName(), sentAttach.getName());
+        assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole());
+    }
+
+    /**
+     * Asserts that the virtual host holds exactly one queue, that the queue has the expected lifetime
+     * policy and exactly one publishing link named {@code publishingLinkName}, and that amq.direct has a
+     * binding with that name to the queue.
+     *
+     * @param publishingLinkName     expected publishing link name, also used as the binding key
+     * @param expectedLifetimePolicy expected lifetime policy of the queue
+     */
+    private void assertQueues(final String publishingLinkName, final LifetimePolicy expectedLifetimePolicy)
+    {
+        final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after attach", 1, queues.size());
+        Queue queue = queues.iterator().next();
+        assertEquals("Unexpected queue lifetime policy",
+                     expectedLifetimePolicy, queue.getLifetimePolicy());
+        Collection<PublishingLink> queuePublishingLinks = queue.getPublishingLinks();
+        assertEquals("Unexpected number of publishing links", 1, queuePublishingLinks.size());
+        assertEquals("Unexpected link name", publishingLinkName, queuePublishingLinks.iterator().next().getName());
+
+        Exchange<?> exchange = _virtualHost.getChildByName(Exchange.class, "amq.direct");
+        assertTrue("Binding should exist", exchange.hasBinding(publishingLinkName, queue));
+    }
+
+    /**
+     * Asserts that an attach was refused: exactly {@code invocationOffset + 2} frames have been sent on
+     * the session's channel, the frame at {@code invocationOffset} is an Attach with the same name,
+     * role SENDER and a null source, and the following frame is a Detach with closed set.
+     *
+     * @param connection       the mocked connection whose sendFrame invocations are verified
+     * @param session          the session whose channel id the frames must have been sent on
+     * @param attach           the Attach that was passed to the session
+     * @param invocationOffset zero-based index of the expected Attach among the frames sent so far
+     */
+    private void assertAttachFailed(final AMQPConnection_1_0 connection, final Session_1_0 session, final Attach attach, int invocationOffset)
+    {
+        final int attachIndex = invocationOffset;
+        final int detachIndex = invocationOffset + 1;
+
+        ArgumentCaptor<FrameBody> captor = ArgumentCaptor.forClass(FrameBody.class);
+        verify(connection, times(invocationOffset + 2)).sendFrame(eq((short) session.getChannelId()), captor.capture());
+        List<FrameBody> frames = captor.getAllValues();
+
+        // the refusing Attach
+        FrameBody attachFrame = frames.get(attachIndex);
+        assertTrue("unexpected Frame sent", attachFrame instanceof Attach);
+        Attach refusedAttach = (Attach) attachFrame;
+        assertEquals("Unexpected name", attach.getName(), refusedAttach.getName());
+        assertEquals("Unexpected role", Role.SENDER, refusedAttach.getRole());
+        assertEquals("Unexpected source", null, refusedAttach.getSource());
+
+        // immediately followed by a closing Detach
+        FrameBody detachFrame = frames.get(detachIndex);
+        assertTrue("unexpected Frame sent", detachFrame instanceof Detach);
+        Detach closingDetach = (Detach) detachFrame;
+        assertTrue("Unexpected closed state", closingDetach.getClosed());
+    }
+
+    /**
+     * Asserts that the attach was refused with the first two frames sent on the session's channel;
+     * shorthand for the four-argument overload with an offset of 0.
+     */
+    private void assertAttachFailed(final AMQPConnection_1_0 connection, final Session_1_0 session, final Attach attach)
+    {
+        final int firstFrameIndex = 0;
+        assertAttachFailed(connection, session, attach, firstFrameIndex);
+    }
+
+    /**
+     * Builds a receiver Attach for a topic address with the "shared" capability set.
+     *
+     * @param durable  whether the source is configured as durable (see createAttach)
+     * @param linkName name of the link
+     * @param address  source address
+     * @param isGlobal whether the "global" capability is added as well
+     */
+    private Attach createSharedTopicAttach(final boolean durable,
+                                           final String linkName,
+                                           final String address,
+                                           final boolean isGlobal)
+    {
+        final boolean shared = true;
+        return createAttach(durable, linkName, address, TOPIC_CAPABILITY, isGlobal, shared);
+    }
+
+    /**
+     * Builds a receiver Attach for a topic address without the "shared" capability.
+     *
+     * @param durable  whether the source is configured as durable (see createAttach)
+     * @param linkName name of the link
+     * @param address  source address
+     * @param isGlobal whether the "global" capability is added
+     */
+    private Attach createTopicAttach(final boolean durable,
+                                     final String linkName,
+                                     final String address,
+                                     final boolean isGlobal)
+    {
+        final boolean shared = false;
+        return createAttach(durable, linkName, address, TOPIC_CAPABILITY, isGlobal, shared);
+    }
+
+    /**
+     * Builds a receiver Attach for a queue address; neither the "global" nor the "shared" capability is set.
+     *
+     * @param durable  whether the source is configured as durable (see createAttach)
+     * @param linkName name of the link
+     * @param address  source address
+     */
+    private Attach createQueueAttach(final boolean durable,
+                                     final String linkName,
+                                     final String address)
+    {
+        final boolean global = false;
+        final boolean shared = false;
+        return createAttach(durable, linkName, address, QUEUE_CAPABILITY, global, shared);
+    }
+
+    /**
+     * Builds a receiver-role Attach with a fresh handle taken from the {@code _handle} counter.
+     *
+     * The source capabilities are, in order: "global" (if requested), "shared" (if requested) and the
+     * destination type capability. A durable source gets durability CONFIGURATION and expiry policy
+     * NEVER; a non-durable one gets durability NONE and expiry policy LINK_DETACH.
+     *
+     * @param durable                   whether the source is durable
+     * @param linkName                  name of the link
+     * @param address                   source address
+     * @param destinationTypeCapability capability describing the destination type
+     * @param isGlobal                  whether to add the "global" capability
+     * @param isShared                  whether to add the "shared" capability
+     * @return the populated Attach
+     */
+    private Attach createAttach(final boolean durable,
+                                final String linkName,
+                                final String address,
+                                final Symbol destinationTypeCapability,
+                                final boolean isGlobal,
+                                final boolean isShared)
+    {
+        final List<Symbol> capabilityList = new ArrayList<>();
+        if (isGlobal)
+        {
+            capabilityList.add(Symbol.getSymbol("global"));
+        }
+        if (isShared)
+        {
+            capabilityList.add(Symbol.getSymbol("shared"));
+        }
+        capabilityList.add(destinationTypeCapability);
+
+        // source terminus
+        final Source source = new Source();
+        source.setAddress(address);
+        source.setCapabilities(capabilityList.toArray(new Symbol[capabilityList.size()]));
+        source.setDurable(durable ? TerminusDurability.CONFIGURATION : TerminusDurability.NONE);
+        source.setExpiryPolicy(durable ? TerminusExpiryPolicy.NEVER : TerminusExpiryPolicy.LINK_DETACH);
+
+        // attach frame
+        final Attach attach = new Attach();
+        attach.setName(linkName);
+        attach.setRole(Role.RECEIVER);
+        attach.setHandle(new UnsignedInteger(_handle++));
+        attach.setIncompleteUnsettled(false);
+        attach.setSource(source);
+        attach.setTarget(new Target());
+        return attach;
+    }
+
+    /**
+     * Creates a mocked connection whose remote container id is a random UUID
+     * (the behaviour of the one-argument overload when given null).
+     */
+    private AMQPConnection_1_0 createAmqpConnection_1_0()
+    {
+        final String noExplicitContainerId = null;
+        return createAmqpConnection_1_0(noExplicitContainerId);
+    }
+
+    /**
+     * Creates a Mockito mock of AMQPConnection_1_0 stubbed with an empty read-only Subject, the test
+     * virtual host as address space, a mocked event logger and ticker, and a doOnIOThreadAsync that
+     * runs the submitted task synchronously.
+     *
+     * @param containerId remote container id to report; when null a random UUID string is used instead
+     * @return the stubbed connection mock
+     */
+    private AMQPConnection_1_0 createAmqpConnection_1_0(String containerId)
+    {
+        final AMQPConnection_1_0 mockConnection = mock(AMQPConnection_1_0.class);
+
+        final Subject emptySubject =
+                new Subject(true, Collections.<Principal>emptySet(), Collections.emptySet(), Collections.emptySet());
+        when(mockConnection.getSubject()).thenReturn(emptySubject);
+        when(mockConnection.getAddressSpace()).thenReturn(_virtualHost);
+
+        final EventLogger mockEventLogger = mock(EventLogger.class);
+        when(mockConnection.getEventLogger()).thenReturn(mockEventLogger);
+        when(mockConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD)).thenReturn(1L);
+
+        // Run tasks handed to the IO thread inline and report them as already completed.
+        final ArgumentCaptor<Runnable> taskCaptor = ArgumentCaptor.forClass(Runnable.class);
+        when(mockConnection.doOnIOThreadAsync(taskCaptor.capture())).thenAnswer(new Answer<ListenableFuture<Void>>()
+        {
+            @Override
+            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                taskCaptor.getValue().run();
+                return Futures.immediateFuture(null);
+            }
+        });
+
+        final AggregateTicker mockTicker = mock(AggregateTicker.class);
+        when(mockConnection.getAggregateTicker()).thenReturn(mockTicker);
+
+        final String remoteContainerId = containerId != null ? containerId : UUID.randomUUID().toString();
+        when(mockConnection.getRemoteContainerId()).thenReturn(remoteContainerId);
+        return mockConnection;
+    }
+
+    /**
+     * Creates a Session_1_0 on the given connection and sets both its receiving and its sending
+     * channel to {@code channelId}.
+     *
+     * The mocked Begin reports {@code channelId} as its next outgoing id.
+     *
+     * @param connection the connection the session belongs to
+     * @param channelId  channel number used for both directions
+     * @return the new session
+     */
+    private Session_1_0 createSession_1_0(final AMQPConnection_1_0 connection, int channelId)
+    {
+        Begin begin = mock(Begin.class);
+        when(begin.getNextOutgoingId()).thenReturn(new UnsignedInteger(channelId));
+        // Named "session" rather than "_session" so it does not shadow the test's _session field.
+        Session_1_0 session = new Session_1_0(connection, begin);
+        session.setReceivingChannel((short)channelId);
+        session.setSendingChannel((short)channelId);
+        return session;
+    }
+
+    /**
+     * Builds a Detach for the given link handle and delivers it to the session.
+     *
+     * @param session the session that receives the Detach
+     * @param handle  handle of the link to detach
+     * @param closed  value of the Detach's closed flag
+     */
+    private void sendDetach(final Session_1_0 session,
+                            final UnsignedInteger handle,
+                            final boolean closed)
+    {
+        final Detach detachFrame = new Detach();
+        detachFrame.setClosed(closed);
+        detachFrame.setHandle(handle);
+        session.receiveDetach(detachFrame);
+    }
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org