You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/06/01 21:24:36 UTC

svn commit: r1488561 [2/3] - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/transport/ broker/src/test/java/org/apache/qpi...

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Jun  1 19:24:36 2013
@@ -440,7 +440,7 @@ public abstract class AMQSession<C exten
                                                              // If the session has been closed don't waste time creating a thread to do
                                                              // flow control
                                                              if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
-                                                             {   
+                                                             {
                                                                  // Only execute change if previous state
                                                                  // was False
                                                                  if (!_suspendState.getAndSet(true))
@@ -535,7 +535,7 @@ public abstract class AMQSession<C exten
     }
 
     public abstract AMQException getLastException();
-    
+
     public void checkNotClosed() throws JMSException
     {
         try
@@ -553,7 +553,7 @@ public abstract class AMQSession<C exten
                 ssnClosed.setLinkedException(ex);
                 ssnClosed.initCause(ex);
                 throw ssnClosed;
-            } 
+            }
             else
             {
                 throw ise;
@@ -987,13 +987,13 @@ public abstract class AMQSession<C exten
         // Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method
         return createDurableSubscriber(topic, name, null, false);
     }
-    
+
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal)
             throws JMSException
     {
         checkNotClosed();
         Topic origTopic = checkValidTopic(topic, true);
-        
+
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
         if (dest.getDestSyntax() == DestSyntax.ADDR &&
             !dest.isAddressResolved())
@@ -1015,20 +1015,20 @@ public abstract class AMQSession<C exten
                 throw toJMSException("Error when verifying destination", e);
             }
         }
-        
+
         String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
-        
+
         _subscriberDetails.lock();
         try
         {
             TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name);
-            
+
             // Not subscribed to this name in the current session
             if (subscriber == null)
             {
                 // After the address is resolved routing key will not be null.
                 AMQShortString topicName = dest.getRoutingKey();
-                
+
                 if (_strictAMQP)
                 {
                     if (_strictAMQPFATAL)
@@ -1046,8 +1046,8 @@ public abstract class AMQSession<C exten
                 else
                 {
                     Map<String,Object> args = new HashMap<String,Object>();
-                    
-                    // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a 
+
+                    // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
                     // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
                     // possible to determine  when querying the broker whether there are no arguments or just a non-matching selector
                     // argument, as specifying null for the arguments when querying means they should not be checked at all
@@ -1060,16 +1060,28 @@ public abstract class AMQSession<C exten
                     // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
                     // says we must trash the subscription.
                     boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
-                    boolean isQueueBoundForTopicAndSelector = 
+                    boolean isQueueBoundForTopicAndSelector =
                                 isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args);
 
                     if (isQueueBound && !isQueueBoundForTopicAndSelector)
                     {
                         deleteQueue(dest.getAMQQueueName());
                     }
+                    else if(isQueueBound) // todo - this is a hack for 0-8/9/9-1 which cannot check if arguments on a binding match
+                    {
+                        try
+                        {
+                            bindQueue(dest.getAMQQueueName(), dest.getRoutingKey(),
+                                                    FieldTable.convertToFieldTable(args), dest.getExchangeName(), dest, true);
+                        }
+                        catch(AMQException e)
+                        {
+                            throw toJMSException("Error when checking binding",e);
+                        }
+                    }
                 }
             }
-            else 
+            else
             {
                 // Subscribed with the same topic and no current / previous or same selector
                 if (subscriber.getTopic().equals(topic)
@@ -1100,7 +1112,7 @@ public abstract class AMQSession<C exten
             {
                 _subscriberAccess.unlock();
             }
-    
+
             return subscriber;
         }
         catch (TransportException e)
@@ -1193,19 +1205,19 @@ public abstract class AMQSession<C exten
                 if (syntax == AMQDestination.DestSyntax.BURL)
                 {
                     // For testing we may want to use the prefix
-                    return new AMQQueue(getDefaultQueueExchangeName(), 
+                    return new AMQQueue(getDefaultQueueExchangeName(),
                                         new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName)));
                 }
                 else
                 {
                     AMQQueue queue = new AMQQueue(queueName);
                     return queue;
-                    
+
                 }
             }
             else
             {
-                return new AMQQueue(queueName);            
+                return new AMQQueue(queueName);
             }
         }
         catch (URISyntaxException urlse)
@@ -1341,7 +1353,7 @@ public abstract class AMQSession<C exten
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
-    
+
     private Queue validateQueue(Destination dest) throws InvalidDestinationException
     {
         if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
@@ -1497,9 +1509,9 @@ public abstract class AMQSession<C exten
             }
             else
             {
-                return new AMQTopic(topicName);            
+                return new AMQTopic(topicName);
             }
-        
+
         }
         catch (URISyntaxException urlse)
         {
@@ -1646,16 +1658,24 @@ public abstract class AMQSession<C exten
             _logger.debug("Message[" + message.toString() + "] received in session");
         }
         _highestDeliveryTag.set(message.getDeliveryTag());
-        _queue.add(message);        
+        _queue.add(message);
     }
 
     public void declareAndBind(AMQDestination amqd)
             throws
             AMQException
     {
+        declareAndBind(amqd, new FieldTable());
+    }
+
+
+    public void declareAndBind(AMQDestination amqd, FieldTable arguments)
+            throws
+            AMQException
+    {
         declareExchange(amqd, false);
         AMQShortString queueName = declareQueue(amqd, false);
-        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
+        bindQueue(queueName, amqd.getRoutingKey(), arguments, amqd.getExchangeName(), amqd);
     }
 
     /**
@@ -1681,7 +1701,7 @@ public abstract class AMQSession<C exten
      *                      Note that this does not necessarily mean that the recovery has failed, but simply that it is
      *                      not possible to tell if it has or not.
      * @todo Be aware of possible changes to parameter order as versions change.
-     * 
+     *
      * Strategy for handling recover.
      * Flush any acks not yet sent.
      * Stop the message flow.
@@ -1730,7 +1750,7 @@ public abstract class AMQSession<C exten
             }
 
             sendRecover();
-            
+
             markClean();
 
             if (!isSuspended)
@@ -1755,7 +1775,7 @@ public abstract class AMQSession<C exten
     protected abstract void sendRecover() throws AMQException, FailoverException;
 
     protected abstract void flushAcknowledgments();
-    
+
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
 
@@ -1851,7 +1871,7 @@ public abstract class AMQSession<C exten
     public void setMessageListener(MessageListener listener) throws JMSException
     {
     }
-    
+
     /**
      * @see #unsubscribe(String, boolean)
      */
@@ -1866,20 +1886,20 @@ public abstract class AMQSession<C exten
             throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
         }
     }
-    
+
     /**
      * Unsubscribe from a subscription.
-     * 
+     *
      * @param name the name of the subscription to unsubscribe
      * @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the
      * queue is not bound, possibly due to the subscription being closed.
-     * @throws JMSException on 
+     * @throws JMSException on
      * @throws InvalidDestinationException
      */
     private void unsubscribe(String name, boolean safe) throws JMSException
     {
         TopicSubscriberAdaptor<C> subscriber;
-        
+
         _subscriberDetails.lock();
         try
         {
@@ -1896,11 +1916,11 @@ public abstract class AMQSession<C exten
         {
             _subscriberDetails.unlock();
         }
-        
+
         if (subscriber != null)
         {
             subscriber.close();
-            
+
             // send a queue.delete for the subscription
             deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
         }
@@ -1917,7 +1937,7 @@ public abstract class AMQSession<C exten
                     _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
                                  + " Requesting queue deletion regardless.");
                 }
-                
+
                 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
             }
             else // Queue Browser
@@ -1936,8 +1956,9 @@ public abstract class AMQSession<C exten
     }
 
     protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
-                                                 final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
-                                                 final boolean noConsume, final boolean autoClose) throws JMSException
+                                   final int prefetchLow, final boolean noLocal,
+                                   final boolean exclusive, String selector, final FieldTable rawSelector,
+                                   final boolean noConsume, final boolean autoClose) throws JMSException
     {
         checkTemporaryDestination(destination);
 
@@ -2111,7 +2132,7 @@ public abstract class AMQSession<C exten
             throws JMSException;
 
     public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
-    
+
     public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException;
 
     /**
@@ -2844,14 +2865,19 @@ public abstract class AMQSession<C exten
             {
                 declareExchange(amqd, nowait);
             }
-    
+
             if (_delareQueues || amqd.isNameRequired())
             {
                 declareQueue(amqd, consumer.isNoLocal(), nowait);
             }
-            bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
+            if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey()))
+            {
+                bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(),
+                        amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait);
+            }
+
         }
-        
+
         AMQShortString queueName = amqd.getAMQQueueName();
 
         // store the consumer queue name
@@ -2895,10 +2921,13 @@ public abstract class AMQSession<C exten
         }
     }
 
+    protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey)
+            throws AMQException;
+
     public abstract void resolveAddress(AMQDestination dest,
                                                        boolean isConsumer,
                                                        boolean noLocal) throws AMQException;
-    
+
     private void registerProducer(long producerId, MessageProducer producer)
     {
         _producers.put(new Long(producerId), producer);
@@ -3189,7 +3218,7 @@ public abstract class AMQSession<C exten
 
         }
 
-        
+
         public void run()
         {
             if (_dispatcherLogger.isDebugEnabled())
@@ -3304,7 +3333,7 @@ public abstract class AMQSession<C exten
             if (updateRollbackMark(current, deliveryTag))
             {
                 _rollbackMark.compareAndSet(current, deliveryTag);
-            }            
+            }
         }
 
         private void notifyConsumer(UnprocessedMessage message)
@@ -3424,7 +3453,7 @@ public abstract class AMQSession<C exten
     {
         return super.isClosing() || _connection.isClosing();
     }
-    
+
     public boolean isDeclareExchanges()
     {
     	return _declareExchanges;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sat Jun  1 19:24:36 2013
@@ -143,7 +143,7 @@ public class AMQSession_0_10 extends AMQ
     private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
     private TimerTask flushTask = null;
     private RangeSet unacked = RangeSetFactory.createRangeSet();
-    private int unackedCount = 0;    
+    private int unackedCount = 0;
 
     /**
      * Used to store the range of in tx messages
@@ -292,7 +292,7 @@ public class AMQSession_0_10 extends AMQ
     {
         flushAcknowledgments(false);
     }
-    
+
     void flushAcknowledgments(boolean setSyncBit)
     {
         synchronized (unacked)
@@ -310,7 +310,7 @@ public class AMQSession_0_10 extends AMQ
     {
         messageAcknowledge(ranges,accept,false);
     }
-    
+
     void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
     {
         final Session ssn = getQpidSession();
@@ -354,15 +354,15 @@ public class AMQSession_0_10 extends AMQ
         if (destination.getDestSyntax() == DestSyntax.BURL)
         {
             Map args = FieldTableSupport.convertToMap(arguments);
-    
+
             for (AMQShortString rk: destination.getBindingKeys())
             {
-                _logger.debug("Binding queue : " + queueName.toString() + 
-                              " exchange: " + exchangeName.toString() + 
+                _logger.debug("Binding queue : " + queueName.toString() +
+                              " exchange: " + exchangeName.toString() +
                               " using binding key " + rk.asString());
-                getQpidSession().exchangeBind(queueName.toString(), 
-                                              exchangeName.toString(), 
-                                              rk.toString(), 
+                getQpidSession().exchangeBind(queueName.toString(),
+                                              exchangeName.toString(),
+                                              rk.toString(),
                                               args);
             }
         }
@@ -371,10 +371,10 @@ public class AMQSession_0_10 extends AMQ
             // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
             List<Binding> bindings = new ArrayList<Binding>();
             bindings.addAll(destination.getNode().getBindings());
-            
+
             String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
                                      destination.getAddressName(): "amq.topic";
-            
+
             for (Binding binding: bindings)
             {
                 // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
@@ -386,22 +386,22 @@ public class AMQSession_0_10 extends AMQ
                 }
                 String queue = binding.getQueue() == null?
                                    queueName.asString(): binding.getQueue();
-                                   
-               String exchange = binding.getExchange() == null ? 
+
+               String exchange = binding.getExchange() == null ?
                                  defaultExchange :
                                  binding.getExchange();
-                        
-                _logger.debug("Binding queue : " + queue + 
-                              " exchange: " + exchange + 
-                              " using binding key " + binding.getBindingKey() + 
+
+                _logger.debug("Binding queue : " + queue +
+                              " exchange: " + exchange +
+                              " using binding key " + binding.getBindingKey() +
                               " with args " + Strings.printMap(binding.getArgs()));
-                getQpidSession().exchangeBind(queue, 
+                getQpidSession().exchangeBind(queue,
                                               exchange,
                                               binding.getBindingKey(),
-                                              binding.getArgs()); 
+                                              binding.getArgs());
             }
         }
-        
+
         if (!nowait)
         {
             // We need to sync so that we get notified of an error.
@@ -561,20 +561,18 @@ public class AMQSession_0_10 extends AMQ
      */
 
     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
-    throws JMSException
     {
         return isQueueBound(exchangeName,queueName,routingKey,null);
     }
 
-    public boolean isQueueBound(final AMQDestination destination) throws JMSException
+    public boolean isQueueBound(final AMQDestination destination)
     {
         return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
     }
 
     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
-    throws JMSException
     {
-        String rk = null;        
+        String rk = null;
         if (bindingKeys != null && bindingKeys.length>0)
         {
             rk = bindingKeys[0].toString();
@@ -583,10 +581,10 @@ public class AMQSession_0_10 extends AMQ
         {
             rk = routingKey.toString();
         }
-                
+
         return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null);
     }
-    
+
     public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
     {
         boolean res;
@@ -598,21 +596,27 @@ public class AMQSession_0_10 extends AMQ
             res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
         }
         else
-        {   
+        {
             if (args == null)
             {
-                res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+                res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
                         .getQueueNotMatched());
             }
             else
             {
-                res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+                res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
                         .getQueueNotMatched() || bindingQueryResult.getArgsNotMatched());
             }
         }
         return res;
     }
 
+    @Override
+    protected boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey)
+    {
+        return isQueueBound(exchangeName, amqQueueName, routingKey);
+    }
+
     /**
      * This method is invoked when a consumer is created
      * Registers the consumer with the broker
@@ -730,7 +734,7 @@ public class AMQSession_0_10 extends AMQ
     }
 
     /**
-     * deletes an exchange 
+     * deletes an exchange
      */
     public void sendExchangeDelete(final String name, final boolean nowait)
                 throws AMQException, FailoverException
@@ -763,12 +767,12 @@ public class AMQSession_0_10 extends AMQ
         }
 
         if (amqd.getDestSyntax() == DestSyntax.BURL)
-        {        
+        {
             Map<String,Object> arguments = new HashMap<String,Object>();
             if (noLocal)
-            {            
+            {
                 arguments.put(AddressHelper.NO_LOCAL, true);
-            } 
+            }
 
             getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
                                           amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
@@ -790,7 +794,7 @@ public class AMQSession_0_10 extends AMQ
                     arguments,
                     node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                     node.isDurable() ? Option.DURABLE : Option.NONE,
-                    node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);   
+                    node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         }
 
         // passive --> false
@@ -837,7 +841,7 @@ public class AMQSession_0_10 extends AMQ
                 try
                 {
                     long capacity = consumer.getCapacity();
-                    
+
                     if (capacity == 0)
                     {
                         if (consumer.getMessageListener() != null)
@@ -1090,20 +1094,20 @@ public class AMQSession_0_10 extends AMQ
     {
         return AMQMessageDelegateFactory.FACTORY_0_10;
     }
-    
+
     public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
     {
         boolean match = true;
         ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
-        match = !result.getNotFound();        
+        match = !result.getNotFound();
         Node node = dest.getNode();
-        
+
         if (match)
         {
             if (assertNode)
             {
-                match =  (result.getDurable() == node.isDurable()) && 
-                         (node.getExchangeType() != null && 
+                match =  (result.getDurable() == node.isDurable()) &&
+                         (node.getExchangeType() != null &&
                           node.getExchangeType().equals(result.getType())) &&
                          (matchProps(result.getArguments(),node.getDeclareArgs()));
             }
@@ -1125,7 +1129,7 @@ public class AMQSession_0_10 extends AMQ
 
         return match;
     }
-    
+
     public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
     {
         boolean match = true;
@@ -1137,7 +1141,7 @@ public class AMQSession_0_10 extends AMQ
 
             if (match && assertNode)
             {
-                match = (result.getDurable() == node.isDurable()) && 
+                match = (result.getDurable() == node.isDurable()) &&
                          (result.getAutoDelete() == node.isAutoDelete()) &&
                          (result.getExclusive() == node.isExclusive()) &&
                          (matchProps(result.getArguments(),node.getDeclareArgs()));
@@ -1165,17 +1169,17 @@ public class AMQSession_0_10 extends AMQ
         }
         return match;
     }
-    
+
     private boolean matchProps(Map<String,Object> target,Map<String,Object> source)
     {
         boolean match = true;
         for (String key: source.keySet())
         {
-            match = target.containsKey(key) && 
+            match = target.containsKey(key) &&
                     target.get(key).equals(source.get(key));
-            
-            if (!match) 
-            { 
+
+            if (!match)
+            {
                 StringBuffer buf = new StringBuffer();
                 buf.append("Property given in address did not match with the args sent by the broker.");
                 buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, ");
@@ -1184,22 +1188,22 @@ public class AMQSession_0_10 extends AMQ
                 return match;
             }
         }
-        
+
         return match;
     }
 
     /**
      * 1. Try to resolve the address type (queue or exchange)
-     * 2. if type == queue, 
+     * 2. if type == queue,
      *       2.1 verify queue exists or create if create == true
      *       2.2 If not throw exception
-     *       
+     *
      * 3. if type == exchange,
      *       3.1 verify exchange exists or create if create == true
      *       3.2 if not throw exception
      *       3.3 if exchange exists (or created) create subscription queue.
      */
-    
+
     @SuppressWarnings("deprecation")
     public void resolveAddress(AMQDestination dest,
                                               boolean isConsumer,
@@ -1211,21 +1215,21 @@ public class AMQSession_0_10 extends AMQ
         }
         else
         {
-            boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || 
+            boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
                                  (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
                                  (!isConsumer && dest.getAssert() == AddressOption.SENDER);
-            
+
             boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
                                  (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
                                  (!isConsumer && dest.getCreate() == AddressOption.SENDER);
-                        
-            
-            
+
+
+
             int type = resolveAddressType(dest);
-            
+
             switch (type)
             {
-                case AMQDestination.QUEUE_TYPE: 
+                case AMQDestination.QUEUE_TYPE:
                 {
                     if(createNode)
                     {
@@ -1239,24 +1243,24 @@ public class AMQSession_0_10 extends AMQ
                         break;
                     }
                 }
-                
-                case AMQDestination.TOPIC_TYPE: 
+
+                case AMQDestination.TOPIC_TYPE:
                 {
                     if(createNode)
-                    {                    
+                    {
                         setLegacyFiledsForTopicType(dest);
                         verifySubject(dest);
                         handleExchangeNodeCreation(dest);
                         break;
                     }
                     else if (isExchangeExist(dest,assertNode))
-                    {                    
+                    {
                         setLegacyFiledsForTopicType(dest);
                         verifySubject(dest);
                         break;
                     }
                 }
-                
+
                 default:
                     throw new AMQException(
                             "The name '" + dest.getAddressName() +
@@ -1265,7 +1269,7 @@ public class AMQSession_0_10 extends AMQ
             dest.setAddressResolved(System.currentTimeMillis());
         }
     }
-    
+
     public int resolveAddressType(AMQDestination dest) throws AMQException
     {
        int type = dest.getAddressType();
@@ -1292,14 +1296,14 @@ public class AMQSession_0_10 extends AMQ
             }
             dest.setAddressType(type);
             return type;
-        }        
+        }
     }
-    
+
     private void verifySubject(AMQDestination dest) throws AMQException
     {
         if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
         {
-            
+
             if ("topic".equals(dest.getExchangeClass().toString()))
             {
                 dest.setRoutingKey(new AMQShortString("#"));
@@ -1364,12 +1368,12 @@ public class AMQSession_0_10 extends AMQ
         // legacy support
         dest.setExchangeName(new AMQShortString(dest.getAddressName()));
         Node node = dest.getNode();
-        dest.setExchangeClass(node.getExchangeType() == null? 
+        dest.setExchangeClass(node.getExchangeType() == null?
                               ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
-                              new AMQShortString(node.getExchangeType()));  
+                              new AMQShortString(node.getExchangeType()));
         dest.setRoutingKey(new AMQShortString(dest.getSubject()));
     }
-    
+
     protected void acknowledgeImpl()
     {
         RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
@@ -1412,7 +1416,7 @@ public class AMQSession_0_10 extends AMQ
             List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
             getPrefetchedMessageTags().addAll(tags);
         }
-        
+
         RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
 		RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());
 		RangeSet all = RangeSetFactory.createRangeSet(delivered.size()

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sat Jun  1 19:24:36 2013
@@ -184,7 +184,7 @@ public class AMQSession_0_8 extends AMQS
         // thread.
         // We can't close the session if we are already in the process of
         // closing/closed the connection.
-                
+
         if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
             || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING)))
         {
@@ -381,10 +381,7 @@ public class AMQSession_0_8 extends AMQS
                     {
                         public AMQMethodEvent execute() throws AMQException, FailoverException
                         {
-                            AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
-                                                    (exchangeName, routingKey, queueName).generateFrame(getChannelId());
-
-                            return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+                            return sendExchangeBound(exchangeName, routingKey, queueName);
 
                         }
                     }, getAMQConnection()).execute();
@@ -398,7 +395,38 @@ public class AMQSession_0_8 extends AMQS
         {
             throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
         }
-    }    
+    }
+
+    @Override
+    protected boolean isBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+            throws AMQException
+    {
+
+        AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>(
+                new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+                {
+                    public AMQMethodEvent execute() throws AMQException, FailoverException
+                    {
+                        return sendExchangeBound(exchangeName, routingKey, queueName);
+
+                    }
+                }, getAMQConnection()).execute();
+
+        // Extract and return the response code from the query.
+        ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+        return (responseBody.getReplyCode() == 0);
+    }
+
+    private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName,
+                                             AMQShortString routingKey,
+                                             AMQShortString queueName) throws AMQException, FailoverException
+    {
+        AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
+                                (exchangeName, routingKey, queueName).generateFrame(getChannelId());
+
+        return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+    }
 
     @Override
     public void sendConsume(BasicMessageConsumer_0_8 consumer,
@@ -527,7 +555,7 @@ public class AMQSession_0_8 extends AMQS
            JMSException ex = new JMSException("Error creating producer");
            ex.initCause(e);
            ex.setLinkedException(e);
-           
+
            throw ex;
        }
     }
@@ -609,7 +637,7 @@ public class AMQSession_0_8 extends AMQS
                         // todo send low water mark when protocol allows.
                         // todo Be aware of possible changes to parameter order as versions change.
                         getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
-                  
+
                         return null;
                     }
                  }, getAMQConnection()).execute();
@@ -671,7 +699,7 @@ public class AMQSession_0_8 extends AMQS
                                                        false,
                                                        null).generateFrame(getChannelId());
         QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
-        getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);        
+        getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
         return okHandler.getMessageCount();
     }
 
@@ -689,9 +717,9 @@ public class AMQSession_0_8 extends AMQS
     {
         return AMQMessageDelegateFactory.FACTORY_0_8;
     }
-    
+
     public void sync() throws AMQException
-    {    
+    {
         declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
     }
 
@@ -702,10 +730,10 @@ public class AMQSession_0_8 extends AMQS
         throw new UnsupportedOperationException("The new addressing based syntax is "
                 + "not supported for AMQP 0-8/0-9 versions");
     }
-    
+
     protected void flushAcknowledgments()
     {
-        
+
     }
 
     @Override
@@ -744,7 +772,7 @@ public class AMQSession_0_8 extends AMQS
         // if the Connection has closed then we should throw any exception that
         // has occurred that we were not waiting for
         AMQStateManager manager = getProtocolHandler().getStateManager();
-        
+
         Exception e = manager.getLastException();
         if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
                 && e != null)
@@ -752,15 +780,15 @@ public class AMQSession_0_8 extends AMQS
             if (e instanceof AMQException)
             {
                 return (AMQException) e;
-            } 
+            }
             else
             {
                 AMQException amqe = new AMQException(AMQConstant
-                        .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), 
+                        .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
                         e.getMessage(), e.getCause());
                 return amqe;
             }
-        } 
+        }
         else
         {
             return null;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Sat Jun  1 19:24:36 2013
@@ -42,7 +42,7 @@ public class AMQTopic extends AMQDestina
     {
         super(address);
     }
-    
+
     protected AMQTopic()
     {
         super();
@@ -89,6 +89,12 @@ public class AMQTopic extends AMQDestina
         super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable);
     }
 
+
+    protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+    {
+        super(exchangeName, exchangeClass, name, true, isAutoDelete, queueName, isDurable);
+    }
+
     protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
                                boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
     {
@@ -114,10 +120,10 @@ public class AMQTopic extends AMQDestina
                     AMQTopic t = new AMQTopic(qpidTopic.getAddress());
                     AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
                     // link is never null if dest was created using an address string.
-                    t.getLink().setName(queueName.asString());               
+                    t.getLink().setName(queueName.asString());
                     t.getLink().getSubscriptionQueue().setAutoDelete(false);
                     t.getLink().setDurable(true);
-                    
+
                     // The legacy fields are also populated just in case.
                     t.setQueueName(queueName);
                     t.setAutoDelete(false);
@@ -134,7 +140,7 @@ public class AMQTopic extends AMQDestina
             }
             else
             {
-                return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
+                return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getExchangeClass(), qpidTopic.getRoutingKey(), false,
                                 getDurableTopicQueueName(subscriptionName, connection),
                                 true);
             }
@@ -165,7 +171,7 @@ public class AMQTopic extends AMQDestina
             return null;
         }
     }
-    
+
     @Override
     public AMQShortString getExchangeName()
     {
@@ -181,9 +187,9 @@ public class AMQTopic extends AMQDestina
 
     public AMQShortString getRoutingKey()
     {
-        if (super.getRoutingKey() != null)            
+        if (super.getRoutingKey() != null)
         {
-            return super.getRoutingKey();            
+            return super.getRoutingKey();
         }
         else if (getSubject() != null)
         {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java Sat Jun  1 19:24:36 2013
@@ -40,7 +40,6 @@ public enum AMQPFilterTypes
     /** The identifying string for the filter type. */
     private final AMQShortString _value;
 
-
     /**
      * Creates a new filter type from its identifying string.
      *
@@ -60,4 +59,10 @@ public enum AMQPFilterTypes
     {
         return _value;
     }
+
+    @Override
+    public String toString()
+    {
+        return _value.asString();
+    }
 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Sat Jun  1 19:24:36 2013
@@ -83,9 +83,12 @@ public class ReturnUnroutableMandatoryMe
             AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
             queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
+
             FieldTable ft = new FieldTable();
             ft.setString("F1000", "1");
-            consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 ,  false, false, (String) null, ft);
+            consumerSession.declareAndBind(queue, ft);
+
+            consumer = consumerSession.createConsumer(queue);
 
             //force synch to ensure the consumer has resulted in a bound queue
             //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java?rev=1488561&r1=1488560&r2=1488561&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java Sat Jun  1 19:24:36 2013
@@ -111,7 +111,7 @@ public class BindingLoggingTest extends 
         String messageID = "BND-1001";
         String queueName = _queue.getQueueName();
         String exchange = "direct/amq.direct";
-        String message = "Create : Arguments : {x-filter-jms-selector=}";
+        String message = "Create";
         validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName);
     }
 
@@ -145,7 +145,7 @@ public class BindingLoggingTest extends 
 
         // Perform full testing on the binding
         String message = getMessageString(fromMessage(getLogMessage(results, 0)));
-        
+
         validateLogMessage(getLogMessage(results, 0), messageID, message,
                 "topic/amq.topic", "topic", "clientid:" + getName());
 
@@ -208,17 +208,17 @@ public class BindingLoggingTest extends 
         validateMessageID(messageID, log);
 
         String subject = fromSubject(log);
-        
+
         validateBindingDeleteArguments(subject, "/test");
 
         assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log)));
 
     }
-    
+
     private void validateBindingDeleteArguments(String subject, String vhostName)
     {
         String routingKey = AbstractTestLogSubject.getSlice("rk", subject);
-        
+
         assertTrue("Routing Key does not start with TempQueue:"+routingKey,
                 routingKey.startsWith("TempQueue"));
         assertEquals("Virtualhost not correct.", vhostName,



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org