You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/24 18:17:12 UTC
svn commit: r1620147 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/q...
Author: rgodfrey
Date: Sun Aug 24 16:17:11 2014
New Revision: 1620147
URL: http://svn.apache.org/r1620147
Log:
QPID-6037 : [Java Client] Enhance experimental support for ADDR addressing to the 0-8/9/9-1 client
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
qpid/trunk/qpid/java/systests/etc/config-systests.json
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java Sun Aug 24 16:17:11 2014
@@ -85,7 +85,6 @@ public class BrokerOptions
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put("storePath", getConfigurationStoreLocation());
- attributes.put("storeTye", getConfigurationStoreType());
attributes.put(ConfiguredObject.CONTEXT, getConfigProperties());
return attributes;
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Sun Aug 24 16:17:11 2014
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -111,6 +112,12 @@ public abstract class AbstractExchange<T
public AbstractExchange(Map<String, Object> attributes, VirtualHostImpl vhost)
{
super(parentsMap(vhost), attributes);
+ Set<String> providedAttributeNames = new HashSet<>(attributes.keySet());
+ providedAttributeNames.removeAll(getAttributeNames());
+ if(!providedAttributeNames.isEmpty())
+ {
+ throw new IllegalArgumentException("Unknown attributes provided: " + providedAttributeNames);
+ }
_virtualHost = vhost;
// check ACL
try
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Sun Aug 24 16:17:11 2014
@@ -47,8 +47,6 @@ public interface Broker<X extends Broker
String DEFAULT_VIRTUAL_HOST = "defaultVirtualHost";
String STATISTICS_REPORTING_PERIOD = "statisticsReportingPeriod";
String STATISTICS_REPORTING_RESET_ENABLED = "statisticsReportingResetEnabled";
- String STORE_TYPE = "storeType";
- String STORE_VERSION = "storeVersion";
String STORE_PATH = "storePath";
String MODEL_VERSION = "modelVersion";
String CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER = "confidentialConfigurationEncryptionProvider";
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Sun Aug 24 16:17:11 2014
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@@ -87,6 +89,8 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, PriorityQueue.PRIORITIES);
ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION);
+ ATTRIBUTE_MAPPINGS.put(Queue.ALTERNATE_EXCHANGE, Queue.ALTERNATE_EXCHANGE);
+
ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION);
ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY);
@@ -147,6 +151,10 @@ public class QueueArgumentsConverter
{
value = ((Enum) value).name();
}
+ else if(value instanceof ConfiguredObject)
+ {
+ value = ((ConfiguredObject)value).getName();
+ }
wireArguments.put(entry.getKey(), value);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json Sun Aug 24 16:17:11 2014
@@ -20,7 +20,6 @@
*/
{
"name": "${broker.name}",
- "storeVersion": 1,
"modelVersion": "2.0",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java Sun Aug 24 16:17:11 2014
@@ -32,6 +32,7 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.ExchangeImpl;
@@ -115,15 +116,22 @@ public class ExchangeDeclareHandler impl
{
String name = exchangeName == null ? null : exchangeName.intern().toString();
String type = body.getType() == null ? null : body.getType().intern().toString();
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ if(body.getArguments() != null)
+ {
+ attributes.putAll(FieldTable.convertToMap(body.getArguments()));
+ }
attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
+ {
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ }
exchange = virtualHost.createExchange(attributes);
}
@@ -160,6 +168,10 @@ public class ExchangeDeclareHandler impl
// note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
}
+ catch (IllegalArgumentException e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e);
+ }
}
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Sun Aug 24 16:17:11 2014
@@ -87,6 +87,8 @@ public class AMQConnection extends Close
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
+ private static final long DEFAULT_CLOSE_TIMEOUT = 2000l;
+
private final long _connectionNumber;
/**
@@ -160,7 +162,6 @@ public class AMQConnection extends Close
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
- private static final long DEFAULT_TIMEOUT = 1000 * 30;
private AMQConnectionDelegate _delegate;
@@ -873,7 +874,7 @@ public class AMQConnection extends Close
public void close() throws JMSException
{
- close(DEFAULT_TIMEOUT);
+ close(DEFAULT_CLOSE_TIMEOUT);
}
public void close(long timeout) throws JMSException
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Aug 24 16:17:11 2014
@@ -313,6 +313,10 @@ public abstract class AMQSession<C exten
return _immediatePrefetch;
}
+ abstract void handleNodeDelete(final AMQDestination dest) throws AMQException;
+
+ abstract void handleLinkDelete(final AMQDestination dest) throws AMQException;
+
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sun Aug 24 16:17:11 2014
@@ -1462,6 +1462,7 @@ public class AMQSession_0_10 extends AMQ
}
}
+ @Override
void handleNodeDelete(AMQDestination dest) throws AMQException
{
if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sun Aug 24 16:17:11 2014
@@ -29,6 +29,7 @@ import static org.apache.qpid.configurat
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -64,6 +65,7 @@ import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.Strings;
public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -175,12 +177,49 @@ public class AMQSession_0_8 extends AMQS
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, final AMQDestination dest,
+ final AMQShortString exchangeName, final AMQDestination destination,
final boolean nowait) throws AMQException, FailoverException
{
- getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
- (getTicket(),queueName,exchangeName,routingKey,false,arguments).
- generateFrame(getChannelId()), QueueBindOkBody.class);
+ if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL)
+ {
+
+ getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
+ (getTicket(), queueName, exchangeName, routingKey, false, arguments).
+ generateFrame(getChannelId()), QueueBindOkBody.class);
+
+ }
+ else
+ {
+ // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
+ List<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>();
+ bindings.addAll(destination.getNode().getBindings());
+
+ String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
+ destination.getAddressName(): "amq.topic";
+
+ for (AMQDestination.Binding binding: bindings)
+ {
+ // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
+ // The null check below is a way to side step that issue while fixing QPID-4146
+ // Note this issue only affects producers.
+ if (binding.getQueue() == null && queueName == null)
+ {
+ continue;
+ }
+ String queue = binding.getQueue() == null?
+ queueName.asString(): binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchange :
+ binding.getExchange();
+
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ doBind(destination, binding, queue, exchange);
+ }
+ }
}
public void sendClose(long timeout) throws AMQException, FailoverException
@@ -547,10 +586,8 @@ public class AMQSession_0_8 extends AMQS
Map<String,Object> bindingArguments = new HashMap<String, Object>();
bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
- bindQueue(AMQShortString.valueOf(queueName),
- AMQShortString.valueOf(dest.getSubject()),
- FieldTable.convertToFieldTable(bindingArguments),
- AMQShortString.valueOf(dest.getAddressName()),dest,false);
+ final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments);
+ doBind(dest, binding, queueName, dest.getAddressName());
}
@@ -589,6 +626,15 @@ public class AMQSession_0_8 extends AMQS
getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
+ public void sendExchangeDelete(final String name) throws AMQException, FailoverException
+ {
+ ExchangeDeleteBody body =
+ getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
+
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+
private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
{
AMQShortString queueName = amqd.getAMQQueueName();
@@ -821,18 +867,25 @@ public class AMQSession_0_8 extends AMQS
protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException
{
- AMQFrame queueDeclare =
- getMethodRegistry().createQueueDeclareBody(getTicket(),
- amqd.getAMQQueueName(),
- true,
- amqd.isDurable(),
- amqd.isExclusive(),
- amqd.isAutoDelete(),
- false,
- null).generateFrame(getChannelId());
- QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
- getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
- return okHandler.getMessageCount();
+ if(isBound(null, amqd.getAMQQueueName(), null))
+ {
+ AMQFrame queueDeclare =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ true,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null).generateFrame(getChannelId());
+ QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+ return okHandler.getMessageCount();
+ }
+ else
+ {
+ return 0l;
+ }
}
protected boolean tagLE(long tag1, long tag2)
@@ -916,6 +969,11 @@ public class AMQSession_0_8 extends AMQS
{
arguments.put(AddressHelper.NO_LOCAL, noLocal);
}
+ String altExchange = node.getAlternateExchange();
+ if(altExchange != null && !"".equals(altExchange))
+ {
+ arguments.put("alternateExchange", altExchange);
+ }
(new FailoverNoopSupport<Void, AMQException>(
new FailoverProtectedOperation<Void, AMQException>()
@@ -942,13 +1000,21 @@ public class AMQSession_0_8 extends AMQS
void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
{
Node node = dest.getNode();
+ String altExchange = dest.getNode().getAlternateExchange();
+ Map<String, Object> arguments = node.getDeclareArgs();
+
+ if(altExchange != null && !"".equals(altExchange))
+ {
+ arguments.put("alternateExchange", altExchange);
+ }
+
// can't set alt. exchange
declareExchange(AMQShortString.valueOf(dest.getAddressName()),
AMQShortString.valueOf(node.getExchangeType()),
false,
node.isDurable(),
node.isAutoDelete(),
- FieldTable.convertToFieldTable(node.getDeclareArgs()), false);
+ FieldTable.convertToFieldTable(arguments), false);
// If bindings are specified without a queue name and is called by the producer,
// the broker will send an exception as expected.
@@ -962,9 +1028,79 @@ public class AMQSession_0_8 extends AMQS
final String queue,
final String exchange) throws AMQException
{
- bindQueue(AMQShortString.valueOf(queue),AMQShortString.valueOf(binding.getBindingKey()),
- FieldTable.convertToFieldTable(binding.getArgs()),
- AMQShortString.valueOf(exchange),dest);
+ final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ QueueBindBody queueBindBody =
+ methodRegistry.createQueueBindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(bindingKey),
+ false,
+ FieldTable.convertToFieldTable(binding.getArgs()));
+
+ getProtocolHandler().syncWrite(queueBindBody.
+ generateFrame(getChannelId()), QueueBindOkBody.class);
+ return null;
+ }
+ }, getAMQConnection()).execute();
+
+ }
+
+
+ protected void doUnbind(final AMQDestination.Binding binding,
+ final String queue,
+ final String exchange) throws AMQException
+ {
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+ if (isBound(null, AMQShortString.valueOf(queue), null))
+ {
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ AMQMethodBody body;
+ if (methodRegistry instanceof MethodRegistry_0_9)
+ {
+ String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry;
+ body = methodRegistry_0_9.createQueueUnbindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(bindingKey),
+ null);
+ }
+ else if (methodRegistry instanceof MethodRegistry_0_91)
+ {
+ MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry;
+ body = methodRegistry_0_91.createQueueUnbindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(binding.getBindingKey()),
+ null);
+
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8");
+ }
+ getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class);
+ return null;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }, getAMQConnection()).execute();
}
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
@@ -1057,6 +1193,102 @@ public class AMQSession_0_8 extends AMQS
return match;
}
+ @Override
+ void handleNodeDelete(final AMQDestination dest) throws AMQException
+ {
+ if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
+ {
+ if (isExchangeExist(dest,false))
+ {
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendExchangeDelete(dest.getAddressName());
+ return null;
+ }
+ }, getAMQConnection()).execute();
+ dest.setAddressResolved(0);
+ }
+ }
+ else
+ {
+ if (isQueueExist(dest,false))
+ {
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendQueueDelete(AMQShortString.valueOf(dest.getAddressName()));
+ return null;
+ }
+ }, getAMQConnection()).execute();
+ dest.setAddressResolved(0);
+ }
+ }
+ }
+
+ @Override
+ void handleLinkDelete(AMQDestination dest) throws AMQException
+ {
+ // We need to destroy link bindings
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (AMQDestination.Binding binding: dest.getLink().getBindings())
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Unbinding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ doUnbind(binding, queue, exchange);
+ }
+ }
+
+
+ void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException
+ {
+ // We need to delete the subscription queue.
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
+ dest.getLink().getSubscriptionQueue().isExclusive() &&
+ isQueueExist(dest.getQueueName(), false, false, false, false, null))
+ {
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDelete(AMQShortString.valueOf(dest.getQueueName()));
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+ }
+ }
+
protected void flushAcknowledgments()
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sun Aug 24 16:17:11 2014
@@ -20,19 +20,35 @@
*/
package org.apache.qpid.client;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsum
import org.apache.qpid.jms.Session;
import org.apache.qpid.transport.TransportException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -376,7 +377,23 @@ public abstract class BasicMessageConsum
*/
public boolean isExclusive()
{
- return _exclusive;
+
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ return true;
+ }
+ else
+ {
+ return dest.getLink().getSubscription().isExclusive();
+ }
+ }
+ else
+ {
+ return _exclusive;
+ }
}
public boolean isReceiving()
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Sun Aug 24 16:17:11 2014
@@ -17,12 +17,18 @@
*/
package org.apache.qpid.client;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSe
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* This is a 0.10 message consumer.
*/
@@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 e
clearReceiveQueue();
}
}
-
- public boolean isExclusive()
- {
- AMQDestination dest = this.getDestination();
- if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
- {
- return true;
- }
- else
- {
- return dest.getLink().getSubscription().isExclusive();
- }
- }
- else
- {
- return super.isExclusive();
- }
- }
+
void postSubscription() throws AMQException
{
@@ -509,10 +490,10 @@ public class BasicMessageConsumer_0_10 e
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
- ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
+ getSession().handleNodeDelete(dest);
}
// Subscription queue is handled as part of linkDelete method.
- ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
+ getSession().handleLinkDelete(dest);
if (!isDurableSubscriber())
{
((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
@@ -566,4 +547,4 @@ public class BasicMessageConsumer_0_10 e
return capacity;
}
-}
\ No newline at end of file
+}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Sun Aug 24 16:17:11 2014
@@ -118,13 +118,33 @@ public class BasicMessageConsumer_0_8 ex
final AMQFrame cancelFrame = body.generateFrame(getChannelId());
getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
-
+ postSubscription();
+ getSession().sync();
if (_logger.isDebugEnabled())
{
_logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
}
+ void postSubscription() throws AMQException
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+ dest.getDelete() == AMQDestination.AddressOption.RECEIVER )
+ {
+ getSession().handleNodeDelete(dest);
+ }
+ // Subscription queue is handled as part of linkDelete method.
+ getSession().handleLinkDelete(dest);
+ if (!isDurableSubscriber())
+ {
+ getSession().deleteSubscriptionQueue(dest);
+ }
+ }
+ }
+
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Sun Aug 24 16:17:11 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.util.UUID;
+
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -32,13 +33,15 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
+
+import org.slf4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
-import org.slf4j.Logger;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -286,6 +289,31 @@ public abstract class BasicMessageProduc
{
setClosed();
_session.deregisterProducer(_producerId);
+ AMQDestination dest = getAMQDestination();
+ AMQSession ssn = getSession();
+ if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ try
+ {
+ if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+ dest.getDelete() == AMQDestination.AddressOption.SENDER )
+ {
+ ssn.handleNodeDelete(dest);
+ }
+ ssn.handleLinkDelete(dest);
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
+ catch (AMQException e)
+ {
+ JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
+ ex.setLinkedException(e);
+ ex.initCause(e);
+ throw ex;
+ }
+ }
}
public void send(Message message) throws JMSException
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Sun Aug 24 16:17:11 2014
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -48,7 +47,6 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.GZIPUtils;
import org.apache.qpid.util.Strings;
@@ -90,8 +88,8 @@ public class BasicMessageProducer_0_10 e
try
{
getSession().resolveAddress(destination,false,false);
- ((AMQSession_0_10)getSession()).handleLinkCreation(destination);
- ((AMQSession_0_10)getSession()).sync();
+ getSession().handleLinkCreation(destination);
+ getSession().sync();
}
catch(Exception e)
{
@@ -278,31 +276,6 @@ public class BasicMessageProducer_0_10 e
public void close() throws JMSException
{
super.close();
- AMQDestination dest = getAMQDestination();
- AMQSession_0_10 ssn = (AMQSession_0_10) getSession();
- if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- try
- {
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
- {
- ssn.handleNodeDelete(dest);
- }
- ssn.handleLinkDelete(dest);
- }
- catch(TransportException e)
- {
- throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
- }
- catch (AMQException e)
- {
- JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
- ex.setLinkedException(e);
- ex.initCause(e);
- throw ex;
- }
- }
}
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Sun Aug 24 16:17:11 2014
@@ -34,15 +34,18 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.util.GZIPUtils;
@@ -63,6 +66,9 @@ public class BasicMessageProducer_0_8 ex
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
getSession().resolveAddress(destination, false, false);
+
+ getSession().handleLinkCreation(destination);
+ getSession().sync();
}
else
{
@@ -92,18 +98,43 @@ public class BasicMessageProducer_0_8 ex
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
boolean immediate) throws JMSException
{
+
+
+
+ AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
+ BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
+
+ AMQShortString routingKey = destination.getRoutingKey();
+
+ FieldTable headers = delegate.getContentHeaderProperties().getHeaders();
+
+ if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
+ (destination.getSubject() != null
+ || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null)))
+ {
+
+ if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null)
+ {
+ // use default subject in address string
+ headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject());
+ }
+
+ if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ routingKey = AMQShortString.valueOf(headers.getString(QpidMessageProperties.QPID_SUBJECT));
+ }
+ }
+
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
- destination.getExchangeName(),
- destination.getRoutingKey(),
- mandatory,
- immediate);
+ destination.getExchangeName(),
+ routingKey,
+ mandatory,
+ immediate);
AMQFrame publishFrame = body.generateFrame(getChannelId());
message.prepareForSending();
ByteBuffer payload = message.getData();
- AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
- BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
contentHeaderProperties.setUserId(getUserID());
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Sun Aug 24 16:17:11 2014
@@ -159,7 +159,7 @@ public abstract class BlockingWaiter<T>
{
_waiting.set(true);
- while (!_ready)
+ while (!_ready && _error == null)
{
try
{
Modified: qpid/trunk/qpid/java/systests/etc/config-systests.json
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/etc/config-systests.json?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/etc/config-systests.json (original)
+++ qpid/trunk/qpid/java/systests/etc/config-systests.json Sun Aug 24 16:17:11 2014
@@ -21,7 +21,6 @@
{
"name": "Broker",
"defaultVirtualHost" : "test",
- "storeVersion": 1,
"modelVersion": "2.0",
"authenticationproviders" : [ {
"name" : "plain",
Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Sun Aug 24 16:17:11 2014
@@ -52,7 +52,6 @@ import org.apache.qpid.client.AMQAnyDest
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.messaging.Address;
@@ -76,7 +75,14 @@ public class AddressBasedDestinationTest
@Override
public void tearDown() throws Exception
{
- _connection.close();
+ try
+ {
+ _connection.close();
+ }
+ catch(JMSException e)
+ {
+ // ignore
+ }
super.tearDown();
}
@@ -90,14 +96,15 @@ public class AddressBasedDestinationTest
// create never --------------------------------------------
String addr1 = "ADDR:testQueue1";
AMQDestination dest = new AMQAnyDestination(addr1);
+ final String queueErrorMessage = "The name 'testQueue1' supplied in the address " +
+ "doesn't resolve to an exchange or a queue";
try
{
cons = jmsSession.createConsumer(dest);
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getMessage().contains(queueErrorMessage));
}
try
@@ -106,12 +113,12 @@ public class AddressBasedDestinationTest
}
catch(JMSException e)
{
- assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getCause().getMessage().contains(queueErrorMessage)
+ || e.getCause().getCause().getMessage().contains(queueErrorMessage));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,false));
+ (AMQSession)jmsSession).isQueueExist(dest,false));
// create always -------------------------------------------
@@ -120,9 +127,9 @@ public class AddressBasedDestinationTest
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
// create receiver -----------------------------------------
@@ -134,33 +141,36 @@ public class AddressBasedDestinationTest
}
catch(JMSException e)
{
- assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ String expectedMessage = "The name 'testQueue2' supplied in the address " +
+ "doesn't resolve to an exchange or a queue";
+ assertTrue(e.getCause().getMessage().contains(expectedMessage)
+ || e.getCause().getCause().getMessage().contains(expectedMessage));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
// create never --------------------------------------------
addr1 = "ADDR:testQueue3; { create: never }";
dest = new AMQAnyDestination(addr1);
+ String testQueue3ErrorMessage = "The name 'testQueue3' supplied in the address " +
+ "doesn't resolve to an exchange or a queue";
try
{
cons = jmsSession.createConsumer(dest);
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getMessage().contains(testQueue3ErrorMessage));
}
try
@@ -169,12 +179,12 @@ public class AddressBasedDestinationTest
}
catch(JMSException e)
{
- assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getCause().getMessage().contains(testQueue3ErrorMessage)
+ || e.getCause().getCause().getMessage().contains(testQueue3ErrorMessage));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
// create sender ------------------------------------------
addr1 = "ADDR:testQueue3; { create: sender }";
@@ -186,17 +196,16 @@ public class AddressBasedDestinationTest
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getMessage().contains(testQueue3ErrorMessage));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
prod = jmsSession.createProducer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
}
@@ -234,22 +243,22 @@ public class AddressBasedDestinationTest
// Even if the consumer is closed the queue and the bindings should be intact.
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ (AMQSession)jmsSession).isQueueBound("amq.direct",
dest.getAddressName(),"test", null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
+ (AMQSession)jmsSession).isQueueBound("amq.fanout",
dest.getAddressName(),null, null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ (AMQSession)jmsSession).isQueueBound("amq.topic",
dest.getAddressName(),"a.#", null));
Map<String,Object> args = new HashMap<String,Object>();
@@ -257,7 +266,7 @@ public class AddressBasedDestinationTest
args.put("dep","sales");
args.put("loc","CA");
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ (AMQSession)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, args));
MessageProducer prod = jmsSession.createProducer(dest);
@@ -339,6 +348,11 @@ public class AddressBasedDestinationTest
{
return;
}
+ else if((useNonsenseExchangeType || useNonsenseArguments) && !isBroker010()
+ && String.valueOf(AMQConstant.COMMAND_INVALID.getCode()).equals(e.getErrorCode()))
+ {
+ return;
+ }
else
{
fail("Unexpected exception whilst creating consumer: " + e);
@@ -346,11 +360,11 @@ public class AddressBasedDestinationTest
}
assertTrue("Exchange not created as expected",(
- (AMQSession_0_10)jmsSession).isExchangeExist(dest,true));
+ (AMQSession)jmsSession).isExchangeExist(dest,true));
// The existence of the queue is implicitly tested here
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
+ (AMQSession)jmsSession).isQueueBound("my-exchange",
dest.getQueueName(),"hello", null));
// The client should be able to query and verify the existence of my-exchange (QPID-2774)
@@ -387,23 +401,23 @@ public class AddressBasedDestinationTest
public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
{
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
- assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), null));
+ assertTrue("Queue not bound as expected", (
+ (AMQSession) jmsSession).isQueueBound("",
+ dest.getAddressName(), dest.getAddressName(), null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ (AMQSession)jmsSession).isQueueBound("amq.direct",
dest.getAddressName(),"test", null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ (AMQSession)jmsSession).isQueueBound("amq.topic",
dest.getAddressName(),"a.#", null));
Address a = Address.parse(headersBinding);
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ (AMQSession)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, a.getOptions()));
}
@@ -526,17 +540,17 @@ public class AddressBasedDestinationTest
MessageConsumer cons3 = jmsSession.createConsumer(dest3);
assertTrue("Destination1 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest1, true));
+ (AMQSession)jmsSession).isQueueExist(dest1, true));
assertTrue("Destination1 was not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest1.getAddressName(),dest1.getAddressName(), null));
assertTrue("Destination2 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest2,true));
+ (AMQSession)jmsSession).isQueueExist(dest2,true));
assertTrue("Destination2 was not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest2.getAddressName(),dest2.getAddressName(), null));
MessageProducer producer = jmsSession.createProducer(dest3);
@@ -587,7 +601,7 @@ public class AddressBasedDestinationTest
MessageProducer prod = ssn.createProducer(queue);
MessageConsumer cons = ssn.createConsumer(queue);
assertTrue("my-queue was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ (AMQSession)ssn).isQueueBound("amq.direct",
"my-queue","my-queue", null));
prod.send(ssn.createTextMessage("test"));
@@ -606,7 +620,7 @@ public class AddressBasedDestinationTest
{
String s = "The name 'my-queue2' supplied in the address " +
"doesn't resolve to an exchange or a queue";
- assertEquals(s,e.getCause().getCause().getMessage());
+ assertTrue(s.equals(e.getCause().getMessage()) || s.equals(e.getCause().getCause().getMessage()));
}
// explicit create case
@@ -614,7 +628,7 @@ public class AddressBasedDestinationTest
prod = ssn.createProducer(queue);
cons = ssn.createConsumer(queue);
assertTrue("my-queue2 was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("",
+ (AMQSession)ssn).isQueueBound("",
"my-queue2","my-queue2", null));
prod.send(ssn.createTextMessage("test"));
@@ -631,7 +645,7 @@ public class AddressBasedDestinationTest
cons = ssn.createConsumer(queue);
prod = ssn.createProducer(queue);
assertTrue("MY.RESP.QUEUE was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ (AMQSession)ssn).isQueueBound("amq.direct",
"MY.RESP.QUEUE","x512", null));
cons.close();
}
@@ -701,15 +715,15 @@ public class AddressBasedDestinationTest
prod = ssn.createProducer(topic);
assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession)ssn).isQueueBound("vehicles",
"my-topic","bus", null));
assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession)ssn).isQueueBound("vehicles",
"my-topic","car", null));
assertTrue("The queue was not bound to vehicle exchange using van as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession)ssn).isQueueBound("vehicles",
"my-topic","van", null));
Message msg = ssn.createTextMessage("test");
@@ -822,15 +836,18 @@ public class AddressBasedDestinationTest
catch(Exception e)
{
}
- _connection.close();
+ }
+
+ public void testJMSTopicIsTreatedAsQueueIn0_10() throws Exception
+ {
_connection = getConnection() ;
_connection.start();
- ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- dest = ssn.createTopic("ADDR:my_queue; {create: always}");
- consumer1 = ssn.createConsumer(dest);
- consumer2 = ssn.createConsumer(dest);
- prod = ssn.createProducer(dest);
+ final Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Destination dest = ssn.createTopic("ADDR:my_queue; {create: always}");
+ final MessageConsumer consumer1 = ssn.createConsumer(dest);
+ final MessageConsumer consumer2 = ssn.createConsumer(dest);
+ final MessageProducer prod = ssn.createProducer(dest);
prod.send(ssn.createTextMessage("A"));
Message m1 = consumer1.receive(1000);
@@ -864,15 +881,15 @@ public class AddressBasedDestinationTest
MessageConsumer cons = ssn.createConsumer(topic);
assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession)ssn).isQueueBound("MRKT",
"my-topic","NYSE.#", null));
assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession)ssn).isQueueBound("MRKT",
"my-topic","NASDAQ.#", null));
assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession)ssn).isQueueBound("MRKT",
"my-topic","CNTL.#", null));
MessageProducer prod = ssn.createProducer(topic);
@@ -886,7 +903,7 @@ public class AddressBasedDestinationTest
public void testXSubscribeOverrides() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ String str = "ADDR:my_queue; {create:always, node: { type: queue }, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
Destination dest = ssn.createTopic(str);
MessageConsumer consumer1 = ssn.createConsumer(dest);
try
@@ -937,7 +954,7 @@ public class AddressBasedDestinationTest
props.setProperty("destination.address1", "ADDR:amq.topic/test");
props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr);
props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr);
- String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ String addrStr = "ADDR:my_queue; {create:always,node : {type: queue}, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
props.setProperty("destination.address5", addrStr);
Context ctx = new InitialContext(props);
@@ -1055,7 +1072,7 @@ public class AddressBasedDestinationTest
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
@@ -1071,7 +1088,7 @@ public class AddressBasedDestinationTest
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
@@ -1088,7 +1105,7 @@ public class AddressBasedDestinationTest
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
}
/**
@@ -1206,11 +1223,11 @@ public class AddressBasedDestinationTest
m.setJMSReplyTo(replyToDest);
prod.send(m);
- Message msg = cons.receive();
+ Message msg = cons.receive(5000l);
MessageProducer prodR = session.createProducer(msg.getJMSReplyTo());
prodR.send(session.createTextMessage("x"));
- Message m1 = replyToCons.receive();
+ Message m1 = replyToCons.receive(5000l);
assertNotNull("The reply to consumer should have received the messsage",m1);
}
@@ -1422,7 +1439,7 @@ public class AddressBasedDestinationTest
AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr);
MessageConsumer cons = jmsSession.createConsumer(dest);
- AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession;
+ AMQSession ssn = (AMQSession)jmsSession;
assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true));
assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null));
@@ -1454,11 +1471,11 @@ public class AddressBasedDestinationTest
String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}";
AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr);
- ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true);
+ ((AMQSession)ssn).isQueueExist(verifyDest, true);
// Verify that the producer does not delete the subscription queue.
MessageProducer prod = ssn.createProducer(dest);
prod.close();
- ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true);
+ ((AMQSession)ssn).isQueueExist(verifyDest, true);
}
}
Modified: qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes Sun Aug 24 16:17:11 2014
@@ -25,9 +25,17 @@
org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
org.apache.qpid.server.message.MessageProtocolConversionTest#*
+//QPID-3422: test fails because ring queue is not implemented on java broker
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode
+//QPID-3392: the Java broker does not yet implement exchange creation arguments
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs
+//QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic
+// you want a topic behaviour. The 0-10 client thinks you must want a queue.
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10
+
// The new addressing based syntax is not supported for AMQP 0-8/0-9 versions
org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
-org.apache.qpid.test.client.destination.AddressBasedDestinationTest#*
org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org