You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/05/17 17:32:19 UTC

svn commit: r538968 - in /incubator/qpid/trunk/qpid: ./ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/exchange/ java/broker/src/main/java/org/apache/qpid/server/queue/ java/broker/src/test/java/org/a...

Author: rupertlssmith
Date: Thu May 17 08:32:18 2007
New Revision: 538968

URL: http://svn.apache.org/viewvc?view=rev&rev=538968
Log:
Merged revisions 538084-538097,538099-538108,538110-538906,538908-538912 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r538084 | ritchiem | 2007-05-15 09:02:42 +0100 (Tue, 15 May 2007) | 1 line
  
  QPID-466 Removed Unsupported exception from setIntProperty with STRICT_AMQP set
........
  r538240 | ritchiem | 2007-05-15 17:19:01 +0100 (Tue, 15 May 2007) | 6 lines
  
  QPID-3 Topic Matching with tests
  
  A simple naive approach. Similar to C++ to be included for M2.
  
  More elaborate pre-evaluated version will have to wait.
  Once benchmarks have been performed we can evaluate performance advantages if any of that approach.
........
  r538882 | ritchiem | 2007-05-17 13:12:34 +0100 (Thu, 17 May 2007) | 3 lines
  
  Fix for broken CSDM message purging routine that was causing python test_get to fail.
  
  Replaced long while control with a method call that is easier to understand and has more comments.
........
  r538912 | ritchiem | 2007-05-17 14:26:25 +0100 (Thu, 17 May 2007) | 2 lines
  
  Fixed failing python tests. The rather annoying way we unsubscribe subscribers by creating new ones was causing a problem as the closing channel had been closed before the unsubscribe call.
  Java now passes all python tests 
........

Added:
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
      - copied, changed from r538882, incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu May 17 08:32:18 2007
@@ -31,6 +31,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
@@ -42,12 +43,12 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.exchange.MessageRouter;
 import org.apache.qpid.server.exchange.NoRouteException;
+import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.queue.Subscription;
-import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.*;
 
@@ -59,7 +60,7 @@
 
     private final int _channelId;
 
-    //private boolean _transactional;
+    // private boolean _transactional;
 
     private long _prefetch_HighWaterMark;
 
@@ -119,14 +120,12 @@
 
     private Set<Long> _browsedAcks = new HashSet<Long>();
 
-    //Why do we need this reference ? - ritchiem
+    // Why do we need this reference ? - ritchiem
     private final AMQProtocolSession _session;
     private boolean _closing;
 
-
-    public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges)
-            throws
-            AMQException
+    public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager,
+        MessageStore messageStore, MessageRouter exchanges) throws AMQException
     {
         _session = session;
         _channelId = channelId;
@@ -145,7 +144,8 @@
      */
     public void setLocalTransactional()
     {
-        _txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
+        _txnContext =
+            new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
     }
 
     public boolean isTransactional()
@@ -176,7 +176,6 @@
         return _prefetchSize;
     }
 
-
     public void setPrefetchSize(long prefetchSize)
     {
         _prefetchSize = prefetchSize;
@@ -202,31 +201,26 @@
         _prefetch_HighWaterMark = prefetchCount;
     }
 
-
-    public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher)
-            throws
-            AMQException
+    public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
     {
 
-
-        _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
-                _txnContext);
+        _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
         _currentMessage.setPublisher(publisher);
     }
 
-    public void publishContentHeader(ContentHeaderBody contentHeaderBody)
-            throws
-            AMQException
+    public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException
     {
         if (_currentMessage == null)
         {
             throw new AMQException("Received content header without previously receiving a BasicPublish frame");
-        } else
+        }
+        else
         {
             if (_log.isTraceEnabled())
             {
                 _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
             }
+
             _currentMessage.setContentHeaderBody(contentHeaderBody);
             _currentMessage.setExpiration();
 
@@ -241,9 +235,7 @@
         }
     }
 
-    public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession)
-            throws
-            AMQException
+    public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException
     {
         if (_currentMessage == null)
         {
@@ -254,12 +246,15 @@
         {
             _log.trace(debugIdentity() + "Content body received on channel " + _channelId);
         }
+
         try
         {
 
             // returns true iff the message was delivered (i.e. if all data was
             // received
-            if (_currentMessage.addContentBodyFrame(_storeContext, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody)))
+            if (_currentMessage.addContentBodyFrame(_storeContext,
+                        protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
+                            contentBody)))
             {
                 // callback to allow the context to do any post message processing
                 // primary use is to allow message return processing in the non-tx case
@@ -276,9 +271,7 @@
         }
     }
 
-    protected void routeCurrentMessage()
-            throws
-            AMQException
+    protected void routeCurrentMessage() throws AMQException
     {
         try
         {
@@ -316,15 +309,13 @@
      * @throws AMQException                  if something goes wrong
      */
     public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
-                                           FieldTable filters, boolean noLocal, boolean exclusive)
-            throws
-            AMQException,
-            ConsumerTagNotUniqueException
+        FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
             tag = new AMQShortString("sgen_" + getNextConsumerTag());
         }
+
         if (_consumerTag2QueueMap.containsKey(tag))
         {
             throw new ConsumerTagNotUniqueException();
@@ -332,13 +323,11 @@
 
         queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
         _consumerTag2QueueMap.put(tag, queue);
+
         return tag;
     }
 
-
-    public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag)
-            throws
-            AMQException
+    public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag) throws AMQException
     {
         final AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
         if (q != null)
@@ -353,38 +342,44 @@
      * @param session The session to close
      * @throws AMQException if there is an error during closure
      */
-    public void close(AMQProtocolSession session)
-            throws
-            AMQException
+    public void close(AMQProtocolSession session) throws AMQException
     {
-        _closing = true;
         _txnContext.rollback();
         unsubscribeAllConsumers(session);
         requeue();
+
+        setClosing(true);
+    }
+
+    private void setClosing(boolean closing)
+    {
+        _closing = closing;
     }
 
-    private void unsubscribeAllConsumers(AMQProtocolSession session)
-            throws
-            AMQException
+    private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
     {
         if (_log.isInfoEnabled())
         {
             if (!_consumerTag2QueueMap.isEmpty())
             {
                 _log.info("Unsubscribing all consumers on channel " + toString());
-            } else
+            }
+            else
             {
                 _log.info("No consumers to unsubscribe on channel " + toString());
             }
         }
+
         for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
         {
             if (_log.isInfoEnabled())
             {
                 _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
             }
+
             me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
         }
+
         _consumerTag2QueueMap.clear();
     }
 
@@ -404,12 +399,13 @@
             if (queue == null)
             {
                 _log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
-            } else
+            }
+            else
             {
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
-                            ") with a queue(" + queue + ") for " + consumerTag);
+                    _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag
+                        + ") with a queue(" + queue + ") for " + consumerTag);
                 }
             }
         }
@@ -434,9 +430,7 @@
      *
      * @throws org.apache.qpid.AMQException if the requeue fails
      */
-    public void requeue()
-            throws
-            AMQException
+    public void requeue() throws AMQException
     {
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
         Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -454,20 +448,20 @@
 
             if (!(_txnContext instanceof NonTransactionalContext))
             {
-//                if (_nonTransactedContext == null)
+                // if (_nonTransactedContext == null)
                 {
-                    _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
-                            _returnMessages, _browsedAcks);
+                    _nonTransactedContext =
+                        new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
                 }
 
                 deliveryContext = _nonTransactedContext;
-            } else
+            }
+            else
             {
                 deliveryContext = _txnContext;
             }
         }
 
-
         for (UnacknowledgedMessage unacked : messagesToBeDelivered)
         {
             if (unacked.queue != null)
@@ -483,7 +477,7 @@
 
                 // Should we allow access To the DM to directy deliver the message?
                 // As we don't need to check for Consumers or worry about incrementing the message count?
-//                unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
+                // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
             }
         }
 
@@ -495,9 +489,7 @@
      * @param deliveryTag The message to requeue
      * @throws AMQException If something goes wrong.
      */
-    public void requeue(long deliveryTag)
-            throws
-            AMQException
+    public void requeue(long deliveryTag) throws AMQException
     {
         UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
 
@@ -518,74 +510,71 @@
             TransactionalContext deliveryContext;
             if (!(_txnContext instanceof NonTransactionalContext))
             {
-//                if (_nonTransactedContext == null)
+                // if (_nonTransactedContext == null)
                 {
-                    _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
-                            _returnMessages, _browsedAcks);
+                    _nonTransactedContext =
+                        new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
                 }
 
                 deliveryContext = _nonTransactedContext;
-            } else
+            }
+            else
             {
                 deliveryContext = _txnContext;
             }
 
             if (unacked.queue != null)
             {
-                //Redeliver the messages to the front of the queue
+                // Redeliver the messages to the front of the queue
                 deliveryContext.deliver(unacked.message, unacked.queue, true);
-                //Deliver increments the message count but we have already deliverted this once so don't increment it again
+                // Deliver increments the message count but we have already deliverted this once so don't increment it again
                 // this was because deliver did an increment changed this.
-            } else
+            }
+            else
             {
-                _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag +
-                        " but no queue defined and no DeadLetter queue so DROPPING message.");
-//                _log.error("Requested requeue of message:" + deliveryTag +
-//                           " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
-//
-//                deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
-//
+                _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity()
+                    + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
+                // _log.error("Requested requeue of message:" + deliveryTag +
+                // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
+                //
+                // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
+                //
             }
-        } else
+        }
+        else
         {
-            _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
+            _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
+                + _unacknowledgedMessageMap.size());
 
             if (_log.isDebugEnabled())
             {
                 _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-                {
-                    int count = 0;
-
-                    public boolean callback(UnacknowledgedMessage message)
-                            throws
-                            AMQException
                     {
-                        _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" +
-                                "[" + message.deliveryTag + "]");
-                        return false;  // Continue
-                    }
+                        int count = 0;
 
-                    public void visitComplete()
-                    {
+                        public boolean callback(UnacknowledgedMessage message) throws AMQException
+                        {
+                            _log.debug(
+                                (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]");
 
-                    }
-                });
+                            return false; // Continue
+                        }
+
+                        public void visitComplete()
+                        { }
+                    });
             }
         }
 
-
     }
 
-
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
      * @param requeue Are the messages to be requeued or dropped.
      * @throws AMQException When something goes wrong.
      */
-    public void resend(final boolean requeue)
-            throws
-            AMQException
+    public void resend(final boolean requeue) throws AMQException
     {
         final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
         final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
@@ -599,52 +588,53 @@
         // Marking messages who still have a consumer for to be resent
         // and those that don't to be requeued.
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-        {
-            public boolean callback(UnacknowledgedMessage message)
-                    throws
-                    AMQException
-            {
-                AMQShortString consumerTag = message.consumerTag;
-                AMQMessage msg = message.message;
-                msg.setRedelivered(true);
-                if (consumerTag != null)
+            {
+                public boolean callback(UnacknowledgedMessage message) throws AMQException
                 {
-                    // Consumer exists
-                    if (_consumerTag2QueueMap.containsKey(consumerTag))
+                    AMQShortString consumerTag = message.consumerTag;
+                    AMQMessage msg = message.message;
+                    msg.setRedelivered(true);
+                    if (consumerTag != null)
                     {
-                        msgToResend.add(message);
-                    } else // consumer has gone
-                    {
-                        msgToRequeue.add(message);
+                        // Consumer exists
+                        if (_consumerTag2QueueMap.containsKey(consumerTag))
+                        {
+                            msgToResend.add(message);
+                        }
+                        else // consumer has gone
+                        {
+                            msgToRequeue.add(message);
+                        }
                     }
-                } else
-                {
-                    // Message has no consumer tag, so was "delivered" to a GET
-                    // or consumer no longer registered
-                    // cannot resend, so re-queue.
-                    if (message.queue != null)
+                    else
                     {
-                        if (requeue)
+                        // Message has no consumer tag, so was "delivered" to a GET
+                        // or consumer no longer registered
+                        // cannot resend, so re-queue.
+                        if (message.queue != null)
                         {
-                            msgToRequeue.add(message);
-                        } else
+                            if (requeue)
+                            {
+                                msgToRequeue.add(message);
+                            }
+                            else
+                            {
+                                _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+                            }
+                        }
+                        else
                         {
-                            _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+                            _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
                         }
-                    } else
-                    {
-                        _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
                     }
-                }
 
-                // false means continue processing
-                return false;
-            }
+                    // false means continue processing
+                    return false;
+                }
 
-            public void visitComplete()
-            {
-            }
-        });
+                public void visitComplete()
+                { }
+            });
 
         // Process Messages to Resend
         if (_log.isInfoEnabled())
@@ -652,11 +642,13 @@
             if (!msgToResend.isEmpty())
             {
                 _log.info("Preparing (" + msgToResend.size() + ") message to resend.");
-            } else
+            }
+            else
             {
                 _log.info("No message to resend.");
             }
         }
+
         for (UnacknowledgedMessage message : msgToResend)
         {
             AMQMessage msg = message.message;
@@ -665,22 +657,21 @@
             // If the client has requested the messages be resent then it is
             // their responsibility to ensure that thay are capable of receiving them
             // i.e. The channel hasn't been server side suspended.
-//            if (isSuspended())
-//            {
-//                _log.info("Channel is suspended so requeuing");
-//                //move this message to requeue
-//                msgToRequeue.add(message);
-//            }
-//            else
-//            {
-            //release to allow it to be delivered
+            // if (isSuspended())
+            // {
+            // _log.info("Channel is suspended so requeuing");
+            // //move this message to requeue
+            // msgToRequeue.add(message);
+            // }
+            // else
+            // {
+            // release to allow it to be delivered
             msg.release(message.queue);
 
             // Without any details from the client about what has been processed we have to mark
             // all messages in the unacked map as redelivered.
             msg.setRedelivered(true);
 
-
             Subscription sub = msg.getDeliveredSubscription(message.queue);
 
             if (sub != null)
@@ -697,32 +688,38 @@
                     {
                         if (_log.isDebugEnabled())
                         {
-                            _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message");
+                            _log.debug("Subscription(" + System.identityHashCode(sub)
+                                + ") closed during resend so requeuing message");
                         }
-                        //move this message to requeue
+                        // move this message to requeue
                         msgToRequeue.add(message);
-                    } else
+                    }
+                    else
                     {
                         if (_log.isDebugEnabled())
                         {
-                            _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub));
+                            _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:"
+                                + System.identityHashCode(sub));
                         }
+
                         sub.addToResendQueue(msg);
                         _unacknowledgedMessageMap.remove(message.deliveryTag);
                     }
                 } // sync(sub.getSendLock)
-            } else
+            }
+            else
             {
 
                 if (_log.isInfoEnabled())
                 {
-                    _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss");
+                    _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
+                        + ")to prevent loss");
                 }
-                //move this message to requeue
+                // move this message to requeue
                 msgToRequeue.add(message);
             }
         } // for all messages
-//        } else !isSuspend
+        // } else !isSuspend
 
         if (_log.isInfoEnabled())
         {
@@ -739,12 +736,13 @@
         {
             if (_nonTransactedContext == null)
             {
-                _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
-                        _returnMessages, _browsedAcks);
+                _nonTransactedContext =
+                    new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
             }
 
             deliveryContext = _nonTransactedContext;
-        } else
+        }
+        else
         {
             deliveryContext = _txnContext;
         }
@@ -769,36 +767,32 @@
      * @param queue the queue that has been deleted
      * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
      */
-    public void queueDeleted(final AMQQueue queue)
-            throws
-            AMQException
+    public void queueDeleted(final AMQQueue queue) throws AMQException
     {
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-        {
-            public boolean callback(UnacknowledgedMessage message)
-                    throws
-                    AMQException
             {
-                if (message.queue == queue)
+                public boolean callback(UnacknowledgedMessage message) throws AMQException
                 {
-                    try
-                    {
-                        message.discard(_storeContext);
-                        message.queue = null;
-                    }
-                    catch (AMQException e)
+                    if (message.queue == queue)
                     {
-                        _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
-                                e, e);
+                        try
+                        {
+                            message.discard(_storeContext);
+                            message.queue = null;
+                        }
+                        catch (AMQException e)
+                        {
+                            _log.error(
+                                "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e);
+                        }
                     }
+
+                    return false;
                 }
-                return false;
-            }
 
-            public void visitComplete()
-            {
-            }
-        });
+                public void visitComplete()
+                { }
+            });
     }
 
     /**
@@ -809,9 +803,7 @@
      *                    acknowledges the single message specified by the delivery tag
      * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
      */
-    public void acknowledgeMessage(long deliveryTag, boolean multiple)
-            throws
-            AMQException
+    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
     {
         synchronized (_unacknowledgedMessageMap.getLock())
         {
@@ -828,6 +820,7 @@
             }
 
         }
+
         checkSuspension();
     }
 
@@ -845,8 +838,9 @@
     {
         boolean suspend;
 
-        suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
-                || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+        suspend =
+            ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
+            || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
 
         setSuspended(suspend);
     }
@@ -867,12 +861,13 @@
             if (wasSuspended)
             {
                 _log.debug("Unsuspending channel " + this);
-                //may need to deliver queued messages
+                // may need to deliver queued messages
                 for (AMQQueue q : _consumerTag2QueueMap.values())
                 {
                     q.deliverAsync();
                 }
-            } else
+            }
+            else
             {
                 _log.debug("Suspending channel " + this);
             }
@@ -884,20 +879,17 @@
         return _suspended.get();
     }
 
-    public void commit()
-            throws
-            AMQException
+    public void commit() throws AMQException
     {
         if (!isTransactional())
         {
             throw new AMQException("Fatal error: commit called on non-transactional channel");
         }
-        _txnContext.commit();       
+
+        _txnContext.commit();
     }
 
-    public void rollback()
-            throws
-            AMQException
+    public void rollback() throws AMQException
     {
         _txnContext.rollback();
     }
@@ -908,6 +900,7 @@
         sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional());
         sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
         sb.append("/").append(_prefetch_HighWaterMark);
+
         return sb.toString();
     }
 
@@ -926,41 +919,40 @@
         return _storeContext;
     }
 
-    public void processReturns(AMQProtocolSession session)
-            throws
-            AMQException
+    public void processReturns(AMQProtocolSession session) throws AMQException
     {
         for (RequiredDeliveryException bouncedMessage : _returnMessages)
         {
             AMQMessage message = bouncedMessage.getAMQMessage();
-            session.getProtocolOutputConverter().writeReturn(message, _channelId,
-                    bouncedMessage.getReplyCode().getCode(),
-                    new AMQShortString(bouncedMessage.getMessage()));
+            session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+                new AMQShortString(bouncedMessage.getMessage()));
         }
+
         _returnMessages.clear();
     }
 
-
     public boolean wouldSuspend(AMQMessage msg)
     {
         if (isSuspended())
         {
             return true;
-        } else
+        }
+        else
         {
-            boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+            boolean willSuspend =
+                ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
             if (!willSuspend)
             {
                 final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
 
-                willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize);
+                willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < (msg.getSize() + unackedSize));
             }
 
-
             if (willSuspend)
             {
                 setSuspended(true);
             }
+
             return willSuspend;
         }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Thu May 17 08:32:18 2007
@@ -23,6 +23,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.LinkedList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -56,6 +58,10 @@
     private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
 
     private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+    //    private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+    private static final String TOPIC_SEPARATOR = ".";
+    private static final String AMQP_STAR = "*";
+    private static final String AMQP_HASH = "#";
 
     /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */
     @MBeanDescription("Management Bean for Topic Exchange")
@@ -78,7 +84,7 @@
                 AMQShortString key = entry.getKey();
                 List<String> queueList = new ArrayList<String>();
 
-                List<AMQQueue> queues = entry.getValue();
+                List<AMQQueue> queues = getMatchedQueues(key);
                 for (AMQQueue q : queues)
                 {
                     queueList.add(q.getName().toString());
@@ -118,10 +124,13 @@
         return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
     }
 
-    public synchronized void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         assert queue != null;
-        assert routingKey != null;
+        assert rKey != null;
+
+        AMQShortString routingKey = normalize(rKey);
+
         _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
         // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
         List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
@@ -142,15 +151,67 @@
 
     }
 
+    private AMQShortString normalize(AMQShortString routingKey)
+    {
+        StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
+        List<String> _subscription = new ArrayList<String>();
+
+        while (routingTokens.hasMoreTokens())
+        {
+            _subscription.add(routingTokens.nextToken());
+        }
+
+        int size = _subscription.size();
+
+        for (int index = 0; index < size; index++)
+        {
+            //if there are more levels
+            if (index + 1 < size)
+            {
+                if (_subscription.get(index).equals(AMQP_HASH))
+                {
+                    if (_subscription.get(index + 1).equals(AMQP_HASH))
+                    {
+                        // we don't need #.# delete this one
+                        _subscription.remove(index);
+                        size--;
+                        //redo this normalisation
+                        index--;
+                    }
+
+                    if (_subscription.get(index + 1).equals(AMQP_STAR))
+                    {
+                        // we don't want #.* swap to *.#
+                        // remove it and put it in at index + 1
+                        _subscription.add(index + 1, _subscription.remove(index));
+                    }
+                }
+            }//if we have more levels
+        }
+
+        StringBuilder sb = new StringBuilder();
+
+        for (String s : _subscription)
+        {
+            sb.append(s);
+            sb.append(TOPIC_SEPARATOR);
+        }
+
+        sb.deleteCharAt(sb.length() - 1);
+
+        return new AMQShortString(sb.toString());
+    }
+
     public void route(AMQMessage payload) throws AMQException
     {
         MessagePublishInfo info = payload.getMessagePublishInfo();
 
-        final AMQShortString routingKey = info.getRoutingKey();
-        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        final AMQShortString routingKey = normalize(info.getRoutingKey());
+
+        List<AMQQueue> queues = getMatchedQueues(routingKey);
         // if we have no registered queues we have nothing to do
         // TODO: add support for the immediate flag
-        if (queues == null)
+        if (queues == null || queues.size() == 0)
         {
             if (info.isMandatory())
             {
@@ -177,14 +238,14 @@
 
     public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
     {
-        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
         return queues != null && queues.contains(queue);
     }
 
 
     public boolean isBound(AMQShortString routingKey) throws AMQException
     {
-        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
         return queues != null && !queues.isEmpty();
     }
 
@@ -205,10 +266,12 @@
         return !_routingKey2queues.isEmpty();
     }
 
-    public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         assert queue != null;
-        assert routingKey != null;
+        assert rKey != null;
+
+        AMQShortString routingKey = normalize(rKey);
 
         List<AMQQueue> queues = _routingKey2queues.get(routingKey);
         if (queues == null)
@@ -240,5 +303,111 @@
             _logger.error("Exception occured in creating the topic exchenge mbean", ex);
             throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
         }
+    }
+
+
+    private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
+    {
+        List<AMQQueue> list = new LinkedList<AMQQueue>();
+        StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
+
+        ArrayList<String> routingkeyList = new ArrayList<String>();
+
+        while (routingTokens.hasMoreTokens())
+        {
+            String next = routingTokens.nextToken();
+            if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH))
+            {
+                continue;
+            }
+
+            routingkeyList.add(next);
+        }
+
+        for (AMQShortString queue : _routingKey2queues.keySet())
+        {
+            StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR);
+
+            ArrayList<String> queueList = new ArrayList<String>();
+
+            while (queTok.hasMoreTokens())
+            {
+                queueList.add(queTok.nextToken());
+            }
+
+
+            int depth = 0;
+            boolean matching = true;
+            boolean done = false;
+            int routingskip = 0;
+            int queueskip = 0;
+
+            while (matching && !done)
+            {
+                if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip)
+                {
+                    done = true;
+
+                    // if it was the routing key that ran out of digits
+                    if (routingkeyList.size() == depth + routingskip)
+                    {
+                        if (queueList.size() > (depth + queueskip))
+                        {            // a hash and it is the last entry
+                            matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1;
+                        }
+                    }
+                    else if (routingkeyList.size() > depth + routingskip)
+                    {
+                        // There is still more routing key to check
+                        matching = false;
+                    }
+
+
+                    continue;
+                }
+
+                // if the values on the two topics don't match
+                if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip)))
+                {
+                    if (queueList.get(depth + queueskip).equals(AMQP_STAR))
+                    {
+                        depth++;
+
+                        continue;
+                    }
+                    else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
+                    {
+                        // Is this a # at the end
+                        if (queueList.size() == depth + queueskip + 1)
+                        {
+                            done = true;
+                            continue;
+                        }
+
+                        // otherwise # in the middle
+                        while (routingkeyList.size() > depth + routingskip)
+                        {
+                            if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
+                            {
+                                queueskip++;
+                                depth++;
+                                break;
+                            }
+                            routingskip++;
+                        }
+                        continue;
+                    }
+                    matching = false;
+                }
+                depth++;
+            }
+
+            if (matching)
+            {
+                list.addAll(_routingKey2queues.get(queue));
+            }
+        }
+
+        return list;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu May 17 08:32:18 2007
@@ -451,13 +451,7 @@
         AMQMessage message = messages.peek();
 
         //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
-        while (message != null
-               && (
-                ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
-                || sub == null)
-               && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
-                   || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
-                )
+        while (purgeMessage(message, sub))
         {
             //remove the already taken message or expired
             AMQMessage removed = messages.poll();
@@ -476,6 +470,54 @@
         }
 
         return message;
+    }
+
+    /**
+     * 
+     * @param message
+     * @param sub
+     * @return
+     * @throws AMQException
+     */
+    private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+    {
+        //Original.. complicated while loop control
+//                (message != null
+//                            && (
+//                ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
+//                || sub == null)
+//                            && message.taken(_queue, sub));
+
+        boolean purge = false;
+
+        // if the message is null then don't purge as we have no messagse.
+        if (message != null)
+        {
+            // if we have a subscriber perform message checks
+            if (sub != null)
+            {
+                // Check that the message hasn't expired.
+                if (message.expired(sub.getChannel().getStoreContext(), _queue))
+                {
+                    return true;
+                }
+
+                // if we have a queue browser(we don't purge) so check mark the message as taken
+                purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+            }
+            else
+            {
+                // if there is no subscription we are doing
+                // a get or purging so mark message as taken.
+                message.isTaken(_queue);
+                // and then ensure that it gets purged
+                purge = true;
+            }
+        }
+
+        // if we are purging then ensure we mark this message taken for the current subscriber
+        // the current subscriber may be null in the case of a get or a purge but this is ok.
+        return purge && message.taken(_queue, sub);
     }
 
     public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)

Copied: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (from r538882, incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?view=diff&rev=538968&p1=incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java&r1=538882&p2=incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java&r2=538968
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Thu May 17 08:32:18 2007
@@ -14,33 +14,35 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.server.exchange;
 
-import junit.framework.TestCase;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
 import junit.framework.Assert;
-import org.apache.qpid.server.queue.AMQQueue;
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.messageStore.MemoryMessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+// import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.LinkedList;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class DestWildExchangeTest extends TestCase
 {
@@ -51,22 +53,20 @@
     MessageStore _store;
     StoreContext _context;
 
-
     public void setUp() throws AMQException
     {
         _exchange = new DestWildExchange();
         _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+        // _store = new MemoryMessageStore();
         _store = new MemoryMessageStore();
         _context = new StoreContext();
     }
 
-
     public void testNoRoute() throws AMQException
     {
         AMQQueue queue = new AMQQueue(new AMQShortString("a*#b"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
 
-
         MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
 
         AMQMessage message = new AMQMessage(0L, info, null);
@@ -78,7 +78,7 @@
         }
         catch (NoRouteException nre)
         {
-            //normal   
+            // normal
         }
 
         Assert.assertEquals(0, queue.getMessageCount());
@@ -89,7 +89,6 @@
         AMQQueue queue = new AMQQueue(new AMQShortString("ab"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
 
-
         AMQMessage message = createMessage("a.b");
 
         try
@@ -109,7 +108,6 @@
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
 
-
         message = createMessage("a.c");
 
         try
@@ -119,19 +117,16 @@
             fail("Message has no route and should fail to be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
     }
 
-
     public void testStarMatch() throws AMQException
     {
         AMQQueue queue = new AMQQueue(new AMQShortString("a*"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.*"), queue, null);
 
-
         AMQMessage message = createMessage("a.b");
 
         try
@@ -151,7 +146,6 @@
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
 
-
         message = createMessage("a.c");
 
         try
@@ -171,7 +165,6 @@
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
 
-
         message = createMessage("a");
 
         try
@@ -181,8 +174,7 @@
             fail("Message has no route and should fail to be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
     }
@@ -192,7 +184,6 @@
         AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.#"), queue, null);
 
-
         AMQMessage message = createMessage("a.b.c");
 
         try
@@ -212,7 +203,6 @@
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
 
-
         message = createMessage("a.b");
 
         try
@@ -232,7 +222,6 @@
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
 
-
         message = createMessage("a.c");
 
         try
@@ -271,7 +260,6 @@
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
 
-
         message = createMessage("b");
 
         try
@@ -281,19 +269,16 @@
             fail("Message has no route and should fail to be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
     }
 
-
     public void testMidHash() throws AMQException
     {
         AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
 
-
         AMQMessage message = createMessage("a.c.d.b");
 
         try
@@ -339,7 +324,6 @@
         AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.*.#.b.c"), queue, null);
 
-
         AMQMessage message = createMessage("a.c.b.b");
 
         try
@@ -349,12 +333,10 @@
             fail("Message has route and should not be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
 
-
         message = createMessage("a.a.b.c");
 
         try
@@ -383,8 +365,7 @@
             fail("Message has  route and should not be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
 
@@ -410,13 +391,11 @@
 
     }
 
-
     public void testHashAfterHash() throws AMQException
     {
         AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.*.#.b.c.#.d"), queue, null);
 
-
         AMQMessage message = createMessage("a.c.b.b.c");
 
         try
@@ -426,12 +405,10 @@
             fail("Message has route and should not be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
 
-
         message = createMessage("a.a.b.c.d");
 
         try
@@ -458,7 +435,6 @@
         AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.#.*.#.d"), queue, null);
 
-
         AMQMessage message = createMessage("a.c.b.b.c");
 
         try
@@ -468,8 +444,7 @@
             fail("Message has route and should not be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
 
@@ -499,7 +474,6 @@
         AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.b.c.d"), queue, null);
 
-
         AMQMessage message = createMessage("a.b.c");
 
         try
@@ -509,8 +483,7 @@
             fail("Message has route and should not be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
 
@@ -521,7 +494,6 @@
         AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
 
-
         AMQMessage message = createMessage("a.b.c");
 
         try
@@ -531,8 +503,7 @@
             fail("Message has route and should not be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
 
@@ -543,7 +514,6 @@
         AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
         _exchange.registerQueue(new AMQShortString("a.b"), queue, null);
 
-
         AMQMessage message = createMessage("a");
 
         try
@@ -553,8 +523,7 @@
             fail("Message has route and should not be routed");
         }
         catch (AMQException nre)
-        {
-        }
+        { }
 
         Assert.assertEquals(0, queue.getMessageCount());
 
@@ -564,16 +533,15 @@
     {
         MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
 
-        TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
-                                                                       new LinkedList<RequiredDeliveryException>(),
-                                                                       new HashSet<Long>());
+        TransactionalContext trancontext =
+            new NonTransactionalContext(_store, _context, null, new LinkedList<RequiredDeliveryException>(),
+                new HashSet<Long>());
 
         AMQMessage message = new AMQMessage(0L, info, trancontext);
         message.setContentHeaderBody(new ContentHeaderBody());
 
         return message;
     }
-
 
     class PublishInfo implements MessagePublishInfo
     {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu May 17 08:32:18 2007
@@ -467,11 +467,6 @@
 
     public void setIntProperty(String propertyName, int i) throws JMSException
     {
-        if (_strictAMQP)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
         checkWritableProperties();
         JMSHeaderAdapter.checkPropertyName(propertyName);
         super.setIntProperty(new AMQShortString(propertyName), new Integer(i));

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Thu May 17 08:32:18 2007
@@ -250,7 +250,7 @@
             ;
         }
 
-        assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+        assertEquals("Wrong number of messages bounced: ", 1, _bouncedMessageList.size());
         Message m = _bouncedMessageList.get(0);
         assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));