You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/06/01 21:24:36 UTC
svn commit: r1488561 [2/3] - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/transport/
broker/src/test/java/org/apache/qpi...
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Jun 1 19:24:36 2013
@@ -440,7 +440,7 @@ public abstract class AMQSession<C exten
// If the session has been closed don't waste time creating a thread to do
// flow control
if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
- {
+ {
// Only execute change if previous state
// was False
if (!_suspendState.getAndSet(true))
@@ -535,7 +535,7 @@ public abstract class AMQSession<C exten
}
public abstract AMQException getLastException();
-
+
public void checkNotClosed() throws JMSException
{
try
@@ -553,7 +553,7 @@ public abstract class AMQSession<C exten
ssnClosed.setLinkedException(ex);
ssnClosed.initCause(ex);
throw ssnClosed;
- }
+ }
else
{
throw ise;
@@ -987,13 +987,13 @@ public abstract class AMQSession<C exten
// Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method
return createDurableSubscriber(topic, name, null, false);
}
-
+
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal)
throws JMSException
{
checkNotClosed();
Topic origTopic = checkValidTopic(topic, true);
-
+
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
if (dest.getDestSyntax() == DestSyntax.ADDR &&
!dest.isAddressResolved())
@@ -1015,20 +1015,20 @@ public abstract class AMQSession<C exten
throw toJMSException("Error when verifying destination", e);
}
}
-
+
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
-
+
_subscriberDetails.lock();
try
{
TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name);
-
+
// Not subscribed to this name in the current session
if (subscriber == null)
{
// After the address is resolved routing key will not be null.
AMQShortString topicName = dest.getRoutingKey();
-
+
if (_strictAMQP)
{
if (_strictAMQPFATAL)
@@ -1046,8 +1046,8 @@ public abstract class AMQSession<C exten
else
{
Map<String,Object> args = new HashMap<String,Object>();
-
- // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+
+ // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
// durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
@@ -1060,16 +1060,28 @@ public abstract class AMQSession<C exten
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
- boolean isQueueBoundForTopicAndSelector =
+ boolean isQueueBoundForTopicAndSelector =
isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args);
if (isQueueBound && !isQueueBoundForTopicAndSelector)
{
deleteQueue(dest.getAMQQueueName());
}
+ else if(isQueueBound) // todo - this is a hack for 0-8/9/9-1 which cannot check if arguments on a binding match
+ {
+ try
+ {
+ bindQueue(dest.getAMQQueueName(), dest.getRoutingKey(),
+ FieldTable.convertToFieldTable(args), dest.getExchangeName(), dest, true);
+ }
+ catch(AMQException e)
+ {
+ throw toJMSException("Error when checking binding",e);
+ }
+ }
}
}
- else
+ else
{
// Subscribed with the same topic and no current / previous or same selector
if (subscriber.getTopic().equals(topic)
@@ -1100,7 +1112,7 @@ public abstract class AMQSession<C exten
{
_subscriberAccess.unlock();
}
-
+
return subscriber;
}
catch (TransportException e)
@@ -1193,19 +1205,19 @@ public abstract class AMQSession<C exten
if (syntax == AMQDestination.DestSyntax.BURL)
{
// For testing we may want to use the prefix
- return new AMQQueue(getDefaultQueueExchangeName(),
+ return new AMQQueue(getDefaultQueueExchangeName(),
new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName)));
}
else
{
AMQQueue queue = new AMQQueue(queueName);
return queue;
-
+
}
}
else
{
- return new AMQQueue(queueName);
+ return new AMQQueue(queueName);
}
}
catch (URISyntaxException urlse)
@@ -1341,7 +1353,7 @@ public abstract class AMQSession<C exten
return new QueueReceiverAdaptor(dest, consumer);
}
-
+
private Queue validateQueue(Destination dest) throws InvalidDestinationException
{
if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
@@ -1497,9 +1509,9 @@ public abstract class AMQSession<C exten
}
else
{
- return new AMQTopic(topicName);
+ return new AMQTopic(topicName);
}
-
+
}
catch (URISyntaxException urlse)
{
@@ -1646,16 +1658,24 @@ public abstract class AMQSession<C exten
_logger.debug("Message[" + message.toString() + "] received in session");
}
_highestDeliveryTag.set(message.getDeliveryTag());
- _queue.add(message);
+ _queue.add(message);
}
public void declareAndBind(AMQDestination amqd)
throws
AMQException
{
+ declareAndBind(amqd, new FieldTable());
+ }
+
+
+ public void declareAndBind(AMQDestination amqd, FieldTable arguments)
+ throws
+ AMQException
+ {
declareExchange(amqd, false);
AMQShortString queueName = declareQueue(amqd, false);
- bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), arguments, amqd.getExchangeName(), amqd);
}
/**
@@ -1681,7 +1701,7 @@ public abstract class AMQSession<C exten
* Not that this does not necessarily mean that the recovery has failed, but simply that it is
* not possible to tell if it has or not.
* @todo Be aware of possible changes to parameter order as versions change.
- *
+ *
* Strategy for handling recover.
* Flush any acks not yet sent.
* Stop the message flow.
@@ -1730,7 +1750,7 @@ public abstract class AMQSession<C exten
}
sendRecover();
-
+
markClean();
if (!isSuspended)
@@ -1755,7 +1775,7 @@ public abstract class AMQSession<C exten
protected abstract void sendRecover() throws AMQException, FailoverException;
protected abstract void flushAcknowledgments();
-
+
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
@@ -1851,7 +1871,7 @@ public abstract class AMQSession<C exten
public void setMessageListener(MessageListener listener) throws JMSException
{
}
-
+
/**
* @see #unsubscribe(String, boolean)
*/
@@ -1866,20 +1886,20 @@ public abstract class AMQSession<C exten
throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
}
}
-
+
/**
* Unsubscribe from a subscription.
- *
+ *
* @param name the name of the subscription to unsubscribe
* @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the
* queue is not bound, possibly due to the subscription being closed.
- * @throws JMSException on
+ * @throws JMSException on
* @throws InvalidDestinationException
*/
private void unsubscribe(String name, boolean safe) throws JMSException
{
TopicSubscriberAdaptor<C> subscriber;
-
+
_subscriberDetails.lock();
try
{
@@ -1896,11 +1916,11 @@ public abstract class AMQSession<C exten
{
_subscriberDetails.unlock();
}
-
+
if (subscriber != null)
{
subscriber.close();
-
+
// send a queue.delete for the subscription
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
@@ -1917,7 +1937,7 @@ public abstract class AMQSession<C exten
_logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
+ " Requesting queue deletion regardless.");
}
-
+
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
else // Queue Browser
@@ -1936,8 +1956,9 @@ public abstract class AMQSession<C exten
}
protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
- final boolean noConsume, final boolean autoClose) throws JMSException
+ final int prefetchLow, final boolean noLocal,
+ final boolean exclusive, String selector, final FieldTable rawSelector,
+ final boolean noConsume, final boolean autoClose) throws JMSException
{
checkTemporaryDestination(destination);
@@ -2111,7 +2132,7 @@ public abstract class AMQSession<C exten
throws JMSException;
public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
-
+
public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException;
/**
@@ -2844,14 +2865,19 @@ public abstract class AMQSession<C exten
{
declareExchange(amqd, nowait);
}
-
+
if (_delareQueues || amqd.isNameRequired())
{
declareQueue(amqd, consumer.isNoLocal(), nowait);
}
- bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
+ if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey()))
+ {
+ bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(),
+ amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait);
+ }
+
}
-
+
AMQShortString queueName = amqd.getAMQQueueName();
// store the consumer queue name
@@ -2895,10 +2921,13 @@ public abstract class AMQSession<C exten
}
}
+ protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey)
+ throws AMQException;
+
public abstract void resolveAddress(AMQDestination dest,
boolean isConsumer,
boolean noLocal) throws AMQException;
-
+
private void registerProducer(long producerId, MessageProducer producer)
{
_producers.put(new Long(producerId), producer);
@@ -3189,7 +3218,7 @@ public abstract class AMQSession<C exten
}
-
+
public void run()
{
if (_dispatcherLogger.isDebugEnabled())
@@ -3304,7 +3333,7 @@ public abstract class AMQSession<C exten
if (updateRollbackMark(current, deliveryTag))
{
_rollbackMark.compareAndSet(current, deliveryTag);
- }
+ }
}
private void notifyConsumer(UnprocessedMessage message)
@@ -3424,7 +3453,7 @@ public abstract class AMQSession<C exten
{
return super.isClosing() || _connection.isClosing();
}
-
+
public boolean isDeclareExchanges()
{
return _declareExchanges;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sat Jun 1 19:24:36 2013
@@ -143,7 +143,7 @@ public class AMQSession_0_10 extends AMQ
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
private TimerTask flushTask = null;
private RangeSet unacked = RangeSetFactory.createRangeSet();
- private int unackedCount = 0;
+ private int unackedCount = 0;
/**
* Used to store the range of in tx messages
@@ -292,7 +292,7 @@ public class AMQSession_0_10 extends AMQ
{
flushAcknowledgments(false);
}
-
+
void flushAcknowledgments(boolean setSyncBit)
{
synchronized (unacked)
@@ -310,7 +310,7 @@ public class AMQSession_0_10 extends AMQ
{
messageAcknowledge(ranges,accept,false);
}
-
+
void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
{
final Session ssn = getQpidSession();
@@ -354,15 +354,15 @@ public class AMQSession_0_10 extends AMQ
if (destination.getDestSyntax() == DestSyntax.BURL)
{
Map args = FieldTableSupport.convertToMap(arguments);
-
+
for (AMQShortString rk: destination.getBindingKeys())
{
- _logger.debug("Binding queue : " + queueName.toString() +
- " exchange: " + exchangeName.toString() +
+ _logger.debug("Binding queue : " + queueName.toString() +
+ " exchange: " + exchangeName.toString() +
" using binding key " + rk.asString());
- getQpidSession().exchangeBind(queueName.toString(),
- exchangeName.toString(),
- rk.toString(),
+ getQpidSession().exchangeBind(queueName.toString(),
+ exchangeName.toString(),
+ rk.toString(),
args);
}
}
@@ -371,10 +371,10 @@ public class AMQSession_0_10 extends AMQ
// Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
List<Binding> bindings = new ArrayList<Binding>();
bindings.addAll(destination.getNode().getBindings());
-
+
String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
destination.getAddressName(): "amq.topic";
-
+
for (Binding binding: bindings)
{
// Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
@@ -386,22 +386,22 @@ public class AMQSession_0_10 extends AMQ
}
String queue = binding.getQueue() == null?
queueName.asString(): binding.getQueue();
-
- String exchange = binding.getExchange() == null ?
+
+ String exchange = binding.getExchange() == null ?
defaultExchange :
binding.getExchange();
-
- _logger.debug("Binding queue : " + queue +
- " exchange: " + exchange +
- " using binding key " + binding.getBindingKey() +
+
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
" with args " + Strings.printMap(binding.getArgs()));
- getQpidSession().exchangeBind(queue,
+ getQpidSession().exchangeBind(queue,
exchange,
binding.getBindingKey(),
- binding.getArgs());
+ binding.getArgs());
}
}
-
+
if (!nowait)
{
// We need to sync so that we get notify of an error.
@@ -561,20 +561,18 @@ public class AMQSession_0_10 extends AMQ
*/
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
- throws JMSException
{
return isQueueBound(exchangeName,queueName,routingKey,null);
}
- public boolean isQueueBound(final AMQDestination destination) throws JMSException
+ public boolean isQueueBound(final AMQDestination destination)
{
return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
}
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
- throws JMSException
{
- String rk = null;
+ String rk = null;
if (bindingKeys != null && bindingKeys.length>0)
{
rk = bindingKeys[0].toString();
@@ -583,10 +581,10 @@ public class AMQSession_0_10 extends AMQ
{
rk = routingKey.toString();
}
-
+
return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null);
}
-
+
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
{
boolean res;
@@ -598,21 +596,27 @@ public class AMQSession_0_10 extends AMQ
res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
}
else
- {
+ {
if (args == null)
{
- res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+ res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
.getQueueNotMatched());
}
else
{
- res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+ res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
.getQueueNotMatched() || bindingQueryResult.getArgsNotMatched());
}
}
return res;
}
+ @Override
+ protected boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey)
+ {
+ return isQueueBound(exchangeName, amqQueueName, routingKey);
+ }
+
/**
* This method is invoked when a consumer is created
* Registers the consumer with the broker
@@ -730,7 +734,7 @@ public class AMQSession_0_10 extends AMQ
}
/**
- * deletes an exchange
+ * deletes an exchange
*/
public void sendExchangeDelete(final String name, final boolean nowait)
throws AMQException, FailoverException
@@ -763,12 +767,12 @@ public class AMQSession_0_10 extends AMQ
}
if (amqd.getDestSyntax() == DestSyntax.BURL)
- {
+ {
Map<String,Object> arguments = new HashMap<String,Object>();
if (noLocal)
- {
+ {
arguments.put(AddressHelper.NO_LOCAL, true);
- }
+ }
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
@@ -790,7 +794,7 @@ public class AMQSession_0_10 extends AMQ
arguments,
node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
node.isDurable() ? Option.DURABLE : Option.NONE,
- node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
// passive --> false
@@ -837,7 +841,7 @@ public class AMQSession_0_10 extends AMQ
try
{
long capacity = consumer.getCapacity();
-
+
if (capacity == 0)
{
if (consumer.getMessageListener() != null)
@@ -1090,20 +1094,20 @@ public class AMQSession_0_10 extends AMQ
{
return AMQMessageDelegateFactory.FACTORY_0_10;
}
-
+
public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
{
boolean match = true;
ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
- match = !result.getNotFound();
+ match = !result.getNotFound();
Node node = dest.getNode();
-
+
if (match)
{
if (assertNode)
{
- match = (result.getDurable() == node.isDurable()) &&
- (node.getExchangeType() != null &&
+ match = (result.getDurable() == node.isDurable()) &&
+ (node.getExchangeType() != null &&
node.getExchangeType().equals(result.getType())) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
}
@@ -1125,7 +1129,7 @@ public class AMQSession_0_10 extends AMQ
return match;
}
-
+
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
boolean match = true;
@@ -1137,7 +1141,7 @@ public class AMQSession_0_10 extends AMQ
if (match && assertNode)
{
- match = (result.getDurable() == node.isDurable()) &&
+ match = (result.getDurable() == node.isDurable()) &&
(result.getAutoDelete() == node.isAutoDelete()) &&
(result.getExclusive() == node.isExclusive()) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
@@ -1165,17 +1169,17 @@ public class AMQSession_0_10 extends AMQ
}
return match;
}
-
+
private boolean matchProps(Map<String,Object> target,Map<String,Object> source)
{
boolean match = true;
for (String key: source.keySet())
{
- match = target.containsKey(key) &&
+ match = target.containsKey(key) &&
target.get(key).equals(source.get(key));
-
- if (!match)
- {
+
+ if (!match)
+ {
StringBuffer buf = new StringBuffer();
buf.append("Property given in address did not match with the args sent by the broker.");
buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, ");
@@ -1184,22 +1188,22 @@ public class AMQSession_0_10 extends AMQ
return match;
}
}
-
+
return match;
}
/**
* 1. Try to resolve the address type (queue or exchange)
- * 2. if type == queue,
+ * 2. if type == queue,
* 2.1 verify queue exists or create if create == true
* 2.2 If not throw exception
- *
+ *
* 3. if type == exchange,
* 3.1 verify exchange exists or create if create == true
* 3.2 if not throw exception
* 3.3 if exchange exists (or created) create subscription queue.
*/
-
+
@SuppressWarnings("deprecation")
public void resolveAddress(AMQDestination dest,
boolean isConsumer,
@@ -1211,21 +1215,21 @@ public class AMQSession_0_10 extends AMQ
}
else
{
- boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
+ boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
(isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
(!isConsumer && dest.getAssert() == AddressOption.SENDER);
-
+
boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
(isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
(!isConsumer && dest.getCreate() == AddressOption.SENDER);
-
-
-
+
+
+
int type = resolveAddressType(dest);
-
+
switch (type)
{
- case AMQDestination.QUEUE_TYPE:
+ case AMQDestination.QUEUE_TYPE:
{
if(createNode)
{
@@ -1239,24 +1243,24 @@ public class AMQSession_0_10 extends AMQ
break;
}
}
-
- case AMQDestination.TOPIC_TYPE:
+
+ case AMQDestination.TOPIC_TYPE:
{
if(createNode)
- {
+ {
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
handleExchangeNodeCreation(dest);
break;
}
else if (isExchangeExist(dest,assertNode))
- {
+ {
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
break;
}
}
-
+
default:
throw new AMQException(
"The name '" + dest.getAddressName() +
@@ -1265,7 +1269,7 @@ public class AMQSession_0_10 extends AMQ
dest.setAddressResolved(System.currentTimeMillis());
}
}
-
+
public int resolveAddressType(AMQDestination dest) throws AMQException
{
int type = dest.getAddressType();
@@ -1292,14 +1296,14 @@ public class AMQSession_0_10 extends AMQ
}
dest.setAddressType(type);
return type;
- }
+ }
}
-
+
private void verifySubject(AMQDestination dest) throws AMQException
{
if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
{
-
+
if ("topic".equals(dest.getExchangeClass().toString()))
{
dest.setRoutingKey(new AMQShortString("#"));
@@ -1364,12 +1368,12 @@ public class AMQSession_0_10 extends AMQ
// legacy support
dest.setExchangeName(new AMQShortString(dest.getAddressName()));
Node node = dest.getNode();
- dest.setExchangeClass(node.getExchangeType() == null?
+ dest.setExchangeClass(node.getExchangeType() == null?
ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
- new AMQShortString(node.getExchangeType()));
+ new AMQShortString(node.getExchangeType()));
dest.setRoutingKey(new AMQShortString(dest.getSubject()));
}
-
+
protected void acknowledgeImpl()
{
RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
@@ -1412,7 +1416,7 @@ public class AMQSession_0_10 extends AMQ
List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
getPrefetchedMessageTags().addAll(tags);
}
-
+
RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());
RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sat Jun 1 19:24:36 2013
@@ -184,7 +184,7 @@ public class AMQSession_0_8 extends AMQS
// thread.
// We can't close the session if we are already in the process of
// closing/closed the connection.
-
+
if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
|| getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING)))
{
@@ -381,10 +381,7 @@ public class AMQSession_0_8 extends AMQS
{
public AMQMethodEvent execute() throws AMQException, FailoverException
{
- AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
- (exchangeName, routingKey, queueName).generateFrame(getChannelId());
-
- return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ return sendExchangeBound(exchangeName, routingKey, queueName);
}
}, getAMQConnection()).execute();
@@ -398,7 +395,38 @@ public class AMQSession_0_8 extends AMQS
{
throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
}
- }
+ }
+
+ @Override
+ protected boolean isBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws AMQException
+ {
+
+ AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ return sendExchangeBound(exchangeName, routingKey, queueName);
+
+ }
+ }, getAMQConnection()).execute();
+
+ // Extract and return the response code from the query.
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+ return (responseBody.getReplyCode() == 0);
+ }
+
+ private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName,
+ AMQShortString routingKey,
+ AMQShortString queueName) throws AMQException, FailoverException
+ {
+ AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
+ (exchangeName, routingKey, queueName).generateFrame(getChannelId());
+
+ return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ }
@Override
public void sendConsume(BasicMessageConsumer_0_8 consumer,
@@ -527,7 +555,7 @@ public class AMQSession_0_8 extends AMQS
JMSException ex = new JMSException("Error creating producer");
ex.initCause(e);
ex.setLinkedException(e);
-
+
throw ex;
}
}
@@ -609,7 +637,7 @@ public class AMQSession_0_8 extends AMQS
// todo send low water mark when protocol allows.
// todo Be aware of possible changes to parameter order as versions change.
getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
-
+
return null;
}
}, getAMQConnection()).execute();
@@ -671,7 +699,7 @@ public class AMQSession_0_8 extends AMQS
false,
null).generateFrame(getChannelId());
QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
- getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
return okHandler.getMessageCount();
}
@@ -689,9 +717,9 @@ public class AMQSession_0_8 extends AMQS
{
return AMQMessageDelegateFactory.FACTORY_0_8;
}
-
+
public void sync() throws AMQException
- {
+ {
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
@@ -702,10 +730,10 @@ public class AMQSession_0_8 extends AMQS
throw new UnsupportedOperationException("The new addressing based syntax is "
+ "not supported for AMQP 0-8/0-9 versions");
}
-
+
protected void flushAcknowledgments()
{
-
+
}
@Override
@@ -744,7 +772,7 @@ public class AMQSession_0_8 extends AMQS
// if the Connection has closed then we should throw any exception that
// has occurred that we were not waiting for
AMQStateManager manager = getProtocolHandler().getStateManager();
-
+
Exception e = manager.getLastException();
if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
&& e != null)
@@ -752,15 +780,15 @@ public class AMQSession_0_8 extends AMQS
if (e instanceof AMQException)
{
return (AMQException) e;
- }
+ }
else
{
AMQException amqe = new AMQException(AMQConstant
- .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
+ .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
e.getMessage(), e.getCause());
return amqe;
}
- }
+ }
else
{
return null;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Sat Jun 1 19:24:36 2013
@@ -42,7 +42,7 @@ public class AMQTopic extends AMQDestina
{
super(address);
}
-
+
protected AMQTopic()
{
super();
@@ -89,6 +89,12 @@ public class AMQTopic extends AMQDestina
super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable);
}
+
+ protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ {
+ super(exchangeName, exchangeClass, name, true, isAutoDelete, queueName, isDurable);
+ }
+
protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
{
@@ -114,10 +120,10 @@ public class AMQTopic extends AMQDestina
AMQTopic t = new AMQTopic(qpidTopic.getAddress());
AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
// link is never null if dest was created using an address string.
- t.getLink().setName(queueName.asString());
+ t.getLink().setName(queueName.asString());
t.getLink().getSubscriptionQueue().setAutoDelete(false);
t.getLink().setDurable(true);
-
+
// The legacy fields are also populated just in case.
t.setQueueName(queueName);
t.setAutoDelete(false);
@@ -134,7 +140,7 @@ public class AMQTopic extends AMQDestina
}
else
{
- return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
+ return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getExchangeClass(), qpidTopic.getRoutingKey(), false,
getDurableTopicQueueName(subscriptionName, connection),
true);
}
@@ -165,7 +171,7 @@ public class AMQTopic extends AMQDestina
return null;
}
}
-
+
@Override
public AMQShortString getExchangeName()
{
@@ -181,9 +187,9 @@ public class AMQTopic extends AMQDestina
public AMQShortString getRoutingKey()
{
- if (super.getRoutingKey() != null)
+ if (super.getRoutingKey() != null)
{
- return super.getRoutingKey();
+ return super.getRoutingKey();
}
else if (getSubject() != null)
{
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java Sat Jun 1 19:24:36 2013
@@ -40,7 +40,6 @@ public enum AMQPFilterTypes
/** The identifying string for the filter type. */
private final AMQShortString _value;
-
/**
* Creates a new filter type from its identifying string.
*
@@ -60,4 +59,10 @@ public enum AMQPFilterTypes
{
return _value;
}
+
+ @Override
+ public String toString()
+ {
+ return _value.asString();
+ }
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Sat Jun 1 19:24:36 2013
@@ -83,9 +83,12 @@ public class ReturnUnroutableMandatoryMe
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
+
FieldTable ft = new FieldTable();
ft.setString("F1000", "1");
- consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 , false, false, (String) null, ft);
+ consumerSession.declareAndBind(queue, ft);
+
+ consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
//((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java Sat Jun 1 19:24:36 2013
@@ -111,7 +111,7 @@ public class BindingLoggingTest extends
String messageID = "BND-1001";
String queueName = _queue.getQueueName();
String exchange = "direct/amq.direct";
- String message = "Create : Arguments : {x-filter-jms-selector=}";
+ String message = "Create";
validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName);
}
@@ -145,7 +145,7 @@ public class BindingLoggingTest extends
// Perform full testing on the binding
String message = getMessageString(fromMessage(getLogMessage(results, 0)));
-
+
validateLogMessage(getLogMessage(results, 0), messageID, message,
"topic/amq.topic", "topic", "clientid:" + getName());
@@ -208,17 +208,17 @@ public class BindingLoggingTest extends
validateMessageID(messageID, log);
String subject = fromSubject(log);
-
+
validateBindingDeleteArguments(subject, "/test");
assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log)));
}
-
+
private void validateBindingDeleteArguments(String subject, String vhostName)
{
String routingKey = AbstractTestLogSubject.getSlice("rk", subject);
-
+
assertTrue("Routing Key does not start with TempQueue:"+routingKey,
routingKey.startsWith("TempQueue"));
assertEquals("Virtualhost not correct.", vhostName,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org