You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/01/24 10:14:48 UTC
svn commit: r1780052 [2/3] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-core/src/main/java/org/...
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1780052&r1=1780051&r2=1780052&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Jan 24 10:14:48 2017
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v1_0;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.server.protocol.v1_0.ExchangeDestination.SHARED_CAPABILITY;
import java.security.AccessControlContext;
import java.security.AccessControlException;
@@ -28,6 +29,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -49,6 +51,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.EventLogger;
@@ -62,13 +66,17 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
@@ -87,9 +95,16 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinks;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinksOrMessages;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ExactSubjectFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MatchingSubjectFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
@@ -111,6 +126,7 @@ import org.apache.qpid.server.txn.AutoCo
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.transport.network.Ticker;
public class Session_1_0 implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
@@ -291,8 +307,7 @@ public class Session_1_0 implements AMQS
endpoint.setLocalHandle(localHandle);
_localLinkEndpoints.put(endpoint, localHandle);
- remoteLinkCreation(endpoint);
-
+ remoteLinkCreation(endpoint, attach);
}
else
{
@@ -771,332 +786,647 @@ public class Session_1_0 implements AMQS
return _accessControllerContext;
}
- public void remoteLinkCreation(final LinkEndpoint endpoint)
+ public void remoteLinkCreation(final LinkEndpoint endpoint, Attach attach)
{
- Destination destination;
Link_1_0 link = null;
Error error = null;
+ Set<Symbol> capabilities = new HashSet<>();
+ try
+ {
+ if (endpoint.getRole() == Role.SENDER)
+ {
+ link = createSendingLink(endpoint, attach);
+ if (link != null)
+ {
+ capabilities.add(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS);
+ }
+ }
+ else if (endpoint.getTarget() instanceof Coordinator)
+ {
+ link = createCoordinatorLink(endpoint);
+ }
+ else // standard (non-Coordinator) receiver
+ {
+ link = createReceivingLink(endpoint, capabilities);
+ }
+ }
+ catch (AmqpErrorException e)
+ {
+
+ if (e.getError() == null || e.getError().getCondition() == AmqpError.INTERNAL_ERROR)
+ {
+ _logger.error("Could not create link", e);
+ }
+ else
+ {
+ _logger.debug("Could not create link", e);
+ }
+ if (endpoint.getRole() == Role.SENDER)
+ {
+ endpoint.setSource(null);
+ }
+ else
+ {
+ endpoint.setTarget(null);
+ }
+ error = e.getError();
+ }
+
+ endpoint.setCapabilities(capabilities);
+ endpoint.attach();
+ if (link == null)
+ {
+ if (error == null)
+ {
+ error = new Error();
+ error.setCondition(AmqpError.NOT_FOUND);
+ }
+ endpoint.close(error);
+ }
+ else
+ {
+ link.start();
+ }
+ }
+
+ private Link_1_0 createReceivingLink(final LinkEndpoint endpoint,
+ final Set<Symbol> capabilities)
+ {
+ Link_1_0 link = null;
+ Destination destination;
final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
- Set<Symbol> capabilities = new HashSet<>();
+ StandardReceivingLink_1_0 previousLink =
+ (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
- if (endpoint.getRole() == Role.SENDER)
+ if (previousLink == null)
{
- final SendingLink_1_0 previousLink =
- (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
+ Target target = (Target) endpoint.getTarget();
- if (previousLink == null)
+ if (target != null)
{
+ if (Boolean.TRUE.equals(target.getDynamic()))
+ {
- Target target = (Target) endpoint.getTarget();
- Source source = (Source) endpoint.getSource();
-
+ MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
+ target.setAddress(tempQueue.getName());
+ }
- if (source != null)
+ String addr = target.getAddress();
+ if (addr == null || "".equals(addr.trim()))
{
- if (Boolean.TRUE.equals(source.getDynamic()))
+ MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
+ destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+ target.getExpiryPolicy(), "",
+ target.getCapabilities(),
+ _connection.getEventLogger());
+ target.setCapabilities(destination.getCapabilities());
+
+ if (_blockingEntities.contains(messageDestination))
{
- MessageSource tempQueue = createDynamicSource(source.getDynamicNodeProperties());
- source.setAddress(tempQueue.getName());
+ endpoint.setStopped(true);
+ }
+ }
+ else if (!addr.startsWith("/") && addr.contains("/"))
+ {
+ String[] parts = addr.split("/", 2);
+ Exchange<?> exchange = getExchange(parts[0]);
+ if (exchange != null)
+ {
+ Symbol[] capabilities1 = target.getCapabilities();
+ ExchangeDestination exchangeDestination = new ExchangeDestination(exchange,
+ null,
+ target.getDurable(),
+ target.getExpiryPolicy(),
+ parts[0],
+ parts[1],
+ capabilities1 != null ? Arrays.asList(capabilities1) : Collections.<Symbol>emptyList());
+ target.setCapabilities(exchangeDestination.getCapabilities());
+ destination = exchangeDestination;
}
- String addr = source.getAddress();
- if (!addr.startsWith("/") && addr.contains("/"))
+ else
{
- String[] parts = addr.split("/", 2);
- Exchange<?> exchg = getExchange(parts[0]);
- if (exchg != null)
- {
- ExchangeDestination exchangeDestination =
- new ExchangeDestination(exchg,
- source.getDurable(),
- source.getExpiryPolicy(),
- parts[0],
- target.getCapabilities());
- exchangeDestination.setInitialRoutingAddress(parts[1]);
- destination = exchangeDestination;
- target.setCapabilities(exchangeDestination.getCapabilities());
- }
- else
- {
- endpoint.setSource(null);
- destination = null;
- }
+ endpoint.setTarget(null);
+ destination = null;
+ }
+ }
+ else
+ {
+ MessageDestination messageDestination =
+ getAddressSpace().getAttainedMessageDestination(addr);
+ if (messageDestination != null)
+ {
+ destination =
+ new NodeReceivingDestination(messageDestination,
+ target.getDurable(),
+ target.getExpiryPolicy(),
+ addr,
+ target.getCapabilities(),
+ _connection.getEventLogger());
+ target.setCapabilities(destination.getCapabilities());
}
else
{
- MessageSource queue = getAddressSpace().getAttainedMessageSource(addr);
+ Queue<?> queue = getQueue(addr);
if (queue != null)
{
- destination = new MessageSourceDestination(queue);
+
+ destination = new QueueDestination(queue, addr);
}
else
{
- Exchange<?> exchg = getExchange(addr);
- if (exchg != null)
- {
- ExchangeDestination exchangeDestination =
- new ExchangeDestination(exchg,
- source.getDurable(),
- source.getExpiryPolicy(),
- addr,
- target.getCapabilities());
- destination = exchangeDestination;
- target.setCapabilities(exchangeDestination.getCapabilities());
- }
- else
- {
- endpoint.setSource(null);
- destination = null;
- }
+ endpoint.setTarget(null);
+ destination = null;
}
}
+ }
+ }
+ else
+ {
+ destination = null;
+ }
+ if (destination != null)
+ {
+ final ReceivingDestination receivingDestination = (ReceivingDestination) destination;
+ MessageDestination messageDestination = receivingDestination.getMessageDestination();
+ if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled())
+ {
+ capabilities.add(DELAYED_DELIVERY);
+ }
+ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ final StandardReceivingLink_1_0 receivingLink =
+ new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
+ getAddressSpace(),
+ receivingDestination);
+
+ receivingLinkEndpoint.setLink(receivingLink);
+ link = receivingLink;
+ if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
+ || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
+ {
+ linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
+ }
+ }
+ }
+ else
+ {
+ ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
+ receivingLinkEndpoint.setLink(previousLink);
+ link = previousLink;
+ endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+ }
+ return link;
+ }
+ private TxnCoordinatorReceivingLink_1_0 createCoordinatorLink(final LinkEndpoint endpoint) throws AmqpErrorException
+ {
+ Coordinator coordinator = (Coordinator) endpoint.getTarget();
+ TxnCapability[] coordinatorCapabilities = coordinator.getCapabilities();
+ boolean localTxn = false;
+ boolean multiplePerSession = false;
+ if (coordinatorCapabilities != null)
+ {
+ for (TxnCapability capability : coordinatorCapabilities)
+ {
+ if (capability.equals(TxnCapability.LOCAL_TXN))
+ {
+ localTxn = true;
+ }
+ else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
+ {
+ multiplePerSession = true;
}
else
{
- destination = null;
+ Error error = new Error();
+ error.setCondition(AmqpError.NOT_IMPLEMENTED);
+ error.setDescription("Unsupported capability: " + capability);
+ throw new AmqpErrorException(error);
}
+ }
+ }
+
+ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
+ new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
+ this,
+ receivingLinkEndpoint,
+ _openTransactions);
+ receivingLinkEndpoint.setLink(coordinatorLink);
+ return coordinatorLink;
+ }
- if (destination != null)
+ private SendingLink_1_0 createSendingLink(final LinkEndpoint endpoint, Attach attach) throws AmqpErrorException
+ {
+ final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
+ SendingLink_1_0 link = null;
+ final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
+ final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
+
+ if (previousLink == null)
+ {
+ Source source = (Source) sendingLinkEndpoint.getSource();
+ SendingDestination destination = null;
+ if (source != null)
+ {
+ destination = getSendingDestination(sendingLinkEndpoint.getName(), source);
+ if (destination == null)
+ {
+ sendingLinkEndpoint.setSource(null);
+ }
+ else
+ {
+ source.setCapabilities(destination.getCapabilities());
+ }
+ }
+ else
+ {
+ final Symbol[] linkCapabilities = attach.getDesiredCapabilities();
+ boolean isGlobal = hasCapability(linkCapabilities, ExchangeDestination.GLOBAL_CAPABILITY);
+ final String queueName = getMangledSubscriptionName(endpoint.getName(), true, true, isGlobal);
+ final MessageSource messageSource = getAddressSpace().getAttainedMessageSource(queueName);
+ // TODO START The Source should be persisted on the LinkEndpoint
+ if (messageSource instanceof Queue)
{
- final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
- try
+ Queue<?> queue = (Queue<?>) messageSource;
+ source = new Source();
+ List<Symbol> capabilities = new ArrayList<>();
+ if (queue.getExclusive() == ExclusivityPolicy.SHARED_SUBSCRIPTION)
{
- final SendingLink_1_0 sendingLink =
- new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
- getAddressSpace(),
- (SendingDestination) destination
- );
-
- sendingLinkEndpoint.setLink(sendingLink);
- registerConsumer(sendingLink);
+ capabilities.add(ExchangeDestination.SHARED_CAPABILITY);
+ }
+ if (isGlobal)
+ {
+ capabilities.add(ExchangeDestination.GLOBAL_CAPABILITY);
+ }
+ capabilities.add(ExchangeDestination.TOPIC_CAPABILITY);
+ source.setCapabilities(capabilities.toArray(new Symbol[capabilities.size()]));
- link = sendingLink;
- if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())
- || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
+ final Collection<Exchange> exchanges = queue.getVirtualHost().getChildren(Exchange.class);
+ String bindingKey = null;
+ Exchange<?> foundExchange = null;
+ for (Exchange<?> exchange : exchanges)
+ {
+ for (Binding binding : exchange.getPublishingLinks(queue))
+ {
+ String exchangeName = exchange.getName();
+ bindingKey = binding.getName();
+ final Map<String, Object> bindingArguments = binding.getArguments();
+ Map<Symbol, Filter> filter = new HashMap<>();
+ if (bindingArguments.containsKey(AMQPFilterTypes.JMS_SELECTOR))
+ {
+ filter.put(Symbol.getSymbol("jms-selector"), new JMSSelectorFilter((String) bindingArguments.get(AMQPFilterTypes.JMS_SELECTOR)));
+ }
+ if (bindingArguments.containsKey(AMQPFilterTypes.NO_LOCAL))
+ {
+ filter.put(Symbol.getSymbol("no-local"), NoLocalFilter.INSTANCE);
+ }
+ foundExchange = exchange;
+ source.setAddress(exchangeName + "/" + bindingKey);
+ source.setFilter(filter);
+ break;
+ }
+ if (foundExchange != null)
{
- linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
+ break;
}
}
- catch (AmqpErrorException e)
+ if (foundExchange != null)
{
- _logger.error("Error creating sending link", e);
- destination = null;
- sendingLinkEndpoint.setSource(null);
- error = e.getError();
+ source.setDurable(TerminusDurability.CONFIGURATION);
+ TerminusExpiryPolicy terminusExpiryPolicy;
+ switch (queue.getLifetimePolicy())
+ {
+ case PERMANENT:
+ terminusExpiryPolicy = TerminusExpiryPolicy.NEVER;
+ break;
+ case DELETE_ON_NO_LINKS:
+ case DELETE_ON_NO_OUTBOUND_LINKS:
+ terminusExpiryPolicy = TerminusExpiryPolicy.LINK_DETACH;
+ break;
+ case DELETE_ON_CONNECTION_CLOSE:
+ terminusExpiryPolicy = TerminusExpiryPolicy.CONNECTION_CLOSE;
+ break;
+ case DELETE_ON_SESSION_END:
+ terminusExpiryPolicy = TerminusExpiryPolicy.SESSION_END;
+ break;
+ default:
+ throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "unexpected liftetime policy " + queue.getLifetimePolicy()));
+ }
+ sendingLinkEndpoint.setSource(source);
+ destination = new ExchangeDestination(foundExchange,
+ queue,
+ TerminusDurability.CONFIGURATION,
+ terminusExpiryPolicy,
+ foundExchange.getName(),
+ bindingKey,
+ capabilities);
}
+
}
+ // TODO END
}
- else
+
+ if (destination != null)
{
- Source newSource = (Source) endpoint.getSource();
+ final SendingLink_1_0 sendingLink =
+ new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
+ getAddressSpace(),
+ destination);
+ sendingLink.createConsumerTarget();
+
+ sendingLinkEndpoint.setLink(sendingLink);
+ registerConsumer(sendingLink);
- Source oldSource = (Source) previousLink.getEndpoint().getSource();
- final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable();
- if (newSourceDurable != null)
+ if (destination instanceof ExchangeDestination)
{
- oldSource.setDurable(newSourceDurable);
- if (newSourceDurable.equals(TerminusDurability.NONE))
- {
- linkRegistry.unregisterSendingLink(endpoint.getName());
- }
+ ExchangeDestination exchangeDestination = (ExchangeDestination) destination;
+ exchangeDestination.getQueue().setAttributes(Collections.<String, Object>singletonMap(Queue.DESIRED_STATE, State.ACTIVE));
+ }
+
+ link = sendingLink;
+ if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())
+ || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
+ {
+ linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
}
- endpoint.setSource(oldSource);
- SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
- previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
- sendingLinkEndpoint.setLink(previousLink);
- link = previousLink;
- endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
- registerConsumer(previousLink);
}
}
- else if (endpoint.getTarget() instanceof Coordinator)
+ else
{
- Coordinator coordinator = (Coordinator) endpoint.getTarget();
- TxnCapability[] coordinatorCapabilities = coordinator.getCapabilities();
- boolean localTxn = false;
- boolean multiplePerSession = false;
- if (coordinatorCapabilities != null)
+ Source newSource = (Source) attach.getSource();
+ Source oldSource = (Source) previousLink.getEndpoint().getSource();
+
+ if (previousLink.getDestination() instanceof ExchangeDestination && newSource != null && !Boolean.TRUE.equals(newSource.getDynamic()))
{
- for (TxnCapability capability : coordinatorCapabilities)
+ final SendingDestination newDestination = getSendingDestination(previousLink.getEndpoint().getName(), newSource);
+ if (updateSourceForSubscription(previousLink, newSource, newDestination))
{
- if (capability.equals(TxnCapability.LOCAL_TXN))
- {
- localTxn = true;
- }
- else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
- {
- multiplePerSession = true;
- }
- else
- {
- error = new Error();
- error.setCondition(AmqpError.NOT_IMPLEMENTED);
- error.setDescription("Unsupported capability: " + capability);
- break;
- }
+ previousLink.setDestination(newDestination);
}
}
- /* if(!localTxn)
+ sendingLinkEndpoint.setSource(oldSource);
+ previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
+ sendingLinkEndpoint.setLink(previousLink);
+ link = previousLink;
+ sendingLinkEndpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+ registerConsumer(previousLink);
+
+ }
+ return link;
+ }
+
+ private boolean updateSourceForSubscription(final SendingLink_1_0 previousLink,
+ final Source newSource,
+ final SendingDestination newDestination)
+ {
+ SendingDestination oldDestination = previousLink.getDestination();
+ if (oldDestination instanceof ExchangeDestination)
+ {
+ ExchangeDestination oldExchangeDestination = (ExchangeDestination) oldDestination;
+ String newAddress = newSource.getAddress();
+ if (newDestination instanceof ExchangeDestination)
+ {
+ ExchangeDestination newExchangeDestination = (ExchangeDestination) newDestination;
+ if (oldExchangeDestination.getQueue() != newExchangeDestination.getQueue())
{
- coordinatorCapabilities.add(TxnCapabilities.LOCAL_TXN);
- }*/
+ Source oldSource = (Source) previousLink.getEndpoint().getSource();
+ oldSource.setAddress(newAddress);
+ oldSource.setFilter(newSource.getFilter());
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private SendingDestination getSendingDestination(final String linkName, final Source source) throws AmqpErrorException
+ {
+ SendingDestination destination = null;
- final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
- new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
- this,
- receivingLinkEndpoint,
- _openTransactions);
- receivingLinkEndpoint.setLink(coordinatorLink);
- link = coordinatorLink;
+ if (Boolean.TRUE.equals(source.getDynamic()))
+ {
+ MessageSource tempQueue = createDynamicSource(source.getDynamicNodeProperties());
+ source.setAddress(tempQueue.getName()); // todo : temporary topic
}
- else // standard (non-Coordinator) receiver
+
+ String address = source.getAddress();
+ if (address != null)
{
+ if (!address.startsWith("/") && address.contains("/"))
+ {
+ destination = createExchangeDestination(address, linkName, source);
+ }
+ else
+ {
+ MessageSource queue = getAddressSpace().getAttainedMessageSource(address);
+ if (queue != null)
+ {
+ destination = new MessageSourceDestination(queue);
+ }
+ else
+ {
+ destination = createExchangeDestination(address, null, linkName, source);
+ }
+ }
+ }
+
+ return destination;
+ }
- StandardReceivingLink_1_0 previousLink =
- (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
+ private ExchangeDestination createExchangeDestination(String address, final String linkName, final Source source)
+ throws AmqpErrorException
+ {
+ String[] parts = address.split("/", 2);
+ String exchangeName = parts[0];
+ String bindingKey = parts[1];
+ return createExchangeDestination(exchangeName, bindingKey, linkName, source);
+ }
- if (previousLink == null)
+ private ExchangeDestination createExchangeDestination(final String exchangeName,
+ final String bindingKey,
+ final String linkName,
+ final Source source) throws AmqpErrorException
+ {
+ ExchangeDestination exchangeDestination = null;
+ Exchange<?> exchange = getExchange(exchangeName);
+ List<Symbol> sourceCapabilities = new ArrayList<>();
+ if (exchange != null)
+ {
+ Queue queue = null;
+ if (!Boolean.TRUE.equals(source.getDynamic()))
{
+ final Map<String, Object> attributes = new HashMap<>();
+ boolean isDurable = source.getDurable() != TerminusDurability.NONE;
+ boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY);
+ boolean isGlobal = hasCapability(source.getCapabilities(), ExchangeDestination.GLOBAL_CAPABILITY);
- Target target = (Target) endpoint.getTarget();
+ final String name = getMangledSubscriptionName(linkName, isDurable, isShared, isGlobal);
- if (target != null)
+ if (isGlobal)
{
- if (Boolean.TRUE.equals(target.getDynamic()))
- {
+ sourceCapabilities.add(ExchangeDestination.GLOBAL_CAPABILITY);
+ }
- MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
- target.setAddress(tempQueue.getName());
- }
+ ExclusivityPolicy exclusivityPolicy;
+ if (isShared)
+ {
+ exclusivityPolicy = ExclusivityPolicy.SHARED_SUBSCRIPTION;
+ sourceCapabilities.add(SHARED_CAPABILITY);
+ }
+ else
+ {
+ exclusivityPolicy = ExclusivityPolicy.LINK;
+ }
- String addr = target.getAddress();
- if (addr == null || "".equals(addr.trim()))
- {
- MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
- destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
- target.getExpiryPolicy(), "",
- target.getCapabilities(),
- _connection.getEventLogger());
- target.setCapabilities(destination.getCapabilities());
+ org.apache.qpid.server.model.LifetimePolicy lifetimePolicy = getLifetimePolicy(source.getExpiryPolicy());
- if (_blockingEntities.contains(messageDestination))
- {
- endpoint.setStopped(true);
- }
- }
- else if (!addr.startsWith("/") && addr.contains("/"))
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.NAME, name);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.DURABLE, isDurable);
+
+ BindingInfo bindingInfo = new BindingInfo(exchange, name,
+ bindingKey, source.getFilter());
+ Map<String, Map<String, Object>> bindings = bindingInfo.getBindings();
+ try
+ {
+ if (getAddressSpace() instanceof QueueManagingVirtualHost)
{
- String[] parts = addr.split("/", 2);
- Exchange<?> exchange = getExchange(parts[0]);
- if (exchange != null)
+ try
{
- ExchangeDestination exchangeDestination =
- new ExchangeDestination(exchange,
- target.getDurable(),
- target.getExpiryPolicy(),
- parts[0],
- target.getCapabilities());
-
- exchangeDestination.setInitialRoutingAddress(parts[1]);
- target.setCapabilities(exchangeDestination.getCapabilities());
- destination = exchangeDestination;
+ queue = ((QueueManagingVirtualHost) getAddressSpace()).getSubscriptionQueue(exchangeName, attributes, bindings);
}
- else
+ catch (NotFoundException e)
{
- endpoint.setTarget(null);
- destination = null;
+ throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, e.getMessage()));
}
}
else
{
- MessageDestination messageDestination =
- getAddressSpace().getAttainedMessageDestination(addr);
- if (messageDestination != null)
- {
- destination =
- new NodeReceivingDestination(messageDestination,
- target.getDurable(),
- target.getExpiryPolicy(),
- addr,
- target.getCapabilities(),
- _connection.getEventLogger());
- target.setCapabilities(destination.getCapabilities());
- }
- else
- {
- Queue<?> queue = getQueue(addr);
- if (queue != null)
- {
-
- destination = new QueueDestination(queue, addr);
- }
- else
- {
- endpoint.setTarget(null);
- destination = null;
- }
- }
+ throw new AmqpErrorException(new Error(AmqpError.INTERNAL_ERROR,
+ "Address space of unexpected type"));
}
}
- else
- {
- destination = null;
- }
- if (destination != null)
+ catch(IllegalStateException e)
{
- final ReceivingDestination receivingDestination = (ReceivingDestination) destination;
- MessageDestination messageDestination = receivingDestination.getMessageDestination();
- if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled())
- {
- capabilities.add(DELAYED_DELIVERY);
- }
- final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- final StandardReceivingLink_1_0 receivingLink =
- new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
- getAddressSpace(),
- receivingDestination);
-
- receivingLinkEndpoint.setLink(receivingLink);
- link = receivingLink;
- if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
- || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
- {
- linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
- }
+ throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED,
+ "Subscription is already in use"));
}
+ source.setFilter(bindingInfo.getActualFilters().isEmpty() ? null : bindingInfo.getActualFilters());
+ source.setDistributionMode(StdDistMode.COPY);
+ exchangeDestination = new ExchangeDestination(exchange,
+ queue,
+ source.getDurable(),
+ source.getExpiryPolicy(),
+ exchangeName,
+ bindingKey,
+ sourceCapabilities);
}
else
{
- ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
- receivingLinkEndpoint.setLink(previousLink);
- link = previousLink;
- endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+ // TODO
+ throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Temporary subscription is not implemented"));
}
}
+ return exchangeDestination;
+ }
+ private org.apache.qpid.server.model.LifetimePolicy getLifetimePolicy(final TerminusExpiryPolicy expiryPolicy) throws AmqpErrorException
+ {
+ org.apache.qpid.server.model.LifetimePolicy lifetimePolicy;
+ if (expiryPolicy == null || expiryPolicy == TerminusExpiryPolicy.SESSION_END)
+ {
+ lifetimePolicy = org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_SESSION_END;
+ }
+ else if (expiryPolicy == TerminusExpiryPolicy.LINK_DETACH)
+ {
+ lifetimePolicy = org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
+ }
+ else if (expiryPolicy == TerminusExpiryPolicy.CONNECTION_CLOSE)
+ {
+ lifetimePolicy = org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ }
+ else if (expiryPolicy == TerminusExpiryPolicy.NEVER)
+ {
+ lifetimePolicy = org.apache.qpid.server.model.LifetimePolicy.PERMANENT;
+ }
+ else
+ {
+ Error error = new Error(AmqpError.NOT_IMPLEMENTED,
+ String.format("unknown ExpiryPolicy '%s'", expiryPolicy.getValue()));
+ throw new AmqpErrorException(error);
+ }
+ return lifetimePolicy;
+ }
- endpoint.setCapabilities(capabilities);
- endpoint.attach();
+ private String getMangledSubscriptionName(final String linkName,
+ final boolean isDurable,
+ final boolean isShared,
+ final boolean isGlobal)
+ {
+ String remoteContainerId = getConnection().getRemoteContainerId();
+ if (isGlobal)
+ {
+ remoteContainerId = "_global_";
+ }
+ else
+ {
+ remoteContainerId = sanitizeName(remoteContainerId);
+ }
- if (link == null)
+ String subscriptionName;
+ if (!isDurable && !isShared)
{
- if (error == null)
+ subscriptionName = UUID.randomUUID().toString();
+ }
+ else
+ {
+ subscriptionName = linkName;
+ if (isShared)
{
- error = new Error();
- error.setCondition(AmqpError.NOT_FOUND);
+ int separator = subscriptionName.indexOf("|");
+ if (separator > 0)
+ {
+ subscriptionName = subscriptionName.substring(0, separator);
+ }
}
- endpoint.close(error);
+ subscriptionName = sanitizeName(subscriptionName);
}
- else
+ return "qpidsub_/" + remoteContainerId + "_/" + subscriptionName + "_/" + (isDurable
+ ? "durable"
+ : "nondurable");
+ }
+
+ private String sanitizeName(String name)
+ {
+ return name.replace("_", "__")
+ .replace(".", "_:")
+ .replace("(", "_O")
+ .replace(")", "_C")
+ .replace("<", "_L")
+ .replace(">", "_R");
+ }
+
+ private boolean hasCapability(final Symbol[] capabilities,
+ final Symbol expectedCapability)
+ {
+ if (capabilities != null)
{
- link.start();
+ for (Symbol capability : capabilities)
+ {
+ if (expectedCapability.equals(capability))
+ {
+ return true;
+ }
+ }
}
+ return false;
}
@@ -1822,7 +2152,14 @@ public class Session_1_0 implements AMQS
{
if (link.getEndpoint() == linkEndpoint)
{
- link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
+ try
+ {
+ link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
+ }
+ catch (AmqpErrorException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
}
}
}
@@ -1907,4 +2244,118 @@ public class Session_1_0 implements AMQS
}
}
}
+
+ /**
+  * Value object describing how a subscription queue is bound to an exchange: the binding keys
+  * (each mapped to its binding arguments) and the subset of the requested filters that were
+  * translated into those bindings.
+  *
+  * Declared static because it never touches the enclosing session instance, so no hidden
+  * reference to the session is retained.
+  */
+ private static final class BindingInfo
+ {
+     private final Map<Symbol, Filter> _actualFilters = new HashMap<>();
+     private final Map<String, Map<String, Object>> _bindings = new HashMap<>();
+
+     /**
+      * @param exchange   exchange the queue is bound to; its type selects which subject filter is honoured
+      * @param queueName  queue name, used as the binding key for a fanout exchange when no other key exists
+      * @param bindingKey explicit binding key, may be null
+      * @param filters    filters requested by the peer, may be null or empty
+      */
+     private BindingInfo(Exchange<?> exchange,
+                         final String queueName,
+                         String bindingKey,
+                         Map<Symbol, Filter> filters)
+     {
+         String binding = null;
+         final Map<String, Object> arguments = new HashMap<>();
+         if (filters != null && !filters.isEmpty())
+         {
+             boolean hasBindingFilter = false;
+             boolean hasMessageFilter = false;
+             for (Map.Entry<Symbol, Filter> entry : filters.entrySet())
+             {
+                 // Only the first subject filter that matches the exchange type supplies a binding key.
+                 if (!hasBindingFilter
+                     && entry.getValue() instanceof ExactSubjectFilter
+                     && exchange.getType().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
+                 {
+                     ExactSubjectFilter filter = (ExactSubjectFilter) entry.getValue();
+                     binding = filter.getValue();
+                     _actualFilters.put(entry.getKey(), filter);
+                     hasBindingFilter = true;
+                 }
+                 else if (!hasBindingFilter
+                          && entry.getValue() instanceof MatchingSubjectFilter
+                          && exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+                 {
+                     MatchingSubjectFilter filter = (MatchingSubjectFilter) entry.getValue();
+                     binding = filter.getValue();
+                     _actualFilters.put(entry.getKey(), filter);
+                     hasBindingFilter = true;
+                 }
+                 else if (entry.getValue() instanceof NoLocalFilter)
+                 {
+                     _actualFilters.put(entry.getKey(), entry.getValue());
+                     arguments.put(AMQPFilterTypes.NO_LOCAL.toString(), true);
+                 }
+                 // Only the first JMS selector becomes a binding argument.
+                 else if (!hasMessageFilter && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
+                 {
+                     org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
+                     arguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), selectorFilter.getValue());
+                     _actualFilters.put(entry.getKey(), selectorFilter);
+                     hasMessageFilter = true;
+                 }
+                 // Any other filter is ignored and therefore absent from the actual filters.
+             }
+         }
+
+         // The filter-derived key and the explicit key share the same arguments map instance.
+         if (binding != null)
+         {
+             _bindings.put(binding, arguments);
+         }
+         if (bindingKey != null)
+         {
+             _bindings.put(bindingKey, arguments);
+         }
+         // Without any key: fanout binds under the queue name, topic binds under the "#" key.
+         if (binding == null
+             && bindingKey == null
+             && exchange.getType().equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS))
+         {
+             _bindings.put(queueName, arguments);
+         }
+         else if (binding == null
+                  && bindingKey == null
+                  && exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+         {
+             _bindings.put("#", arguments);
+         }
+     }
+
+     /** @return the live map of filters that were translated into bindings */
+     private Map<Symbol, Filter> getActualFilters()
+     {
+         return _actualFilters;
+     }
+
+     /** @return the live map of binding key to binding arguments */
+     private Map<String, Map<String, Object>> getBindings()
+     {
+         return _bindings;
+     }
+
+
+     /** Two instances are equal when both their actual filters and their bindings are equal. */
+     @Override
+     public boolean equals(final Object o)
+     {
+         if (this == o)
+         {
+             return true;
+         }
+         if (o == null || getClass() != o.getClass())
+         {
+             return false;
+         }
+
+         final BindingInfo that = (BindingInfo) o;
+
+         if (!_actualFilters.equals(that._actualFilters))
+         {
+             return false;
+         }
+         return _bindings.equals(that._bindings);
+     }
+
+     @Override
+     public int hashCode()
+     {
+         int result = _actualFilters.hashCode();
+         result = 31 * result + _bindings.hashCode();
+         return result;
+     }
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorException.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorException.java?rev=1780052&r1=1780051&r2=1780052&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorException.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/AmqpErrorException.java Tue Jan 24 10:14:48 2017
@@ -34,6 +34,12 @@ public class AmqpErrorException extends
_error = error;
}
+ /**
+ * Creates an exception that carries the given AMQP error and records the underlying cause.
+ *
+ * @param error the AMQP error stored on this exception
+ * @param cause the throwable handed to the superclass constructor as the cause
+ */
+ public AmqpErrorException(final Error error, final Throwable cause)
+ {
+ super(cause);
+ _error = error;
+ }
+
public AmqpErrorException(ErrorCondition condition)
{
_error = new Error();
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java?rev=1780052&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java Tue Jan 24 10:14:48 2017
@@ -0,0 +1,605 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import javax.security.auth.Subject;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.PublishingLink;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class Session_1_0Test extends QpidTestCase
+{
+ private static final String TOPIC_NAME = "testTopic";
+ private static final String QUEUE_NAME = "testQueue";
+ private static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
+ private static final Symbol QUEUE_CAPABILITY = Symbol.getSymbol("queue");
+ private AMQPConnection_1_0 _connection;
+ private VirtualHost<?> _virtualHost;
+ private Session_1_0 _session;
+ private int _handle;
+
+ @Override
+ public void setUp() throws Exception
+ {
+     super.setUp();
+     // Fixture: a virtual host, a mocked connection reporting container id "testContainerId",
+     // and a session on channel 0 of that connection.
+     _virtualHost = BrokerTestHelper.createVirtualHost("testVH");
+     _connection = createAmqpConnection_1_0("testContainerId");
+     _session = createSession_1_0(_connection, 0);
+ }
+
+ public void testReceiveAttachTopicNonDurableNoContainer() throws Exception
+ {
+     // Non-durable, non-shared topic attach that carries the "global" capability.
+     final Attach attach = createTopicAttach(false, "testLink", "amq.direct/" + TOPIC_NAME, true);
+
+     _session.receiveAttach(attach);
+
+     // An attach is sent back and a single queue with the expected lifetime is bound for the topic.
+     assertAttachSent(_connection, _session, attach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+ }
+
+ public void testReceiveAttachTopicNonDurableWithContainer() throws Exception
+ {
+     // Non-durable, non-shared topic attach without the "global" capability.
+     final Attach attach = createTopicAttach(false, "testLink", "amq.direct/" + TOPIC_NAME, false);
+
+     _session.receiveAttach(attach);
+
+     // An attach is sent back and a single queue with the expected lifetime is bound for the topic.
+     assertAttachSent(_connection, _session, attach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+ }
+
+ public void testReceiveAttachTopicDurableNoContainer() throws Exception
+ {
+     // Durable, non-shared topic attach that carries the "global" capability.
+     final Attach attach = createTopicAttach(true, "testLink", "amq.direct/" + TOPIC_NAME, true);
+
+     _session.receiveAttach(attach);
+
+     // An attach is sent back and a single permanent queue is bound for the topic.
+     assertAttachSent(_connection, _session, attach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+ }
+
+ public void testReceiveAttachTopicDurableWithContainer() throws Exception
+ {
+     final String linkName = "testLink";
+     final String address = "amq.direct/" + TOPIC_NAME;
+
+     // First durable subscription, from the default connection ("testContainerId").
+     final Attach firstAttach = createTopicAttach(true, linkName+ "|1", address, false);
+     _session.receiveAttach(firstAttach);
+
+     assertAttachSent(_connection, _session, firstAttach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+     // Second durable subscription, from a connection that reports a different container id.
+     final AMQPConnection_1_0 otherConnection = createAmqpConnection_1_0("testContainerId2");
+     final Session_1_0 otherSession = createSession_1_0(otherConnection, 0);
+     final Attach secondAttach = createTopicAttach(true, linkName + "|2", address, false);
+     otherSession.receiveAttach(secondAttach);
+
+     assertAttachSent(otherConnection, otherSession, secondAttach);
+     final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after second subscription with the same subscription name but different container id ", 2, queues.size());
+ }
+
+ public void testReceiveAttachSharedTopicNonDurableNoContainer() throws Exception
+ {
+     final String linkName = "testLink";
+     final String address = "amq.direct/" + TOPIC_NAME;
+     // Two identical shared, global, non-durable attaches; both are created up front.
+     final Attach firstAttach = createSharedTopicAttach(false, linkName, address, true);
+     final Attach secondAttach = createSharedTopicAttach(false, linkName, address, true);
+
+     _session.receiveAttach(firstAttach);
+
+     assertAttachSent(_connection, _session, firstAttach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
+     // The second attach arrives over a connection with a random container id.
+     final AMQPConnection_1_0 otherConnection = createAmqpConnection_1_0();
+     final Session_1_0 otherSession = createSession_1_0(otherConnection, 0);
+
+     otherSession.receiveAttach(secondAttach);
+
+     assertAttachSent(otherConnection, otherSession, secondAttach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
+     // Both links are expected to consume from the same single queue.
+     final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after attach", 1, queues.size());
+     Queue sharedQueue = queues.iterator().next();
+
+     Collection<Consumer<?,?>> consumers = sharedQueue.getConsumers();
+     assertEquals("Unexpected number of consumers", 2, consumers.size());
+ }
+
+ public void testReceiveAttachSharedTopicNonDurableWithContainer() throws Exception
+ {
+     final String linkName = "testLink";
+     final String address = "amq.direct/" + TOPIC_NAME;
+     // Two identical shared, non-global, non-durable attaches; both are created up front.
+     final Attach firstAttach = createSharedTopicAttach(false, linkName, address, false);
+     final Attach secondAttach = createSharedTopicAttach(false, linkName, address, false);
+
+     _session.receiveAttach(firstAttach);
+
+     assertAttachSent(_connection, _session, firstAttach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
+     // The second attach arrives over a connection that reports a different container id.
+     final AMQPConnection_1_0 otherConnection = createAmqpConnection_1_0("testContainerId2");
+     final Session_1_0 otherSession = createSession_1_0(otherConnection, 0);
+
+     otherSession.receiveAttach(secondAttach);
+
+     assertAttachSent(otherConnection, otherSession, secondAttach);
+
+     // Each container is expected to get its own queue with exactly one consumer.
+     final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after attach", 2, queues.size());
+
+     for (Queue queue : queues)
+     {
+         Collection<Consumer<?,?>> consumers = queue.getConsumers();
+         assertEquals("Unexpected number of consumers on queue " + queue.getName(), 1, consumers.size());
+     }
+ }
+
+ /**
+  * Attaches the same base link name in every combination of durable/non-durable,
+  * shared/non-shared and with/without the "global" capability, checking after each attach how
+  * many queues exist. Each assertion message names the attach that preceded it, so a failure
+  * points at the right scenario.
+  */
+ public void testSeparateSubscriptionNameSpaces() throws Exception
+ {
+     AMQPConnection_1_0 secondConnection = createAmqpConnection_1_0();
+     Session_1_0 secondSession = createSession_1_0(secondConnection, 0);
+
+     final String linkName = "testLink";
+     final String address = "amq.direct/" + TOPIC_NAME;
+
+     Attach durableSharedWithContainerId = createSharedTopicAttach(true, linkName + "|1", address, false);
+     _session.receiveAttach(durableSharedWithContainerId);
+     assertAttachSent(_connection, _session, durableSharedWithContainerId, 0);
+
+     Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after durable shared with containerId", 1, queues.size());
+
+     // This attach is expected to be refused (attach followed by a closing detach), so no queue is added.
+     Attach durableNonSharedWithContainerId = createTopicAttach(true, linkName, address, false);
+     _session.receiveAttach(durableNonSharedWithContainerId);
+     assertAttachFailed(_connection, _session, durableNonSharedWithContainerId, 1);
+
+     queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after durable non shared with containerId", 1, queues.size());
+
+     Attach nonDurableSharedWithContainerId = createSharedTopicAttach(false, linkName + "|3", address, false);
+     _session.receiveAttach(nonDurableSharedWithContainerId);
+     assertAttachSent(_connection, _session, nonDurableSharedWithContainerId, 3);
+
+     queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after non durable shared with containerId", 2, queues.size());
+
+     Attach durableSharedWithoutContainerId = createSharedTopicAttach(true, linkName + "|4", address, true);
+     secondSession.receiveAttach(durableSharedWithoutContainerId);
+     assertAttachSent(secondConnection, secondSession, durableSharedWithoutContainerId, 0);
+
+     queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after durable shared without containerId", 3, queues.size());
+
+     Attach nonDurableSharedWithoutContainerId = createSharedTopicAttach(false, linkName + "|5", address, true);
+     secondSession.receiveAttach(nonDurableSharedWithoutContainerId);
+     assertAttachSent(secondConnection, secondSession, nonDurableSharedWithoutContainerId, 1);
+
+     queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after non durable shared without containerId", 4, queues.size());
+
+     Attach nonDurableNonSharedWithoutContainerId = createTopicAttach(false, linkName + "|6", address, true);
+     secondSession.receiveAttach(nonDurableNonSharedWithoutContainerId);
+     assertAttachSent(secondConnection, secondSession, nonDurableNonSharedWithoutContainerId, 2);
+
+     queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after non durable non shared without containerId", 5, queues.size());
+
+     Attach nonDurableNonSharedWithContainerId = createTopicAttach(false, linkName + "|6", address, false);
+     _session.receiveAttach(nonDurableNonSharedWithContainerId);
+     assertAttachSent(_connection, _session, nonDurableNonSharedWithContainerId, 4);
+
+     queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after non durable non shared with containerId", 6, queues.size());
+
+ }
+
+ public void testReceiveAttachForInvalidUnsubscribe() throws Exception
+ {
+     // Durable attach with a null source while no subscription exists.
+     final Attach nullSourceAttach = createTopicAttach(true, "testLink", "amq.direct/" + TOPIC_NAME, false);
+     nullSourceAttach.setSource(null);
+
+     _session.receiveAttach(nullSourceAttach);
+
+     // The attach must be refused and no queue may have been created.
+     assertAttachFailed(_connection, _session, nullSourceAttach);
+     final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after unsubscribe", 0, queues.size());
+ }
+
+ public void testNullSourceLookup() throws Exception
+ {
+     final String linkName = "testLink";
+     final String address = "amq.direct/" + TOPIC_NAME;
+
+     // Establish a durable subscription, then detach without closing it.
+     final Attach originalAttach = createTopicAttach(true, linkName, address, false);
+     _session.receiveAttach(originalAttach);
+
+     assertAttachSent(_connection, _session, originalAttach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+     sendDetach(_session, originalAttach.getHandle(), false);
+
+     // Re-attach under the same link name with a null source.
+     final Attach nullSourceAttach = createTopicAttach(true, linkName, address, false);
+     nullSourceAttach.setSource(null);
+     _session.receiveAttach(nullSourceAttach);
+
+     // Third frame on the channel (after the first attach and the detach) is the attach reply.
+     final ArgumentCaptor<FrameBody> frames = ArgumentCaptor.forClass(FrameBody.class);
+     verify(_connection, times(3)).sendFrame(eq((short) _session.getChannelId()), frames.capture());
+     final Attach sentAttach = (Attach) frames.getAllValues().get(2);
+
+     assertEquals("Unexpected name", nullSourceAttach.getName(), sentAttach.getName());
+     assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole());
+     assertNotNull("Unexpected source", sentAttach.getSource());
+     // The reply is expected to carry the address and capabilities of the original source.
+     final Source sentSource = (Source)sentAttach.getSource();
+     assertEquals("Unexpected address", address, sentSource.getAddress());
+     assertEquals("Unexpected capabilities", ((Source)originalAttach.getSource()).getCapabilities(), sentSource.getCapabilities());
+
+     final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after unsubscribe", 1, queues.size());
+ }
+
+ public void testReceiveDetachClosed() throws Exception
+ {
+     // Establish a durable subscription.
+     final Attach attach = createTopicAttach(true, "testLink", "amq.direct/" + TOPIC_NAME, false);
+     _session.receiveAttach(attach);
+
+     assertAttachSent(_connection, _session, attach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+     // A detach with closed=true is expected to remove the subscription queue.
+     sendDetach(_session, attach.getHandle(), true);
+
+     final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after unsubscribe", 0, queues.size());
+ }
+
+ public void testReceiveAttachToExistingQueue() throws Exception
+ {
+     final Attach attach = createQueueAttach(false, "testLink", QUEUE_NAME);
+
+     // Create the queue and bind it to amq.direct under its own name before attaching.
+     final Queue<?> existingQueue =
+             _virtualHost.createChild(Queue.class, Collections.<String, Object>singletonMap(Queue.NAME, QUEUE_NAME));
+     final Exchange<?> directExchange = _virtualHost.getChildByName(Exchange.class, "amq.direct");
+     directExchange.bind(QUEUE_NAME, QUEUE_NAME, Collections.<String, Object>emptyMap(), false);
+
+     _session.receiveAttach(attach);
+
+     assertAttachActions(existingQueue, attach);
+ }
+
+ public void testReceiveAttachToNonExistingQueue() throws Exception
+ {
+     // No queue named QUEUE_NAME is created in this test, so the attach must be refused.
+     final Attach attach = createQueueAttach(false, "testLink", QUEUE_NAME);
+
+     _session.receiveAttach(attach);
+
+     assertAttachFailed(_connection, _session, attach);
+ }
+
+ public void testReceiveAttachRebindingQueueNoActiveLinks()
+ {
+     final String linkName = "testLink";
+
+     // Durable shared global subscription on the first topic.
+     final Attach firstAttach = createSharedTopicAttach(true, linkName, "amq.direct/" + TOPIC_NAME, true);
+     _session.receiveAttach(firstAttach);
+
+     assertAttachSent(_connection, _session, firstAttach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+     // Detach without closing: a detach frame is sent and the queue remains.
+     sendDetach(_session, firstAttach.getHandle(), false);
+
+     final ArgumentCaptor<FrameBody> frames = ArgumentCaptor.forClass(FrameBody.class);
+     verify(_connection, times(2)).sendFrame(eq((short) _session.getChannelId()), frames.capture());
+     assertTrue(frames.getAllValues().get(1) instanceof Detach);
+
+     assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+     // Attach again under link name "testLink|2" but for a different topic; the single queue is
+     // then expected to be bound for the new topic.
+     final String secondTopic = TOPIC_NAME + "2";
+     final Attach secondAttach = createSharedTopicAttach(true, linkName + "|2", "amq.direct/" + secondTopic, true);
+
+     _session.receiveAttach(secondAttach);
+     assertAttachSent(_connection, _session, secondAttach, 2);
+
+     assertQueues(secondTopic, LifetimePolicy.PERMANENT);
+ }
+
+ public void testReceiveReattachRebindingQueueNoActiveLinks()
+ {
+     final String linkName = "testLink";
+
+     // Durable shared global subscription on the first topic.
+     final Attach firstAttach = createSharedTopicAttach(true, linkName, "amq.direct/" + TOPIC_NAME, true);
+     _session.receiveAttach(firstAttach);
+
+     assertAttachSent(_connection, _session, firstAttach);
+     assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+     // Detach without closing: a detach frame is sent and the queue remains.
+     sendDetach(_session, firstAttach.getHandle(), false);
+
+     final ArgumentCaptor<FrameBody> frames = ArgumentCaptor.forClass(FrameBody.class);
+     verify(_connection, times(2)).sendFrame(eq((short) _session.getChannelId()), frames.capture());
+     assertTrue(frames.getAllValues().get(1) instanceof Detach);
+
+     assertQueues(TOPIC_NAME, LifetimePolicy.PERMANENT);
+
+     // Re-attach under the very same link name but for a different topic; the single queue is
+     // then expected to be bound for the new topic.
+     final String secondTopic = TOPIC_NAME + "2";
+     final Attach secondAttach = createSharedTopicAttach(true, linkName, "amq.direct/" + secondTopic, true);
+
+     _session.receiveAttach(secondAttach);
+     assertAttachSent(_connection, _session, secondAttach, 2);
+
+     assertQueues(secondTopic, LifetimePolicy.PERMANENT);
+ }
+
+
+ /**
+  * Asserts that the queue has exactly one consumer, that the only frame sent on the session's
+  * channel is an attach echoing the received name, source and target with role SENDER, and
+  * that the virtual host holds exactly one queue.
+  */
+ private void assertAttachActions(final Queue<?> queue, final Attach receivedAttach)
+ {
+     Collection<QueueConsumer<?,?>> consumers = queue.getConsumers();
+     assertEquals("Unexpected consumers size", 1, consumers.size());
+
+     final ArgumentCaptor<FrameBody> frames = ArgumentCaptor.forClass(FrameBody.class);
+     final short channel = (short) _session.getChannelId();
+     verify(_connection).sendFrame(eq(channel), frames.capture());
+     final Attach reply = (Attach) frames.getValue();
+
+     assertEquals("Unexpected name", receivedAttach.getName(), reply.getName());
+     assertEquals("Unexpected role", Role.SENDER, reply.getRole());
+     assertEquals("Unexpected source", receivedAttach.getSource(), reply.getSource());
+     assertEquals("Unexpected target", receivedAttach.getTarget(), reply.getTarget());
+
+     final Collection<Queue> allQueues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after attach", 1, allQueues.size());
+ }
+
+ /** Asserts that the first frame sent on the session's channel is the attach reply. */
+ private void assertAttachSent(final AMQPConnection_1_0 connection,
+                               final Session_1_0 session,
+                               final Attach receivedAttach)
+ {
+     assertAttachSent(connection, session, receivedAttach, 0);
+ }
+
+ /**
+  * Asserts that exactly {@code invocationOffset + 1} frames were sent on the session's channel
+  * and that the frame at index {@code invocationOffset} is an attach with the received link
+  * name and role SENDER.
+  *
+  * @param invocationOffset number of frames sent on the channel before the expected attach
+  */
+ private void assertAttachSent(final AMQPConnection_1_0 connection,
+                               final Session_1_0 session,
+                               final Attach receivedAttach,
+                               final int invocationOffset)
+ {
+     ArgumentCaptor<FrameBody> frameCapture = ArgumentCaptor.forClass(FrameBody.class);
+     verify(connection, times(invocationOffset + 1)).sendFrame(eq((short) session.getChannelId()), frameCapture.capture());
+     List<FrameBody> sentFrames = frameCapture.getAllValues();
+
+     // Fail with an assertion message rather than a ClassCastException when another frame type
+     // was sent, mirroring the check done in assertAttachFailed.
+     assertTrue("unexpected Frame sent", sentFrames.get(invocationOffset) instanceof Attach);
+     Attach sentAttach = (Attach) sentFrames.get(invocationOffset);
+
+     assertEquals("Unexpected name", receivedAttach.getName(), sentAttach.getName());
+     assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole());
+ }
+
+ /**
+  * Asserts that the virtual host holds exactly one queue, that the queue has the expected
+  * lifetime policy and exactly one publishing link with the given name, and that amq.direct
+  * has a binding from that name to the queue.
+  *
+  * @param publishingLinkName     expected publishing link name, also used as the binding key
+  * @param expectedLifetimePolicy expected lifetime policy of the queue
+  */
+ private void assertQueues(final String publishingLinkName, final LifetimePolicy expectedLifetimePolicy)
+ {
+     final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+     assertEquals("Unexpected number of queues after attach", 1, queues.size());
+     Queue queue = queues.iterator().next();
+     assertEquals("Unexpected queue lifetime policy",
+                  expectedLifetimePolicy, queue.getLifetimePolicy());
+     Collection<PublishingLink> queuePublishingLinks = queue.getPublishingLinks();
+     assertEquals("Unexpected number of publishing links", 1, queuePublishingLinks.size());
+     assertEquals("Unexpected link name", publishingLinkName, queuePublishingLinks.iterator().next().getName());
+
+     Exchange<?> exchange = _virtualHost.getChildByName(Exchange.class, "amq.direct");
+     assertTrue("Binding should exist", exchange.hasBinding(publishingLinkName, queue));
+ }
+
+ private void assertAttachFailed(final AMQPConnection_1_0 connection, final Session_1_0 session, final Attach attach, int invocationOffset)
+ {
+ ArgumentCaptor<FrameBody> frameCapture = ArgumentCaptor.forClass(FrameBody.class);
+ verify(connection, times(invocationOffset + 2)).sendFrame(eq((short) session.getChannelId()), frameCapture.capture());
+ List<FrameBody> sentFrames = frameCapture.getAllValues();
+
+ assertTrue("unexpected Frame sent", sentFrames.get(invocationOffset) instanceof Attach);
+ Attach sentAttach = (Attach) sentFrames.get(invocationOffset);
+ assertEquals("Unexpected name", attach.getName(), sentAttach.getName());
+ assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole());
+ assertEquals("Unexpected source", null, sentAttach.getSource());
+
+ assertTrue("unexpected Frame sent", sentFrames.get(invocationOffset + 1) instanceof Detach);
+ Detach sentDetach = (Detach) sentFrames.get(invocationOffset + 1);
+ assertTrue("Unexpected closed state", sentDetach.getClosed());
+ }
+
+ private void assertAttachFailed(final AMQPConnection_1_0 connection, final Session_1_0 session, final Attach attach)
+ {
+ assertAttachFailed(connection, session, attach, 0);
+ }
+
+ private Attach createSharedTopicAttach(final boolean durable,
+ final String linkName,
+ final String address,
+ final boolean isGlobal)
+ {
+ return createAttach(durable, linkName, address, TOPIC_CAPABILITY, isGlobal, true);
+ }
+
+ private Attach createTopicAttach(final boolean durable,
+ final String linkName,
+ final String address,
+ final boolean isGlobal)
+ {
+ return createAttach(durable, linkName, address, TOPIC_CAPABILITY, isGlobal, false);
+ }
+
+ private Attach createQueueAttach(final boolean durable,
+ final String linkName,
+ final String address)
+ {
+ return createAttach(durable, linkName, address, QUEUE_CAPABILITY, false, false);
+ }
+
+ private Attach createAttach(final boolean durable,
+ final String linkName,
+ final String address,
+ final Symbol destinationTypeCapability,
+ final boolean isGlobal,
+ final boolean isShared)
+ {
+ Attach attach = new Attach();
+ Source source = new Source();
+
+ List<Symbol> capabilities = new ArrayList<>();
+ if (isGlobal)
+ {
+ capabilities.add(Symbol.getSymbol("global"));
+ }
+ if (isShared)
+ {
+ capabilities.add(Symbol.getSymbol("shared"));
+ }
+ capabilities.add(destinationTypeCapability);
+
+
+ source.setCapabilities(capabilities.toArray(new Symbol[capabilities.size()]));
+ if (durable)
+ {
+ source.setDurable(TerminusDurability.CONFIGURATION);
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ }
+ else
+ {
+ source.setDurable(TerminusDurability.NONE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ }
+ attach.setSource(source);
+ Target target = new Target();
+ attach.setTarget(target);
+ attach.setHandle(new UnsignedInteger(_handle++));
+ attach.setIncompleteUnsettled(false);
+ attach.setName(linkName);
+ attach.setRole(Role.RECEIVER);
+ source.setAddress(address);
+ return attach;
+ }
+
+ /** Creates a mocked connection that reports a random UUID as its remote container id. */
+ private AMQPConnection_1_0 createAmqpConnection_1_0()
+ {
+     return createAmqpConnection_1_0(null);
+ }
+
+ /**
+  * Creates a mocked connection bound to the test virtual host.
+  * Work submitted through {@code doOnIOThreadAsync} is run synchronously on the calling thread
+  * and an already-completed future is returned.
+  *
+  * @param containerId remote container id to report; when null a random UUID is used
+  * @return the configured mock
+  */
+ private AMQPConnection_1_0 createAmqpConnection_1_0(String containerId)
+ {
+     AMQPConnection_1_0 connection = mock(AMQPConnection_1_0.class);
+     Subject subject =
+             new Subject(true, Collections.<Principal>emptySet(), Collections.emptySet(), Collections.emptySet());
+     when(connection.getSubject()).thenReturn(subject);
+     when(connection.getAddressSpace()).thenReturn(_virtualHost);
+     when(connection.getEventLogger()).thenReturn(mock(EventLogger.class));
+     when(connection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD)).thenReturn(1L);
+     // Take the task from the invocation itself rather than from an ArgumentCaptor used in
+     // stubbing: the captor would accumulate every task and couple the answer to shared state.
+     when(connection.doOnIOThreadAsync(org.mockito.Matchers.any(Runnable.class)))
+             .thenAnswer(new Answer<ListenableFuture<Void>>()
+             {
+                 @Override
+                 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable
+                 {
+                     ((Runnable) invocation.getArguments()[0]).run();
+                     return Futures.immediateFuture(null);
+                 }
+             });
+     AggregateTicker mockTicker = mock(AggregateTicker.class);
+     when(connection.getAggregateTicker()).thenReturn(mockTicker);
+     final String remoteContainerId = containerId != null ? containerId : UUID.randomUUID().toString();
+     when(connection.getRemoteContainerId()).thenReturn(remoteContainerId);
+     return connection;
+ }
+
+ private Session_1_0 createSession_1_0(final AMQPConnection_1_0 connection, int channelId)
+ {
+ Begin begin = mock(Begin.class);
+ when(begin.getNextOutgoingId()).thenReturn(new UnsignedInteger(channelId));
+ Session_1_0 _session = new Session_1_0(connection, begin);
+ _session.setReceivingChannel((short)channelId);
+ _session.setSendingChannel((short)channelId);
+ return _session;
+ }
+
+ /**
+  * Delivers a detach frame for the given link handle to the session.
+  *
+  * @param closed value of the detach frame's closed flag
+  */
+ private void sendDetach(final Session_1_0 session,
+                         final UnsignedInteger handle,
+                         final boolean closed)
+ {
+     final Detach detachFrame = new Detach();
+     detachFrame.setClosed(closed);
+     detachFrame.setHandle(handle);
+     session.receiveDetach(detachFrame);
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org