You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by Martin Ritchie <ri...@apache.org> on 2009/12/03 14:09:43 UTC

Re: svn commit: r829675 [8/11] - in /qpid/trunk/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/ broker/bin/ broker/src/main/java/or

Hi Rob,

Are you planning on fixing the MessageStoreMessagesTest and adding new
tests for the LogMessage types you've introduced?

Also did you notice you removed all the documentation from the
LogMessages properties file?

Cheers

Martin

2009/10/25  <rg...@apache.org>:
> Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
> +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sun Oct 25 22:58:57 2009
> @@ -25,6 +25,8 @@
>  import java.util.concurrent.atomic.AtomicLong;
>  import java.util.concurrent.locks.Lock;
>  import java.util.concurrent.locks.ReentrantLock;
> +import java.util.concurrent.ConcurrentHashMap;
> +import java.util.Map;
>
>  import org.apache.log4j.Logger;
>  import org.apache.qpid.AMQException;
> @@ -33,6 +35,8 @@
>  import org.apache.qpid.framing.AMQShortString;
>  import org.apache.qpid.framing.FieldTable;
>  import org.apache.qpid.server.AMQChannel;
> +import org.apache.qpid.server.message.AMQMessage;
> +import org.apache.qpid.server.output.ProtocolOutputConverter;
>  import org.apache.qpid.server.logging.actors.CurrentActor;
>  import org.apache.qpid.server.logging.actors.SubscriptionActor;
>  import org.apache.qpid.server.logging.messages.SubscriptionMessages;
> @@ -45,7 +49,6 @@
>  import org.apache.qpid.server.filter.FilterManager;
>  import org.apache.qpid.server.filter.FilterManagerFactory;
>  import org.apache.qpid.server.protocol.AMQProtocolSession;
> -import org.apache.qpid.server.store.StoreContext;
>
>  /**
>  * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
> @@ -65,11 +68,16 @@
>
>
>     private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
> -    private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
> +    private AMQQueue.Context _queueContext;
> +
>     private final ClientDeliveryMethod _deliveryMethod;
>     private final RecordDeliveryMethod _recordMethod;
>
> -    private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
> +    private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
> +    private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
> +
> +    private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
> +
>     private final Lock _stateChangeLock;
>
>     private static final AtomicLong idGenerator = new AtomicLong(0);
> @@ -78,6 +86,7 @@
>     private LogSubject _logSubject;
>     private LogActor _logActor;
>
> +
>     static final class BrowserSubscription extends SubscriptionImpl
>     {
>         public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
> @@ -153,38 +162,28 @@
>         @Override
>         public void send(QueueEntry entry) throws AMQException
>         {
> +            // if we do not need to wait for client acknowledgements
> +            // we can decrement the reference count immediately.
>
> -            StoreContext storeContext = getChannel().getStoreContext();
> -            try
> -            { // if we do not need to wait for client acknowledgements
> -                // we can decrement the reference count immediately.
> -
> -                // By doing this _before_ the send we ensure that it
> -                // doesn't get sent if it can't be dequeued, preventing
> -                // duplicate delivery on recovery.
> +            // By doing this _before_ the send we ensure that it
> +            // doesn't get sent if it can't be dequeued, preventing
> +            // duplicate delivery on recovery.
> +
> +            // The send may of course still fail, in which case, as
> +            // the message is unacked, it will be lost.
> +            entry.dequeue();
>
> -                // The send may of course still fail, in which case, as
> -                // the message is unacked, it will be lost.
> -                entry.dequeue(storeContext);
>
> +            synchronized (getChannel())
> +            {
> +                long deliveryTag = getChannel().getNextDeliveryTag();
>
> -                synchronized (getChannel())
> -                {
> -                    long deliveryTag = getChannel().getNextDeliveryTag();
> -
> -                    sendToClient(entry, deliveryTag);
> +                sendToClient(entry, deliveryTag);
>
> -                }
> -                entry.dispose(storeContext);
>             }
> -            finally
> -            {
> -                //Only set delivered if it actually was writen successfully..
> -                // using a try->finally would set it even if an error occured.
> -                // Is this what we want?
> +            entry.dispose();
> +
>
> -                entry.setDeliveredToSubscription();
> -            }
>         }
>
>         @Override
> @@ -225,39 +224,30 @@
>         public void send(QueueEntry entry) throws AMQException
>         {
>
> -            try
> -            { // if we do not need to wait for client acknowledgements
> -                // we can decrement the reference count immediately.
> -
> -                // By doing this _before_ the send we ensure that it
> -                // doesn't get sent if it can't be dequeued, preventing
> -                // duplicate delivery on recovery.
> +            // if we do not need to wait for client acknowledgements
> +            // we can decrement the reference count immediately.
>
> -                // The send may of course still fail, in which case, as
> -                // the message is unacked, it will be lost.
> +            // By doing this _before_ the send we ensure that it
> +            // doesn't get sent if it can't be dequeued, preventing
> +            // duplicate delivery on recovery.
>
> -                synchronized (getChannel())
> -                {
> -                    long deliveryTag = getChannel().getNextDeliveryTag();
> +            // The send may of course still fail, in which case, as
> +            // the message is unacked, it will be lost.
>
> +            synchronized (getChannel())
> +            {
> +                long deliveryTag = getChannel().getNextDeliveryTag();
>
> -                    recordMessageDelivery(entry, deliveryTag);
> -                    sendToClient(entry, deliveryTag);
>
> +                recordMessageDelivery(entry, deliveryTag);
> +                sendToClient(entry, deliveryTag);
>
> -                }
> -            }
> -            finally
> -            {
> -                //Only set delivered if it actually was writen successfully..
> -                // using a try->finally would set it even if an error occured.
> -                // Is this what we want?
>
> -                entry.setDeliveredToSubscription();
>             }
>         }
>
>
> +
>     }
>
>
> @@ -268,7 +258,7 @@
>     private final AMQShortString _consumerTag;
>
>
> -    private final boolean _noLocal;
> +    private boolean _noLocal;
>
>     private final FlowCreditManager _creditManager;
>
> @@ -423,43 +413,35 @@
>
>     public boolean hasInterest(QueueEntry entry)
>     {
> +
> +
> +
> +
>         //check that the message hasn't been rejected
>         if (entry.isRejectedBy(this))
>         {
>             if (_logger.isDebugEnabled())
>             {
> -                _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
> +                _logger.debug("Subscription:" + this + " rejected message:" + entry);
>             }
>  //            return false;
>         }
>
>         if (_noLocal)
>         {
> -            //todo - client id should be recoreded so we don't have to handle
> +
> +            AMQMessage message = (AMQMessage) entry.getMessage();
> +
> +            //todo - client id should be recorded so we don't have to handle
>             // the case where this is null.
> -            final Object publisherId = entry.getMessage().getPublisherClientInstance();
> +            final Object publisher = message.getPublisherIdentifier();
>
>             // We don't want local messages so check to see if message is one we sent
> -            Object localInstance;
> +            Object localInstance = getProtocolSession();
>
> -            if (publisherId != null && (getProtocolSession().getClientProperties() != null) &&
> -                (localInstance = getProtocolSession().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
> +            if(publisher.equals(localInstance))
>             {
> -                if(publisherId.equals(localInstance))
> -                {
> -                    return false;
> -                }
> -            }
> -            else
> -            {
> -
> -                localInstance = getProtocolSession().getClientIdentifier();
> -
> -                //todo - client id should be recoreded so we don't have to do the null check
> -                if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
> -                {
> -                    return false;
> -                }
> +                return false;
>             }
>
>
> @@ -468,7 +450,7 @@
>
>         if (_logger.isDebugEnabled())
>         {
> -            _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
> +            _logger.debug("(" + this + ") checking filters for message (" + entry);
>         }
>         return checkFilters(entry);
>
> @@ -483,7 +465,7 @@
>
>     private boolean checkFilters(QueueEntry msg)
>     {
> -        return (_filters == null) || _filters.allAllow(msg.getMessage());
> +        return (_filters == null) || _filters.allAllow(msg);
>     }
>
>     public boolean isAutoClose()
> @@ -550,11 +532,6 @@
>         _stateChangeLock.unlock();
>     }
>
> -    public void resend(final QueueEntry entry) throws AMQException
> -    {
> -        _queue.resend(entry, this);
> -    }
> -
>     public AMQChannel getChannel()
>     {
>         return _channel;
> @@ -585,12 +562,18 @@
>         return _queue;
>     }
>
> +    public void onDequeue(final QueueEntry queueEntry)
> +    {
> +        restoreCredit(queueEntry);
> +    }
> +
>     public void restoreCredit(final QueueEntry queueEntry)
>     {
> -        _creditManager.addCredit(1, queueEntry.getSize());
> +        _creditManager.restoreCredit(1, queueEntry.getSize());
>     }
>
>
> +
>     public void creditStateChanged(boolean hasCredit)
>     {
>
> @@ -628,22 +611,14 @@
>     }
>
>
> -    public QueueEntry getLastSeenEntry()
> +    public AMQQueue.Context getQueueContext()
>     {
> -        QueueEntry entry = _queueContext.get();
> -
> -        if(_logger.isDebugEnabled())
> -        {
> -            _logger.debug(_logActor + ": lastSeenEntry: " + (entry == null ? "null" : entry.debugIdentity()));
> -        }
> -
> -        return entry;
> +        return _queueContext;
>     }
>
> -    public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
> +    public void setQueueContext(AMQQueue.Context context)
>     {
> -        _logger.debug(debugIdentity() + " Setting Last Seen To:" + (newvalue == null ? "nullNV" : newvalue.debugIdentity()));
> -        return _queueContext.compareAndSet(expected,newvalue);
> +        _queueContext = context;
>     }
>
>
> @@ -670,4 +645,43 @@
>         return _owningState;
>     }
>
> +    public QueueEntry.SubscriptionAssignedState getAssignedState()
> +    {
> +        return _assignedState;
> +    }
> +
> +
> +    public void confirmAutoClose()
> +    {
> +        ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
> +        converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
> +    }
> +
> +    public boolean acquires()
> +    {
> +        return !isBrowser();
> +    }
> +
> +    public boolean seesRequeues()
> +    {
> +        return !isBrowser();
> +    }
> +
> +    public void set(String key, Object value)
> +    {
> +        _properties.put(key, value);
> +    }
> +
> +    public Object get(String key)
> +    {
> +        return _properties.get(key);
> +    }
> +
> +
> +    public void setNoLocal(boolean noLocal)
> +    {
> +        _noLocal = noLocal;
> +    }
> +
> +    abstract boolean isBrowser();
>  }
>
> Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
> +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Sun Oct 25 22:58:57 2009
> @@ -21,7 +21,6 @@
>  package org.apache.qpid.server.virtualhost;
>
>  import org.apache.qpid.server.registry.ApplicationRegistry;
> -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
>
>  import java.util.ArrayList;
>  import java.util.Collection;
> @@ -31,7 +30,7 @@
>
>  public class VirtualHostRegistry
>  {
> -    private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
> +    private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>();
>
>
>     private String _defaultVirtualHostName;
>
> Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java (original)
> +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java Sun Oct 25 22:58:57 2009
> @@ -14,14 +14,16 @@
>  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>  *  KIND, either express or implied.  See the License for the
>  *  specific language governing permissions and limitations
> - *  under the License.
> + *  under the License.
> + *
>  *
> - *
>  */
>  package org.apache.qpid.tools.messagestore.commands;
>
>  import org.apache.qpid.tools.messagestore.MessageStoreTool;
>  import org.apache.qpid.server.queue.AMQQueue;
> +import org.apache.qpid.server.txn.ServerTransaction;
> +import org.apache.qpid.server.txn.LocalTransaction;
>
>  public class Copy extends Move
>  {
> @@ -49,7 +51,9 @@
>
>     protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
>     {
> -        fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), _storeContext);
> +        ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
> +        fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), txn);
> +        txn.commit();
>     }
>
>  }
>
> Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
> +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Sun Oct 25 22:58:57 2009
> @@ -21,16 +21,13 @@
>  package org.apache.qpid.tools.messagestore.commands;
>
>  import org.apache.commons.codec.binary.Hex;
> -import org.apache.mina.common.ByteBuffer;
> -import org.apache.qpid.framing.abstraction.ContentChunk;
> -import org.apache.qpid.server.queue.AMQMessage;
>  import org.apache.qpid.server.queue.QueueEntryImpl;
>  import org.apache.qpid.server.queue.QueueEntry;
> +import org.apache.qpid.server.message.ServerMessage;
>  import org.apache.qpid.tools.messagestore.MessageStoreTool;
>  import org.apache.qpid.tools.utils.Console;
>
>  import java.io.UnsupportedEncodingException;
> -import java.util.Iterator;
>  import java.util.LinkedList;
>  import java.util.List;
>
> @@ -100,7 +97,7 @@
>
>         for (QueueEntry entry : messages)
>         {
> -            AMQMessage msg = entry.getMessage();
> +            ServerMessage msg = entry.getMessage();
>             if (!includeMsg(msg, msgids))
>             {
>                 continue;
> @@ -112,7 +109,7 @@
>
>             // Show general message information
>             hex.add(Show.Columns.ID.name());
> -            ascii.add(msg.getMessageId().toString());
> +            ascii.add(msg.getMessageNumber().toString());
>
>             hex.add(Console.ROW_DIVIDER);
>             ascii.add(Console.ROW_DIVIDER);
> @@ -136,10 +133,10 @@
>             hex.add(Console.ROW_DIVIDER);
>             ascii.add(Console.ROW_DIVIDER);
>
> -            Iterator bodies = msg.getContentBodyIterator();
> -            if (bodies.hasNext())
> -            {
>
> +            final int messageSize = (int) msg.getSize();
> +            if (messageSize != 0)
> +            {
>                 hex.add("Hex");
>                 hex.add(Console.ROW_DIVIDER);
>
> @@ -147,14 +144,19 @@
>                 ascii.add("ASCII");
>                 ascii.add(Console.ROW_DIVIDER);
>
> -                while (bodies.hasNext())
> +                java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(64 * 1024);
> +
> +                int position = 0;
> +
> +                while(position < messageSize)
>                 {
> -                    ContentChunk chunk = (ContentChunk) bodies.next();
>
> +                    position += msg.getContent(buf, position);
> +                    buf.flip();
>                     //Duplicate so we don't destroy original data :)
> -                    ByteBuffer hexBuffer = chunk.getData().duplicate();
> +                    java.nio.ByteBuffer hexBuffer = buf;
>
> -                    ByteBuffer charBuffer = hexBuffer.duplicate();
> +                    java.nio.ByteBuffer charBuffer = hexBuffer.duplicate();
>
>                     Hex hexencoder = new Hex();
>
> @@ -232,6 +234,7 @@
>
>                         ascii.add(asciiLine);
>                     }
> +                    buf.clear();
>                 }
>             }
>             else
> @@ -252,7 +255,7 @@
>         return display;
>     }
>
> -    private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
> +    private void addShowInformation(List<String> column1, List<String> column2, ServerMessage msg,
>                                     String title, boolean routing, boolean headers, boolean messageHeaders)
>     {
>         List<QueueEntry> single = new LinkedList<QueueEntry>();
>
> Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java (original)
> +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java Sun Oct 25 22:58:57 2009
> @@ -14,9 +14,9 @@
>  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>  *  KIND, either express or implied.  See the License for the
>  *  specific language governing permissions and limitations
> - *  under the License.
> + *  under the License.
> + *
>  *
> - *
>  */
>  package org.apache.qpid.tools.messagestore.commands;
>
>
> Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
> +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Sun Oct 25 22:58:57 2009
> @@ -14,17 +14,17 @@
>  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>  *  KIND, either express or implied.  See the License for the
>  *  specific language governing permissions and limitations
> - *  under the License.
> + *  under the License.
> + *
>  *
> - *
>  */
>  package org.apache.qpid.tools.messagestore.commands;
>
>  import org.apache.qpid.framing.AMQShortString;
> -import org.apache.qpid.server.queue.QueueEntryImpl;
>  import org.apache.qpid.server.queue.AMQQueue;
>  import org.apache.qpid.server.queue.QueueEntry;
> -import org.apache.qpid.server.store.StoreContext;
> +import org.apache.qpid.server.txn.ServerTransaction;
> +import org.apache.qpid.server.txn.LocalTransaction;
>  import org.apache.qpid.tools.messagestore.MessageStoreTool;
>
>  import java.util.LinkedList;
> @@ -33,12 +33,6 @@
>  public class Move extends AbstractCommand
>  {
>
> -    /**
> -     * Since the Coopy command is not associated with a real channel we can safely create our own store context
> -     * for use in the few methods that require one.
> -     */
> -    protected StoreContext _storeContext = new StoreContext();
> -
>     public Move(MessageStoreTool tool)
>     {
>         super(tool);
> @@ -172,7 +166,7 @@
>             {
>                 for (QueueEntry msg : messages)
>                 {
> -                    ids.add(msg.getMessage().getMessageId());
> +                    ids.add(msg.getMessage().getMessageNumber());
>                 }
>             }
>         }
> @@ -201,6 +195,8 @@
>
>     protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue)
>     {
> -        fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext);
> +        ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
> +        fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), txn);
> +        txn.commit();
>     }
>  }
>
> Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java (original)
> +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java Sun Oct 25 22:58:57 2009
> @@ -62,6 +62,6 @@
>
>     protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
>     {
> -        fromQueue.removeMessagesFromQueue(start, end, _storeContext);
> +        fromQueue.removeMessagesFromQueue(start, end);
>     }
>  }
>
> Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
> +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Sun Oct 25 22:58:57 2009
> @@ -25,10 +25,10 @@
>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>  import org.apache.qpid.framing.FieldTable;
>  import org.apache.qpid.framing.abstraction.MessagePublishInfo;
> -import org.apache.qpid.server.queue.AMQMessage;
> -import org.apache.qpid.server.queue.QueueEntryImpl;
> +import org.apache.qpid.server.message.AMQMessage;
>  import org.apache.qpid.server.queue.AMQQueue;
>  import org.apache.qpid.server.queue.QueueEntry;
> +import org.apache.qpid.server.message.ServerMessage;
>  import org.apache.qpid.tools.messagestore.MessageStoreTool;
>  import org.apache.qpid.tools.utils.Console;
>
> @@ -171,7 +171,7 @@
>  //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getEncoding();
>  //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getExpiration();
>  //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders();
> -//        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageId();
> +//        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageNumber();
>  //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPriority();
>  //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPropertyFlags();
>  //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getReplyTo();
> @@ -182,14 +182,14 @@
>  //        //Print out all the property names
>  //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders().getPropertyNames();
>  //
> -//        msg.getMessageId();
> +//        msg.getMessageNumber();
>  //        msg.getSize();
>  //        msg.getArrivalTime();
>
>  //        msg.getDeliveredSubscription();
>  //        msg.getDeliveredToConsumer();
>  //        msg.getMessageHandle();
> -//        msg.getMessageId();
> +//        msg.getMessageNumber();
>  //        msg.getMessagePublishInfo();
>  //        msg.getPublisher();
>
> @@ -337,30 +337,24 @@
>         //Add create the table of data
>         for (QueueEntry entry : messages)
>         {
> -            AMQMessage msg = entry.getMessage();
> +            ServerMessage msg = entry.getMessage();
>             if (!includeMsg(msg, msgids))
>             {
>                 continue;
>             }
>
> -            id.add(msg.getMessageId().toString());
> +            id.add(msg.getMessageNumber().toString());
>
>             size.add("" + msg.getSize());
>
>             arrival.add("" + msg.getArrivalTime());
>
> -            try
> -            {
> -                ispersitent.add(msg.isPersistent() ? "true" : "false");
> -            }
> -            catch (AMQException e)
> -            {
> -                ispersitent.add("n/a");
> -            }
> +            ispersitent.add(msg.isPersistent() ? "true" : "false");
> +
>
> -            isredelivered.add(msg.isRedelivered() ? "true" : "false");
> +            isredelivered.add(entry.isRedelivered() ? "true" : "false");
>
> -            isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
> +            isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false");
>
>  //        msg.getMessageHandle();
>
> @@ -368,7 +362,10 @@
>
>             try
>             {
> -                headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
> +                if(msg instanceof AMQMessage)
> +                {
> +                    headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties);
> +                }
>             }
>             catch (AMQException e)
>             {
> @@ -417,7 +414,11 @@
>                 MessagePublishInfo info = null;
>                 try
>                 {
> -                    info = msg.getMessagePublishInfo();
> +                    if(msg instanceof AMQMessage)
> +                    {
> +                        info = ((AMQMessage)msg).getMessagePublishInfo();
> +                    }
> +
>                 }
>                 catch (AMQException e)
>                 {
> @@ -457,14 +458,14 @@
>         return data;
>     }
>
> -    protected boolean includeMsg(AMQMessage msg, List<Long> msgids)
> +    protected boolean includeMsg(ServerMessage msg, List<Long> msgids)
>     {
>         if (msgids == null)
>         {
>             return true;
>         }
>
> -        Long msgid = msg.getMessageId();
> +        Long msgid = msg.getMessageNumber();
>
>         boolean found = false;
>
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java Sun Oct 25 22:58:57 2009
> @@ -27,6 +27,7 @@
>  import org.apache.qpid.server.queue.QueueRegistry;
>  import org.apache.qpid.server.registry.ApplicationRegistry;
>  import org.apache.qpid.server.registry.IApplicationRegistry;
> +import org.apache.qpid.server.virtualhost.VirtualHostImpl;
>  import org.apache.qpid.server.virtualhost.VirtualHost;
>
>  public class AMQBrokerManagerMBeanTest extends TestCase
> @@ -46,7 +47,7 @@
>         assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
>
>
> -        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
> +        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
>         mbean.createNewExchange(exchange1, "direct", false);
>         mbean.createNewExchange(exchange2, "topic", false);
>         mbean.createNewExchange(exchange3, "headers", false);
> @@ -68,7 +69,7 @@
>     {
>         String queueName = "testQueue_" + System.currentTimeMillis();
>
> -        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
> +        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
>
>         assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
>
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Sun Oct 25 22:58:57 2009
> @@ -22,23 +22,22 @@
>
>  import junit.framework.TestCase;
>  import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
> -import org.apache.qpid.server.queue.MockQueueEntry;
>  import org.apache.qpid.server.queue.QueueEntry;
>  import org.apache.qpid.server.queue.SimpleQueueEntryList;
>  import org.apache.qpid.server.queue.MockAMQMessage;
>  import org.apache.qpid.server.queue.AMQQueue;
>  import org.apache.qpid.server.queue.MockAMQQueue;
> -import org.apache.qpid.server.queue.AMQMessage;
> +import org.apache.qpid.server.message.AMQMessage;
>  import org.apache.qpid.server.queue.QueueEntryIterator;
> -import org.apache.qpid.server.store.StoreContext;
>  import org.apache.qpid.server.subscription.Subscription;
>  import org.apache.qpid.server.subscription.MockSubscription;
> +import org.apache.qpid.server.store.MemoryMessageStore;
> +import org.apache.qpid.server.store.MessageStore;
>  import org.apache.qpid.AMQException;
>
>  import java.util.Map;
>  import java.util.LinkedHashMap;
>  import java.util.LinkedList;
> -import java.util.Iterator;
>
>  /**
>  * QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
> @@ -63,6 +62,7 @@
>     UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
>     private static final int INITIAL_MSG_COUNT = 10;
>     private AMQQueue _queue = new MockAMQQueue(getName());
> +    private MessageStore _messageStore = new MemoryMessageStore();
>     private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
>
>     @Override
> @@ -89,7 +89,7 @@
>         while(queueEntries.advance())
>         {
>             QueueEntry entry = queueEntries.getNode();
> -            _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry);
> +            _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
>
>             // Store the entry for future inspection
>             _referenceList.add(entry);
> @@ -137,7 +137,7 @@
>
>         // requeueIfUnabletoResend doesn't matter here.
>         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
> -                                                                    msgToResend, true, new StoreContext()));
> +                                                                    msgToResend, true, _messageStore));
>
>         assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
>         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
> @@ -166,7 +166,7 @@
>
>         // requeueIfUnabletoResend doesn't matter here.
>         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
> -                                                                    msgToResend, true, new StoreContext()));
> +                                                                    msgToResend, true, _messageStore));
>
>         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
>         assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
> @@ -187,7 +187,7 @@
>
>         // requeueIfUnabletoResend = true so all messages should go to msgToRequeue
>         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
> -                                                                    msgToResend, true, new StoreContext()));
> +                                                                    msgToResend, true, _messageStore));
>
>         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
>         assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
> @@ -208,7 +208,7 @@
>
>         // requeueIfUnabletoResend = false so all messages should be dropped all maps should be empty
>         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
> -                                                                    msgToResend, false, new StoreContext()));
> +                                                                    msgToResend, false, _messageStore));
>
>         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
>         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
> @@ -240,7 +240,7 @@
>
>         // requeueIfUnabletoResend : value doesn't matter here as queue has been deleted
>         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
> -                                                                    msgToResend, false, new StoreContext()));
> +                                                                    msgToResend, false, _messageStore));
>
>         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
>         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Sun Oct 25 22:58:57 2009
> @@ -7,9 +7,9 @@
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
> - *
> + *
>  *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>  * Unless required by applicable law or agreed to in writing,
>  * software distributed under the License is distributed on an
>  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -27,6 +27,7 @@
>  import java.util.Iterator;
>  import java.util.List;
>  import java.util.Locale;
> +import java.util.Collections;
>
>  import junit.framework.TestCase;
>
> @@ -482,12 +483,17 @@
>     {
>         // Check default
>         ServerConfiguration serverConfig = new ServerConfiguration(_config);
> -        assertEquals(5672, serverConfig.getPort());
> +        assertNotNull(serverConfig.getPorts());
> +        assertEquals(1, serverConfig.getPorts().size());
> +        assertEquals(5672, serverConfig.getPorts().get(0));
> +
>
>         // Check value we set
> -        _config.setProperty("connector.port", 10);
> +        _config.setProperty("connector.port", "10");
>         serverConfig = new ServerConfiguration(_config);
> -        assertEquals(10, serverConfig.getPort());
> +        assertNotNull(serverConfig.getPorts());
> +        assertEquals(1, serverConfig.getPorts().size());
> +        assertEquals("10", serverConfig.getPorts().get(0));
>     }
>
>     public void testGetBind() throws ConfigurationException
> @@ -723,7 +729,9 @@
>         ServerConfiguration config = new ServerConfiguration(mainFile.getAbsoluteFile());
>         assertEquals(4235, config.getSSLPort()); // From first file, not
>                                                  // overriden by second
> -        assertEquals(2342, config.getPort()); // From the first file, not
> +        assertNotNull(config.getPorts());
> +        assertEquals(1, config.getPorts().size());
> +        assertEquals("2342", config.getPorts().get(0)); // From the first file, not
>                                               // present in the second
>         assertEquals(true, config.getQpidNIO()); // From the second file, not
>                                                  // present in the first
> @@ -967,7 +975,7 @@
>         out.write("\t<rule access=\"deny\" network=\"127.0.0.1\"/>");
>         out.write("</firewall>\n");
>         out.close();
> -
> +
>         reg.getConfiguration().reparseConfigFile();
>
>         assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java Sun Oct 25 22:58:57 2009
> @@ -14,21 +14,20 @@
>  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>  *  KIND, either express or implied.  See the License for the
>  *  specific language governing permissions and limitations
> - *  under the License.
> - *
> + *  under the License.
> + *
>  */
>  package org.apache.qpid.server.configuration;
>
>
>  import junit.framework.TestCase;
> -import org.apache.commons.configuration.ConfigurationException;
>  import org.apache.commons.configuration.XMLConfiguration;
> -import org.apache.qpid.AMQException;
>  import org.apache.qpid.framing.AMQShortString;
>  import org.apache.qpid.server.queue.AMQPriorityQueue;
>  import org.apache.qpid.server.queue.AMQQueue;
>  import org.apache.qpid.server.registry.ApplicationRegistry;
>  import org.apache.qpid.server.virtualhost.VirtualHost;
> +import org.apache.qpid.server.virtualhost.VirtualHostImpl;
>
>  public class VirtualHostConfigurationTest extends TestCase
>  {
> @@ -55,50 +54,50 @@
>
>         super.tearDown();
>     }
> -
> +
>     public void testQueuePriority() throws Exception
>     {
>         // Set up queue with 5 priorities
> -        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
> +        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
>                               "atest");
> -        configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange",
> +        configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange",
>                               "amq.direct");
> -        configXml.addProperty("virtualhost.test.queues.queue.atest.priorities",
> +        configXml.addProperty("virtualhost.test.queues.queue.atest.priorities",
>                               "5");
>
>         // Set up queue with JMS style priorities
> -        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
> +        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
>                               "ptest");
> -        configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange",
> +        configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange",
>                               "amq.direct");
> -        configXml.addProperty("virtualhost.test.queues.queue.ptest.priority",
> +        configXml.addProperty("virtualhost.test.queues.queue.ptest.priority",
>                                "true");
> -
> +
>         // Set up queue with no priorities
> -        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
> +        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
>                               "ntest");
> -        configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange",
> +        configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange",
>                               "amq.direct");
> -        configXml.addProperty("virtualhost.test.queues.queue.ntest.priority",
> +        configXml.addProperty("virtualhost.test.queues.queue.ntest.priority",
>                               "false");
> -
> -        VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
> -
> +
> +        VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
> +
>         // Check that atest was a priority queue with 5 priorities
>         AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
>         assertTrue(atest instanceof AMQPriorityQueue);
>         assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
> -
> +
>         // Check that ptest was a priority queue with 10 priorities
>         AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest"));
>         assertTrue(ptest instanceof AMQPriorityQueue);
>         assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
> -
> +
>         // Check that ntest wasn't a priority queue
>         AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest"));
>         assertFalse(ntest instanceof AMQPriorityQueue);
>     }
> -
> +
>     public void testQueueAlerts() throws Exception
>     {
>         // Set up queue with 5 priorities
> @@ -106,7 +105,7 @@
>         configXml.addProperty("virtualhost.test.queues.maximumQueueDepth", "1");
>         configXml.addProperty("virtualhost.test.queues.maximumMessageSize", "2");
>         configXml.addProperty("virtualhost.test.queues.maximumMessageAge", "3");
> -
> +
>         configXml.addProperty("virtualhost.test.queues(-1).queue(1).name(1)", "atest");
>         configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct");
>         configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumQueueDepth", "4");
> @@ -114,21 +113,21 @@
>         configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageAge", "6");
>
>         configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "btest");
> -
> -        VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
> -
> +
> +        VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
> +
>         // Check specifically configured values
>         AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
>         assertEquals(4, aTest.getMaximumQueueDepth());
>         assertEquals(5, aTest.getMaximumMessageSize());
>         assertEquals(6, aTest.getMaximumMessageAge());
> -
> -        // Check default values
> +
> +        // Check default values
>         AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest"));
>         assertEquals(1, bTest.getMaximumQueueDepth());
>         assertEquals(2, bTest.getMaximumMessageSize());
>         assertEquals(3, bTest.getMaximumMessageAge());
> -
> +
>     }
> -
> +
>  }
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Sun Oct 25 22:58:57 2009
> @@ -27,18 +27,18 @@
>  import org.apache.qpid.server.queue.*;
>  import org.apache.qpid.server.registry.ApplicationRegistry;
>  import org.apache.qpid.server.store.MessageStore;
> -import org.apache.qpid.server.store.SkeletonMessageStore;
>  import org.apache.qpid.server.store.MemoryMessageStore;
> -import org.apache.qpid.server.store.StoreContext;
> -import org.apache.qpid.server.txn.NonTransactionalContext;
> -import org.apache.qpid.server.txn.TransactionalContext;
> -import org.apache.qpid.server.RequiredDeliveryException;
> -import org.apache.qpid.server.virtualhost.VirtualHost;
> +import org.apache.qpid.server.store.StoredMessage;
> +import org.apache.qpid.server.message.ServerMessage;
> +import org.apache.qpid.server.message.AMQMessageHeader;
> +import org.apache.qpid.server.message.AMQMessage;
> +import org.apache.qpid.server.message.MessageMetaData;
>  import org.apache.qpid.server.subscription.Subscription;
>  import org.apache.qpid.server.protocol.AMQProtocolSession;
>  import org.apache.log4j.Logger;
>
>  import java.util.*;
> +import java.util.concurrent.atomic.AtomicLong;
>
>  public class AbstractHeadersExchangeTestBase extends TestCase
>  {
> @@ -52,10 +52,6 @@
>      */
>     private MessageStore _store = new MemoryMessageStore();
>
> -    private StoreContext _storeContext = new StoreContext();
> -
> -    private MessageHandleFactory _handleFactory = new MessageHandleFactory();
> -
>     private int count;
>
>     public void testDoNothing()
> @@ -91,14 +87,18 @@
>     }
>
>
> -    protected void route(Message m) throws AMQException
> +    protected int route(Message m) throws AMQException
>     {
> +        m.getIncomingMessage().headersReceived();
>         m.route(exchange);
> -        m.getIncomingMessage().routingComplete(_store, _handleFactory);
>         if(m.getIncomingMessage().allContentReceived())
>         {
> -            m.getIncomingMessage().deliverToQueues();
> +            for(AMQQueue q : m.getIncomingMessage().getDestinationQueues())
> +            {
> +                q.enqueue(m);
> +            }
>         }
> +        return m.getIncomingMessage().getDestinationQueues().size();
>     }
>
>     protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
> @@ -118,10 +118,8 @@
>
>     protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
>     {
> -        try
> -        {
> -            route(m);
> -            assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
> +            int queueCount = route(m);
> +
>             for (TestQueue q : queues)
>             {
>                 if (expected.contains(q))
> @@ -135,12 +133,11 @@
>                     //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
>                 }
>             }
> -        }
>
> -        catch (NoRouteException ex)
> -        {
> -            assertTrue("Expected "+m+" not to be returned",expectReturn);
> -        }
> +            if(expectReturn)
> +            {
> +                assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount);
> +            }
>
>     }
>
> @@ -242,6 +239,11 @@
>     {
>         final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
>
> +        public String toString()
> +        {
> +            return getName().toString();
> +        }
> +
>         public TestQueue(AMQShortString name) throws AMQException
>         {
>             super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
> @@ -256,9 +258,9 @@
>          * @throws AMQException
>          */
>         @Override
> -        public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
> +        public QueueEntry enqueue(ServerMessage msg) throws AMQException
>         {
> -            messages.add( new HeadersExchangeTest.Message(msg));
> +            messages.add( new HeadersExchangeTest.Message((AMQMessage) msg));
>             return new QueueEntry()
>             {
>
> @@ -317,6 +319,11 @@
>                     return false;  //To change body of implemented methods use File | Settings | File Templates.
>                 }
>
> +                public boolean isAcquiredBy(Subscription subscription)
> +                {
> +                    return false;  //To change body of implemented methods use File | Settings | File Templates.
> +                }
> +
>                 public void setDeliveredToSubscription()
>                 {
>                     //To change body of implemented methods use File | Settings | File Templates.
> @@ -327,9 +334,9 @@
>                     //To change body of implemented methods use File | Settings | File Templates.
>                 }
>
> -                public String debugIdentity()
> +                public boolean releaseButRetain()
>                 {
> -                    return null;  //To change body of implemented methods use File | Settings | File Templates.
> +                    return false;
>                 }
>
>                 public boolean immediateAndNotDelivered()
> @@ -337,11 +344,26 @@
>                     return false;  //To change body of implemented methods use File | Settings | File Templates.
>                 }
>
> -                public void setRedelivered(boolean b)
> +                public void setRedelivered()
>                 {
>                     //To change body of implemented methods use File | Settings | File Templates.
>                 }
>
> +                public AMQMessageHeader getMessageHeader()
> +                {
> +                    return null;  //To change body of implemented methods use File | Settings | File Templates.
> +                }
> +
> +                public boolean isPersistent()
> +                {
> +                    return false;  //To change body of implemented methods use File | Settings | File Templates.
> +                }
> +
> +                public boolean isRedelivered()
> +                {
> +                    return false;  //To change body of implemented methods use File | Settings | File Templates.
> +                }
> +
>                 public Subscription getDeliveredSubscription()
>                 {
>                     return null;  //To change body of implemented methods use File | Settings | File Templates.
> @@ -362,17 +384,22 @@
>                     return false;  //To change body of implemented methods use File | Settings | File Templates.
>                 }
>
> -                public void requeue(StoreContext storeContext) throws AMQException
> +                public void requeue()
> +                {
> +                    //To change body of implemented methods use File | Settings | File Templates.
> +                }
> +
> +                public void requeue(Subscription subscription)
>                 {
>                     //To change body of implemented methods use File | Settings | File Templates.
>                 }
>
> -                public void dequeue(final StoreContext storeContext) throws FailedDequeueException
> +                public void dequeue()
>                 {
>                     //To change body of implemented methods use File | Settings | File Templates.
>                 }
>
> -                public void dispose(final StoreContext storeContext) throws MessageCleanupException
> +                public void dispose()
>                 {
>                     //To change body of implemented methods use File | Settings | File Templates.
>                 }
> @@ -382,7 +409,12 @@
>                     //To change body of implemented methods use File | Settings | File Templates.
>                 }
>
> -                public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
> +                public void discard()
> +                {
> +                    //To change body of implemented methods use File | Settings | File Templates.
> +                }
> +
> +                public void routeToAlternate()
>                 {
>                     //To change body of implemented methods use File | Settings | File Templates.
>                 }
> @@ -421,15 +453,16 @@
>      */
>     static class Message extends AMQMessage
>     {
> +        private static AtomicLong _messageId = new AtomicLong();
> +
>         private class TestIncomingMessage extends IncomingMessage
>         {
>
>             public TestIncomingMessage(final long messageId,
>                                        final MessagePublishInfo info,
> -                                       final TransactionalContext txnContext,
>                                        final AMQProtocolSession publisher)
>             {
> -                super(messageId, info, txnContext, publisher);
> +                super(info);
>             }
>
>
> @@ -439,7 +472,7 @@
>             }
>
>
> -            public ContentHeaderBody getContentHeaderBody()
> +            public ContentHeaderBody getContentHeader()
>             {
>                 try
>                 {
> @@ -454,15 +487,6 @@
>
>         private IncomingMessage _incoming;
>
> -        private static MessageStore _messageStore = new SkeletonMessageStore();
> -
> -        private static StoreContext _storeContext = new StoreContext();
> -
> -
> -        private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
> -                                                                                      null,
> -                                                                         new LinkedList<RequiredDeliveryException>()
> -        );
>
>         Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
>         {
> @@ -471,7 +495,7 @@
>
>         Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException
>         {
> -            this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
> +            this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST);
>         }
>
>         public IncomingMessage getIncomingMessage()
> @@ -484,46 +508,34 @@
>                         ContentHeaderBody header,
>                         List<ContentBody> bodies) throws AMQException
>         {
> -            super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
> +            super(new MockStoredMessage(messageId, publish, header));
> +
> +            StoredMessage<MessageMetaData> storedMessage = getStoredMessage();
>
> +            int pos = 0;
> +            for(ContentBody body : bodies)
> +            {
> +                storedMessage.addContent(pos, body.payload.duplicate().buf());
> +                pos += body.payload.limit();
> +            }
>
> -
> -            _incoming = new TestIncomingMessage(getMessageId(),publish, _txnContext, protocolsession);
> +            _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
>             _incoming.setContentHeaderBody(header);
>
>
>         }
>
> -        private static AMQMessageHandle createMessageHandle(final long messageId,
> -                                                            final MessagePublishInfo publish,
> -                                                            final ContentHeaderBody header)
> -        {
> -
> -            final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
> -                                                                                                       _messageStore,
> -                                                                                                       true);
> -
> -            try
> -            {
> -                amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
> -            }
> -            catch (AMQException e)
> -            {
> -
> -            }
> -            return amqMessageHandle;
> -        }
>
>         private Message(AMQMessage msg) throws AMQException
>         {
> -            super(msg);
> +            super(msg.getStoredMessage());
>         }
>
>
>
>         void route(Exchange exchange) throws AMQException
>         {
> -            exchange.route(_incoming);
> +            _incoming.enqueue(exchange.route(_incoming));
>         }
>
>
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java Sun Oct 25 22:58:57 2009
> @@ -29,6 +29,7 @@
>  import org.apache.qpid.server.registry.ApplicationRegistry;
>  import org.apache.qpid.server.registry.IApplicationRegistry;
>  import org.apache.qpid.server.management.ManagedObject;
> +import org.apache.qpid.server.virtualhost.VirtualHostImpl;
>  import org.apache.qpid.server.virtualhost.VirtualHost;
>  import org.apache.qpid.exchange.ExchangeDefaults;
>  import org.apache.qpid.framing.AMQShortString;
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Sun Oct 25 22:58:57 2009
> @@ -22,16 +22,95 @@
>
>  import java.util.Map;
>  import java.util.HashMap;
> +import java.util.Set;
>
>  import junit.framework.TestCase;
>  import org.apache.qpid.framing.FieldTable;
> +import org.apache.qpid.server.message.AMQMessageHeader;
>
>  /**
>  */
>  public class HeadersBindingTest extends TestCase
>  {
> +
> +    private class MockHeader implements AMQMessageHeader
> +    {
> +
> +        private final Map<String, Object> _headers = new HashMap<String, Object>();
> +
> +        public String getCorrelationId()
> +        {
> +            return null;
> +        }
> +
> +        public long getExpiration()
> +        {
> +            return 0;
> +        }
> +
> +        public String getMessageId()
> +        {
> +            return null;
> +        }
> +
> +        public String getMimeType()
> +        {
> +            return null;  //To change body of implemented methods use File | Settings | File Templates.
> +        }
> +
> +        public String getEncoding()
> +        {
> +            return null;  //To change body of implemented methods use File | Settings | File Templates.
> +        }
> +
> +        public byte getPriority()
> +        {
> +            return 0;
> +        }
> +
> +        public long getTimestamp()
> +        {
> +            return 0;
> +        }
> +
> +        public String getType()
> +        {
> +            return null;
> +        }
> +
> +        public String getReplyTo()
> +        {
> +            return null;
> +        }
> +
> +        public Object getHeader(String name)
> +        {
> +            return _headers.get(name);
> +        }
> +
> +        public boolean containsHeaders(Set<String> names)
> +        {
> +            return _headers.keySet().containsAll(names);
> +        }
> +
> +        public boolean containsHeader(String name)
> +        {
> +            return _headers.containsKey(name);
> +        }
> +
> +        public void setString(String key, String value)
> +        {
> +            setObject(key,value);
> +        }
> +
> +        public void setObject(String key, Object value)
> +        {
> +            _headers.put(key,value);
> +        }
> +    }
> +
>     private FieldTable bindHeaders = new FieldTable();
> -    private FieldTable matchHeaders = new FieldTable();
> +    private MockHeader matchHeaders = new MockHeader();
>
>     public void testDefault_1()
>     {
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Sun Oct 25 22:58:57 2009
> @@ -7,9 +7,9 @@
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
> - *
> + *
>  *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>  * Unless required by applicable law or agreed to in writing,
>  * software distributed under the License is distributed on an
>  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -35,7 +35,8 @@
>         super.setUp();
>         // AR will use the NullAR by default
>         // Just use the first vhost.
> -        VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
> +        VirtualHost
> +                virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
>         _protocolSession = new InternalTestProtocolSession(virtualHost);
>     }
>
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java Sun Oct 25 22:58:57 2009
> @@ -121,7 +121,7 @@
>         // Verify that the message has the correct type
>         assertTrue("Message contains the [con: prefix",
>                    logs.get(0).toString().contains("[con:"));
> -
> +
>
>         // Verify that all the values were presented to the MessageFormatter
>         // so we will not end up with '{n}' entries in the log.
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java Sun Oct 25 22:58:57 2009
> @@ -75,7 +75,7 @@
>         // Correctly Close the AR we created
>         ApplicationRegistry.remove();
>
> -        super.tearDown();
> +        super.tearDown();
>     }
>
>     private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java Sun Oct 25 22:58:57 2009
> @@ -41,12 +41,12 @@
>     {
>         String location = "/path/to/the/message/store.files";
>
> -        _logMessage = MessageStoreMessages.MST_1002(location);
> +        _logMessage = ConfigStoreMessages.CFG_1002(location);
>         List<Object> log = performLog();
>
>         String[] expected = {"Store location :", location};
>
> -        validateLogMessage(log, "MST-1002", expected);
> +        validateLogMessage(log, "CFG-1002", expected);
>     }
>
>     public void testMessage1003()
> @@ -59,7 +59,7 @@
>         validateLogMessage(log, "MST-1003", expected);
>     }
>
> -    public void testMessage1004()
> +  /*  public void testMessage1004()
>     {
>         _logMessage = MessageStoreMessages.MST_1004(null,false);
>         List<Object> log = performLog();
> @@ -91,7 +91,7 @@
>
>         // Here we use MessageFormat to ensure the messasgeCount of 2000 is
>         // reformated for display as '2,000'
> -        String[] expected = {"Recovered ",
> +        String[] expected = {"Recovered ",
>                              MessageFormat.format("{0,number}", messasgeCount),
>                              "messages for queue", queueName};
>
> @@ -119,5 +119,5 @@
>
>         validateLogMessage(log, "MST-1006", expected);
>     }
> -
> +    */
>  }
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Sun Oct 25 22:58:57 2009
> @@ -23,19 +23,16 @@
>  import junit.framework.TestCase;
>  import org.apache.log4j.Logger;
>  import org.apache.qpid.AMQException;
> -import org.apache.qpid.codec.AMQCodecFactory;
>  import org.apache.qpid.framing.AMQShortString;
>  import org.apache.qpid.server.AMQChannel;
>  import org.apache.qpid.server.virtualhost.VirtualHost;
>  import org.apache.qpid.server.queue.AMQQueue;
>  import org.apache.qpid.server.queue.AMQQueueFactory;
>  import org.apache.qpid.server.registry.ApplicationRegistry;
> -import org.apache.qpid.server.registry.IApplicationRegistry;
>  import org.apache.qpid.server.store.MessageStore;
>  import org.apache.qpid.server.store.SkeletonMessageStore;
>
>  import javax.management.JMException;
> -import java.security.Principal;
>
>  /** Test class to test MBean operations for AMQMinaProtocolSession. */
>  public class AMQProtocolSessionMBeanTest extends TestCase
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Sun Oct 25 22:58:57 2009
> @@ -21,19 +21,19 @@
>  package org.apache.qpid.server.protocol;
>
>  import java.security.Principal;
> -import java.util.ArrayList;
> -import java.util.HashMap;
> -import java.util.LinkedList;
> -import java.util.List;
> -import java.util.Map;
> +import java.util.*;
>  import java.util.concurrent.atomic.AtomicInteger;
>
>  import org.apache.qpid.AMQException;
>  import org.apache.qpid.framing.AMQShortString;
> +import org.apache.qpid.framing.ContentHeaderBody;
> +import org.apache.qpid.framing.abstraction.MessagePublishInfo;
>  import org.apache.qpid.server.output.ProtocolOutputConverter;
> -import org.apache.qpid.server.queue.AMQMessage;
> +import org.apache.qpid.server.message.AMQMessage;
> +import org.apache.qpid.server.queue.QueueEntry;
>  import org.apache.qpid.server.registry.ApplicationRegistry;
>  import org.apache.qpid.server.virtualhost.VirtualHost;
> +import org.apache.qpid.server.message.MessageContentSource;
>  import org.apache.qpid.transport.TestNetworkDriver;
>
>  public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
> @@ -70,6 +70,16 @@
>         return (byte) 8;
>     }
>
> +    public void writeReturn(MessagePublishInfo messagePublishInfo,
> +                            ContentHeaderBody header,
> +                            MessageContentSource msgContent,
> +                            int channelId,
> +                            int replyCode,
> +                            AMQShortString replyText) throws AMQException
> +    {
> +        //To change body of implemented methods use File | Settings | File Templates.
> +    }
> +
>     public byte getProtocolMinorVersion()
>     {
>         return (byte) 0;
> @@ -82,12 +92,12 @@
>         synchronized (_channelDelivers)
>         {
>             List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
> -
> +
>             if (all == null)
>             {
>                 return new ArrayList<DeliveryPair>(0);
>             }
> -
> +
>             List<DeliveryPair> msgs = all.subList(0, count);
>
>             List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
> @@ -108,7 +118,7 @@
>     {
>     }
>
> -    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
> +    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
>     {
>         _deliveryCount.incrementAndGet();
>
> @@ -130,11 +140,11 @@
>                 consumers.put(consumerTag, consumerDelivers);
>             }
>
> -            consumerDelivers.add(new DeliveryPair(deliveryTag, message));
> +            consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
>         }
>     }
>
> -    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
> +    public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException
>     {
>     }
>
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Sun Oct 25 22:58:57 2009
> @@ -22,17 +22,10 @@
>
>  import junit.framework.TestCase;
>  import org.apache.qpid.AMQException;
> -import org.apache.qpid.codec.AMQCodecFactory;
>  import org.apache.qpid.protocol.AMQConstant;
>  import org.apache.qpid.server.AMQChannel;
>  import org.apache.qpid.server.virtualhost.VirtualHost;
> -import org.apache.qpid.server.logging.actors.CurrentActor;
>  import org.apache.qpid.server.registry.ApplicationRegistry;
> -import org.apache.qpid.server.registry.IApplicationRegistry;
> -import org.apache.qpid.AMQException;
> -import org.apache.qpid.protocol.AMQConstant;
> -
> -import java.security.Principal;
>
>  /** Test class to test MBean operations for AMQMinaProtocolSession. */
>  public class MaxChannelsTest extends TestCase
> @@ -66,14 +59,14 @@
>         }
>         assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
>     }
> -
> +
>     @Override
>     public void setUp()
>     {
>         //Highlight that this test will cause a new AR to be created
>         ApplicationRegistry.getInstance();
>     }
> -
> +
>     @Override
>     public void tearDown() throws Exception
>     {
> @@ -87,7 +80,7 @@
>         {
>             // Correctly Close the AR we created
>             ApplicationRegistry.remove();
> -        }
> +        }
>     }
>
>  }
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Sun Oct 25 22:58:57 2009
> @@ -1,6 +1,6 @@
>  package org.apache.qpid.server.queue;
>  /*
> - *
> + *
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
> @@ -8,21 +8,22 @@
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
> - *
> + *
>  *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> + *
>  * Unless required by applicable law or agreed to in writing,
>  * software distributed under the License is distributed on an
>  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>  * KIND, either express or implied.  See the License for the
>  * specific language governing permissions and limitations
>  * under the License.
> - *
> + *
>  */
>
>  import java.util.ArrayList;
>
>  import org.apache.qpid.AMQException;
> +import org.apache.qpid.server.message.AMQMessage;
>  import org.apache.qpid.framing.BasicContentHeaderProperties;
>  import org.apache.qpid.framing.FieldTable;
>  import junit.framework.AssertionFailedError;
> @@ -42,38 +43,38 @@
>     {
>
>         // Enqueue messages in order
> -        _queue.enqueue(null, createMessage(1L, (byte) 10));
> -        _queue.enqueue(null, createMessage(2L, (byte) 4));
> -        _queue.enqueue(null, createMessage(3L, (byte) 0));
> -
> +        _queue.enqueue(createMessage(1L, (byte) 10));
> +        _queue.enqueue(createMessage(2L, (byte) 4));
> +        _queue.enqueue(createMessage(3L, (byte) 0));
> +
>         // Enqueue messages in reverse order
> -        _queue.enqueue(null, createMessage(4L, (byte) 0));
> -        _queue.enqueue(null, createMessage(5L, (byte) 4));
> -        _queue.enqueue(null, createMessage(6L, (byte) 10));
> -
> +        _queue.enqueue(createMessage(4L, (byte) 0));
> +        _queue.enqueue(createMessage(5L, (byte) 4));
> +        _queue.enqueue(createMessage(6L, (byte) 10));
> +
>         // Enqueue messages out of order
> -        _queue.enqueue(null, createMessage(7L, (byte) 4));
> -        _queue.enqueue(null, createMessage(8L, (byte) 10));
> -        _queue.enqueue(null, createMessage(9L, (byte) 0));
> -
> +        _queue.enqueue(createMessage(7L, (byte) 4));
> +        _queue.enqueue(createMessage(8L, (byte) 10));
> +        _queue.enqueue(createMessage(9L, (byte) 0));
> +
>         // Register subscriber
>         _queue.registerSubscription(_subscription, false);
>         Thread.sleep(150);
> -
> +
>         ArrayList<QueueEntry> msgs = _subscription.getMessages();
>         try
>         {
> -            assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId());
> -            assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId());
> -            assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId());
> -
> -            assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId());
> -            assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId());
> -            assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId());
> -
> -            assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId());
> -            assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId());
> -            assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId());
> +            assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber());
> +            assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber());
> +            assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber());
> +
> +            assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber());
> +            assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber());
> +            assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber());
> +
> +            assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber());
> +            assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber());
> +            assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber());
>         }
>         catch (AssertionFailedError afe)
>         {
> @@ -81,7 +82,7 @@
>             int index = 1;
>             for (QueueEntry qe : msgs)
>             {
> -                System.err.println(index + ":" + qe.getMessage().getMessageId());
> +                System.err.println(index + ":" + qe.getMessage().getMessageNumber());
>                 index++;
>             }
>
> @@ -98,10 +99,10 @@
>         msg.getContentHeaderBody().properties = props;
>         return msg;
>     }
> -
> +
>     protected AMQMessage createMessage(Long id) throws AMQException
>     {
>         return createMessage(id, (byte) 0);
>     }
> -
> +
>  }
>
> Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=829675&r1=829674&r2=829675&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
> +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Sun Oct 25 22:58:57 2009
> @@ -20,21 +20,17 @@
>  */
>  package org.apache.qpid.server.queue;
>
> -import java.util.ArrayList;
> -import java.util.LinkedList;
> -
> -import javax.management.Notification;
> -
>  import junit.framework.TestCase;
> -
>  import org.apache.mina.common.ByteBuffer;
>  import org.apache.qpid.AMQException;
>  import org.apache.qpid.framing.AMQShortString;
>  import org.apache.qpid.framing.ContentHeaderBody;
> +import org.apache.qpid.framing.BasicContentHeaderProperties;
>  import org.apache.qpid.framing.abstraction.ContentChunk;
>  import org.apache.qpid.framing.abstraction.MessagePublishInfo;
>  import org.apache.qpid.server.AMQChannel;
> -import org.apache.qpid.server.RequiredDeliveryException;
> +import org.apache.qpid.server.message.AMQMessage;
> +import org.apache.qpid.server.message.MessageMetaData;
>  import org.apache.qpid.server.logging.actors.CurrentActor;
>  import org.apache.qpid.server.protocol.AMQProtocolEngine;
>  import org.apache.qpid.server.protocol.InternalTestProtocolSession;
> @@ -42,16 +38,16 @@
>  import org.apache.qpid.server.registry.IApplicationRegistry;
>  import org.apache.qpid.server.store.MemoryMessageStore;
>  import org.apache.qpid.server.store.MessageStore;
> -import org.apache.qpid.server.store.StoreContext;
>  import org.apache.qpid.server.subscription.Subscription;
>  import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
> -import org.apache.qpid.server.txn.NonTransactionalContext;
> -import org.apache.qpid.server.txn.TransactionalContext;
>  import org.apache.qpid.server.virtualhost.VirtualHost;
>
> +import javax.management.Notification;
> +import java.util.ArrayList;
> +
>  /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
>  public class AMQQueueAlertTest extends TestCase
> -{
> +{
>     private final static long MAX_MESSAGE_COUNT = 50;
>     private final static long MAX_MESSAGE_AGE = 250;   // 0.25 sec
>     private final static long MAX_MESSAGE_SIZE = 2000;  // 2 KB
> @@ -61,11 +57,6 @@
>     private VirtualHost _virtualHost;
>     private AMQProtocolEngine _protocolSession;
>     private MessageStore _messageStore = new MemoryMessageStore();
> -    private StoreContext _storeContext = new StoreContext();
> -    private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
> -                                                                                     null,
> -                                                                                     new LinkedList<RequiredDeliveryException>()
> -    );
>     private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
>
>     /**
> @@ -75,6 +66,10 @@
>      */
>     public void testMessageCountAlert() throws Exception
>     {
> +        _protocolSession = new InternalTestProtocolSession(_virtualHost);
> +        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
> +        _protocolSession.addChannel(channel);
> +
>         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
>                               false, _virtualHost,
>                               null);
> @@ -82,7 +77,7 @@
>
>         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
>
> -        sendMessages(MAX_MESSAGE_COUNT, 256l);
> +        sendMessages(channel, MAX_MESSAGE_COUNT, 256l);
>         assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
>
>         Notification lastNotification = _queueMBean.getLastNotification();
> @@ -99,6 +94,10 @@
>      */
>     public void testMessageSizeAlert() throws Exception
>     {
> +        _protocolSession = new InternalTestProtocolSession(_virtualHost);
> +        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
> +        _protocolSession.addChannel(channel);
> +
>         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
>                               false, _virtualHost,
>                               null);
> @@ -106,7 +105,7 @@
>         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
>         _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
>
> -        sendMessages(1, MAX_MESSAGE_SIZE * 2);
> +        sendMessages(channel, 1, MAX_MESSAGE_SIZE * 2);
>         assertTrue(_queueMBean.getMessageCount() == 1);
>
>         Notification lastNotification = _queueMBean.getLastNotification();
> @@ -125,6 +124,10 @@
>      */
>     public void testQueueDepthAlertNoSubscriber() throws Exception
>     {
> +        _protocolSession = new InternalTestProtocolSession(_virtualHost);
> +        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
> +        _protocolSession.addChannel(channel);
> +
>         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
>                               false, _virtualHost,
>                               null);
> @@ -134,7 +137,7 @@
>
>         while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
>         {
> -            sendMessages(1, MAX_MESSAGE_SIZE);
> +            sendMessages(channel, 1, MAX_MESSAGE_SIZE);
>         }
>
>         Notification lastNotification = _queueMBean.getLastNotification();
> @@ -154,6 +157,10 @@
>      */
>     public void testMessageAgeAlert() throws Exception
>     {
> +        _protocolSession = new InternalTestProtocolSession(_virtualHost);
> +        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
> +        _protocolSession.addChannel(channel);
> +
>         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
>                               false, _virtualHost,
>                               null);
> @@ -161,7 +168,7 @@
>         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
>         _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
>
> -        sendMessages(1, MAX_MESSAGE_SIZE);
> +        sendMessages(channel, 1, MAX_MESSAGE_SIZE);
>
>         // Ensure message sits on queue long enough to age.
>         Thread.sleep(MAX_MESSAGE_AGE * 2);
> @@ -201,7 +208,7 @@
>         // Send messages(no of message to be little more than what can cause a Queue_Depth alert)
>         int messageCount = Math.round(MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10;
>         long totalSize = (messageCount * MAX_MESSAGE_SIZE);
> -        sendMessages(messageCount, MAX_MESSAGE_SIZE);
> +        sendMessages(channel, messageCount, MAX_MESSAGE_SIZE);
>
>         // Check queueDepth. There should be no messages on the queue and as the subscriber is listening
>         // so there should be no Queue_Deoth alert raised
> @@ -228,7 +235,7 @@
>
>         _queue.registerSubscription(
>                 subscription2, false);
> -
> +
>         while (_queue.getUndeliveredMessageCount()!= 0)
>         {
>             Thread.sleep(100);
> @@ -247,7 +254,7 @@
>         _queueMBean.clearQueue();
>         assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
>     }
> -
> +
>     protected IncomingMessage message(final boolean immediate, long size) throws AMQException
>     {
>         MessagePublishInfo publish = new MessagePublishInfo()
> @@ -280,8 +287,10 @@
>         };
>
>         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
> +        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
> +        contentHeaderBody.properties = props;
>         contentHeaderBody.bodySize = size;   // in bytes
> -        IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
> +        IncomingMessage message = new IncomingMessage(publish);
>         message.setContentHeaderBody(contentHeaderBody);
>
>         return message;
> @@ -305,16 +314,19 @@
>     }
>
>
> -    private void sendMessages(long messageCount, final long size) throws AMQException
> +    private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException
>     {
>         IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
> +        MessageMetaData[] metaData = new MessageMetaData[(int) messageCount];
>         for (int i = 0; i < messages.length; i++)
>         {
>             messages[i] = message(false, size);
>             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
>             qs.add(_queue);
> +            metaData[i] = messages[i].headersReceived();
> +            messages[i].setStoredMessage(_messageStore.addMessage(metaData[i]));
> +
>             messages[i].enqueue(qs);
> -            messages[i].routingComplete(_messageStore, new MessageHandleFactory());
>
>         }
>
> @@ -324,6 +336,10 @@
>
>                 ByteBuffer _data = ByteBuffer.allocate((int)size);
>
> +                {
> +                    _data.limit((int)size);
> +                }
> +
>                 public int getSize()
>                 {
>                     return (int) size;
> @@ -336,10 +352,12 @@
>
>                 public void reduceToFit()
>                 {
> -
> +
>                 }
>             });
> -            messages[i].deliverToQueues();
> +
> +            _queue.enqueue(new AMQMessage(messages[i].getStoredMessage()));
> +
>         }
>     }
>
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>
>



-- 
Martin Ritchie

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org