You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 23:59:05 UTC

svn commit: r829675 [5/11] - in /qpid/trunk/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/ broker/bin/ broker/src/main/java/org/apac...

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,7 @@
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -37,12 +37,18 @@
 import java.util.List;
 
 
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder
 {
     long getSessionID();
 
     LogActor getLogActor();
 
+    void setMaxFrameSize(long frameMax);
+
+    long getMaxFrameSize();
+
+    boolean isClosing();
+
     public static final class ProtocolSessionIdentifier
     {
         private final Object _sessionIdentifier;
@@ -201,9 +207,6 @@
 
     void setAuthorizedID(Principal authorizedID);
 
-    /** @return a Principal that was used to authorized this session */
-    Principal getAuthorizedID();
-
     public java.net.SocketAddress getRemoteAddress();
 
     public MethodRegistry getMethodRegistry();
@@ -224,8 +227,10 @@
 
     void commitTransactions(AMQChannel channel) throws AMQException;
 
+    void rollbackTransactions(AMQChannel channel) throws AMQException;
+
     List<AMQChannel> getChannels();
 
     void closeIfLingeringClosedChannels();
-    
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Sun Oct 25 22:58:57 2009
@@ -133,7 +133,7 @@
 
     public String getAuthorizedId()
     {
-        return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null;
+        return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null;
     }
 
     public String getVersion()
@@ -227,7 +227,7 @@
                 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
             }
 
-            _protocolSession.commitTransactions(channel);
+            _protocolSession.rollbackTransactions(channel);
         }
         catch (AMQException ex)
         {

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 25 22:58:57 2009
@@ -1,3 +1,5 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:757257
+/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sun Oct 25 22:58:57 2009
@@ -24,7 +24,6 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.AMQException;
 
 public class AMQPriorityQueue extends SimpleAMQQueue
 {
@@ -34,11 +33,19 @@
                                final boolean autoDelete,
                                final VirtualHost virtualHost,
                                int priorities)
-            throws AMQException
     {
         super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
     }
 
+    public AMQPriorityQueue(String queueName,
+                            boolean durable,
+                            String owner,
+                            boolean autoDelete,
+                            VirtualHost virtualHost, int priorities)
+    {
+        this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities);
+    }
+
     public int getPriorities()
     {
         return ((PriorityQueueList) _entries).getPriorities();
@@ -52,16 +59,25 @@
         while(subIter.advance() && !entry.isAcquired())
         {
             final Subscription subscription = subIter.getNode().getSubscription();
-            QueueEntry subnode = subscription.getLastSeenEntry();
-            while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired())
+            if(!subscription.isClosed())
             {
-                if(subscription.setLastSeenEntry(subnode,entry))
-                {
-                    break;
-                }
-                else
+                QueueContext context = (QueueContext) subscription.getQueueContext();
+                if(context != null)
                 {
-                    subnode = subscription.getLastSeenEntry();
+                    QueueEntry subnode = context._lastSeenEntry;
+                    QueueEntry released = context._releasedEntry;
+                    while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+                    {
+                        if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
+                        {
+                            break;
+                        }
+                        else
+                        {
+                            subnode = context._lastSeenEntry;
+                            released = context._releasedEntry;
+                        }
+                    }
                 }
             }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sun Oct 25 22:58:57 2009
@@ -20,34 +20,49 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.AMQException;
 
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
 
-public interface AMQQueue extends Managable, Comparable<AMQQueue>
+public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource
 {
 
+
+    public interface Context
+    {
+        QueueEntry getLastSeenEntry();
+    }
+
     AMQShortString getName();
 
+    void setNoLocal(boolean b);
+
     boolean isDurable();
 
     boolean isAutoDelete();
 
     AMQShortString getOwner();
+    PrincipalHolder getPrincipalHolder();
+    void setPrincipalHolder(PrincipalHolder principalHolder);
+
+    void setExclusiveOwner(Object owner);
+    Object getExclusiveOwner();
 
     VirtualHost getVirtualHost();
 
@@ -89,17 +104,19 @@
     int delete() throws AMQException;
 
 
-    QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+    QueueEntry enqueue(ServerMessage message) throws AMQException;
+
+    void requeue(QueueEntry entry);
 
-    void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
+    void requeue(QueueEntryImpl storeContext, Subscription subscription);
 
-    void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
+    void dequeue(QueueEntry entry);
 
 
 
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
 
-    
+
 
     void addQueueDeleteTask(final Task task);
 
@@ -113,11 +130,11 @@
     List<Long> getMessagesOnTheQueue(int num, int offest);
 
     QueueEntry getMessageOnTheQueue(long messageId);
-    
+
     /**
      * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
-     * 
-     * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. 
+     *
+     * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
      * Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
      * @param fromPosition
      * @param toPosition
@@ -127,11 +144,11 @@
 
 
     void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
-                                                        StoreContext storeContext);
+                                                        ServerTransaction transaction);
 
-    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext);
+    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction transaction);
 
-    void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext);
+    void removeMessagesFromQueue(long fromMessageId, long toMessageId);
 
 
 
@@ -171,9 +188,9 @@
 
 
 
-    void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
+    void deleteMessageFromTop();
 
-    long clearQueue(StoreContext storeContext) throws AMQException;
+    long clearQueue();
 
     /**
      * Checks the status of messages on the queue, purging expired ones, firing age related alerts etc.
@@ -191,6 +208,14 @@
 
     void stop();
 
+    boolean isExclusive();
+
+    Exchange getAlternateExchange();
+
+    void setAlternateExchange(Exchange exchange);
+
+    Map<String, Object> getArguments();
+
     void checkCapacity(AMQChannel channel);
 
     /**
@@ -242,6 +267,6 @@
     }
 
     void configure(QueueConfiguration config);
-    
+
     ManagedObject getManagedObject();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun Oct 25 22:58:57 2009
@@ -27,7 +27,6 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
-import java.util.HashMap;
 
 
 public class AMQQueueFactory
@@ -130,7 +129,6 @@
                                               AMQShortString owner,
                                               boolean autoDelete,
                                               VirtualHost virtualHost, final FieldTable arguments)
-            throws AMQException
     {
         final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
 
@@ -189,4 +187,39 @@
         q.configure(config);
         return q;
     }
+
+    public static AMQQueue createAMQQueueImpl(String queueName,
+                                              boolean durable,
+                                              String owner,
+                                              boolean autoDelete,
+                                              VirtualHost virtualHost, Map<String, Object> arguments)
+            throws AMQException
+    {
+        int priorities = 1;
+        if(arguments != null && arguments.containsKey(X_QPID_PRIORITIES))
+        {
+            Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
+            if(prioritiesObj instanceof Number)
+            {
+                priorities = ((Number)prioritiesObj).intValue();
+            }
+        }
+
+
+        AMQQueue q = null;
+        if(priorities > 1)
+        {
+            q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, virtualHost, priorities);
+        }
+        else
+        {
+            q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, virtualHost);
+        }
+
+        //Register the new queue
+        virtualHost.getQueueRegistry().registerQueue(q);
+        q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
+        return q;
+
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Sun Oct 25 22:58:57 2009
@@ -22,23 +22,21 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 
 import javax.management.JMException;
-import javax.management.MBeanException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.Notification;
 import javax.management.OperationsException;
@@ -72,12 +70,6 @@
 
     private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
 
-    /**
-     * Since the MBean is not associated with a real channel we can safely create our own store context
-     * for use in the few methods that require one.
-     */
-    private StoreContext _storeContext = new StoreContext();
-
     private AMQQueue _queue = null;
     private String _queueName = null;
     // OpenMBean data types for viewMessages method
@@ -131,7 +123,7 @@
         _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
         _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
         _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
-        _msgContentType = new CompositeType("Message Content", "AMQ Message Content", 
+        _msgContentType = new CompositeType("Message Content", "AMQ Message Content",
                     VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, VIEW_MSG_CONTENT_COMPOSITE_ITEM_DESCRIPTIONS,
                     _msgContentAttributeTypes);
 
@@ -141,9 +133,9 @@
         _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
         _msgAttributeTypes[4] = SimpleType.LONG; // For queue position
 
-        _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES, 
+        _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES,
                                 VIEW_MSGS_COMPOSITE_ITEM_DESCRIPTIONS, _msgAttributeTypes);
-        _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, 
+        _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType,
                                                 VIEW_MSGS_TABULAR_UNIQUE_INDEX);
     }
 
@@ -164,7 +156,11 @@
 
     public String getOwner()
     {
-        return String.valueOf(_queue.getOwner());
+        return String.valueOf(_queue.getPrincipalHolder() == null
+                              ? null
+                              : _queue.getPrincipalHolder().getPrincipal() == null
+                                ? null
+                                : _queue.getPrincipalHolder().getPrincipal().getName());
     }
 
     public boolean isAutoDelete()
@@ -246,7 +242,7 @@
     /**
      * Checks if there is any notification to be send to the listeners
      */
-    public void checkForNotification(AMQMessage msg) throws AMQException
+    public void checkForNotification(ServerMessage msg) throws AMQException
     {
 
         final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
@@ -296,32 +292,18 @@
      */
     public void deleteMessageFromTop() throws JMException
     {
-        try
-        {
-            _queue.deleteMessageFromTop(_storeContext);
-        }
-        catch (AMQException ex)
-        {
-            throw new MBeanException(ex, ex.toString());
-        }
+        _queue.deleteMessageFromTop();
     }
 
     /**
      * Clears the queue of non-acquired messages
-     * 
+     *
      * @return the number of messages deleted
      * @see AMQQueue#clearQueue
      */
     public Long clearQueue() throws JMException
     {
-        try
-        {
-            return _queue.clearQueue(_storeContext);
-        }
-        catch (AMQException ex)
-        {
-            throw new MBeanException(ex, ex.toString());
-        }
+        return _queue.clearQueue();
     }
 
     /**
@@ -336,49 +318,41 @@
             throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
         }
 
-        AMQMessage msg = entry.getMessage();
-        // get message content
-        Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
+        ServerMessage serverMsg = entry.getMessage();
+        final int bodySize = (int) serverMsg.getSize();
+
+
         List<Byte> msgContent = new ArrayList<Byte>();
-        while (cBodies.hasNext())
-        {
-            ContentChunk body = cBodies.next();
-            if (body.getSize() != 0)
-            {
-                if (body.getSize() != 0)
-                {
-                    ByteBuffer slice = body.getData().slice();
-                    for (int j = 0; j < slice.limit(); j++)
-                    {
-                        msgContent.add(slice.get());
-                    }
-                }
-            }
-        }
 
-        try
+        java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(bodySize);
+        int position = 0;
+
+        while(position < bodySize)
         {
-            // Create header attributes list
-            CommonContentHeaderProperties headerProperties =
-                (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
-            String mimeType = null, encoding = null;
-            if (headerProperties != null)
+            position += serverMsg.getContent(buf, position);
+            buf.flip();
+            for(int i = 0; i < buf.limit(); i++)
             {
-                AMQShortString mimeTypeShortSting = headerProperties.getContentType();
-                mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
-                encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+                msgContent.add(buf.get(i));
             }
+            buf.clear();
+        }
 
-            Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+        AMQMessageHeader header = serverMsg.getMessageHeader();
 
-            return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
-        }
-        catch (AMQException e)
+        String mimeType = null, encoding = null;
+        if (header != null)
         {
-            JMException jme = new JMException("Error creating header attributes list: " + e);
-            jme.initCause(e);
-            throw jme;
+            mimeType = header.getMimeType();
+
+            encoding = header.getEncoding();
         }
+
+
+        Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
+        return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
+
     }
 
     /**
@@ -390,8 +364,8 @@
     {
         return viewMessages((long)beginIndex,(long)endIndex);
     }
-    
-    
+
+
     /**
      * Returns the header contents of the messages stored in this queue in tabular form.
      * @param startPosition The queue position of the first message to be viewed
@@ -404,7 +378,7 @@
             throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition
                 + "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
         }
-        
+
         if ((endPosition - startPosition) > Integer.MAX_VALUE)
         {
             throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size");
@@ -421,13 +395,22 @@
             for (int i = 0; i < size ; i++)
             {
                 long position = startPosition + i;
-                AMQMessage msg = list.get(i).getMessage();
-                ContentHeaderBody headerBody = msg.getContentHeaderBody();
-                // Create header attributes list
-                String[] headerAttributes = getMessageHeaderProperties(headerBody);
-                Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position};
-                CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
-                _messageList.put(messageData);
+                final QueueEntry queueEntry = list.get(i);
+                ServerMessage serverMsg = queueEntry.getMessage();
+                if(serverMsg instanceof AMQMessage)
+                {
+                    AMQMessage msg = (AMQMessage) serverMsg;
+                    ContentHeaderBody headerBody = msg.getContentHeaderBody();
+                    // Create header attributes list
+                    String[] headerAttributes = getMessageHeaderProperties(headerBody);
+                    Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position};
+                    CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
+                    _messageList.put(messageData);
+                }
+                else
+                {
+                    // TODO 0-10 Message
+                }
             }
         }
         catch (AMQException e)
@@ -484,7 +467,9 @@
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+        txn.commit();
     }
 
     /**
@@ -500,9 +485,9 @@
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        _queue.removeMessagesFromQueue(fromMessageId, toMessageId, _storeContext);
+        _queue.removeMessagesFromQueue(fromMessageId, toMessageId);
     }
-    
+
     /**
      * @see ManagedQueue#copyMessages
      * @param fromMessageId
@@ -517,9 +502,15 @@
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+
+        _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+
+        txn.commit();
+
+
     }
-    
+
     /**
      * returns Notifications sent by this MBean.
      */

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,12 +44,12 @@
         return _virtualHost;
     }
 
-    public void registerQueue(AMQQueue queue) throws AMQException
+    public void registerQueue(AMQQueue queue)
     {
         _queueMap.put(queue.getName(), queue);
     }
 
-    public void unregisterQueue(AMQShortString name) throws AMQException
+    public void unregisterQueue(AMQShortString name)
     {
         _queueMap.remove(name);
     }
@@ -68,4 +68,9 @@
     {
         return _queueMap.values();
     }
+
+    public AMQQueue getQueue(String queue)
+    {
+        return getQueue(new AMQShortString(queue));
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java Sun Oct 25 22:58:57 2009
@@ -22,12 +22,13 @@
 
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessageHeader;
 
-public interface Filterable<E extends Exception>
+public interface Filterable
 {
-    ContentHeaderBody getContentHeaderBody() throws E;
+    AMQMessageHeader getMessageHeader();
 
-    boolean isPersistent() throws E;
+    boolean isPersistent();
 
     boolean isRedelivered();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Sun Oct 25 22:58:57 2009
@@ -25,19 +25,22 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.List;
+import java.nio.ByteBuffer;
 
-public class IncomingMessage implements Filterable<RuntimeException>
+public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
 {
 
     /** Used for debugging purposes. */
@@ -48,12 +51,6 @@
 
     private final MessagePublishInfo _messagePublishInfo;
     private ContentHeaderBody _contentHeaderBody;
-    private AMQMessageHandle _messageHandle;
-    private final Long _messageId;
-    private final TransactionalContext _txnContext;
-
-    private static final boolean MSG_AUTH = 
-        ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
 
 
     /**
@@ -68,23 +65,27 @@
      */
     private ArrayList<AMQQueue> _destinationQueues;
 
-    private AMQProtocolSession _publisher;
-    private MessageStore _messageStore;
     private long _expiration;
-    
+
     private Exchange _exchange;
 
 
-    public IncomingMessage(final Long messageId,
-                           final MessagePublishInfo info,
-                           final TransactionalContext txnContext,
-                           final AMQProtocolSession publisher)
+    private int _receivedChunkCount = 0;
+    private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
+
+    // we keep both the original meta data object and the store reference to it just in case the
+    // store would otherwise flow it to disk
+
+    private MessageMetaData _messageMetaData;
+
+    private StoredMessage<MessageMetaData> _storedMessageHandle;
+
+
+    public IncomingMessage(
+            final MessagePublishInfo info
+    )
     {
-        _messageId = messageId;
         _messagePublishInfo = info;
-        _txnContext = txnContext;
-        _publisher = publisher;
-
     }
 
     public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
@@ -123,183 +124,94 @@
 
     }
 
-    public void routingComplete(final MessageStore store,
-                                final MessageHandleFactory factory) throws AMQException
+    public MessageMetaData headersReceived()
     {
-
-        final boolean persistent = isPersistent();
-        _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
-        if (persistent)
-        {
-            _txnContext.beginTranIfNecessary();
-             // enqueuing the messages ensure that if required the destinations are recorded to a
-            // persistent store
-
-            if(_destinationQueues != null)
-            {
-                for (int i = 0; i < _destinationQueues.size(); i++)
-                {
-                    store.enqueueMessage(_txnContext.getStoreContext(),
-                            _destinationQueues.get(i), _messageId);
-                }
-            }
-        }
+        _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0);
+        return _messageMetaData;
     }
 
-    public AMQMessage deliverToQueues()
-            throws AMQException
-    {
-
-        // we get a reference to the destination queues now so that we can clear the
-        // transient message data as quickly as possible
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
-        }
-
-        AMQMessage message = null;
-
-        try
-        {
-            // first we allow the handle to know that the message has been fully received. This is useful if it is
-            // maintaining any calculated values based on content chunks
-            _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
-                                                          _messagePublishInfo, getContentHeaderBody());
-
-            
-            
-            message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
-
-            message.setExpiration(_expiration);
-            message.setClientIdentifier(_publisher.getSessionIdentifier());
-
-            // we then allow the transactional context to do something with the message content
-            // now that it has all been received, before we attempt delivery
-            _txnContext.messageFullyReceived(isPersistent());
-            
-            AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
-                     ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null; 
-            
-            if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
-            {
-                throw new UnauthorizedAccessException("Acccess Refused",message);
-            }
-            
-            if ((_destinationQueues == null) || _destinationQueues.size() == 0)
-            {
-
-                if (isMandatory() || isImmediate())
-                {
-                    throw new NoRouteException("No Route for message", message);
-
-                }
-                else
-                {
-                    _logger.warn("MESSAGE DISCARDED: No routes for message - " + message);
-                }
-            }
-            else
-            {
-                int offset;
-                final int queueCount = _destinationQueues.size();
-                message.incrementReference(queueCount);
-                if(queueCount == 1)
-                {
-                    offset = 0;
-                }
-                else
-                {
-                    offset = ((int)(message.getMessageId().longValue())) % queueCount;
-                    if(offset < 0)
-                    {
-                        offset = -offset;
-                    }
-                }
-                for (int i = offset; i < queueCount; i++)
-                {
-                    // normal deliver so add this message at the end.
-                    _txnContext.deliver(_destinationQueues.get(i), message);
-                }
-                for (int i = 0; i < offset; i++)
-                {
-                    // normal deliver so add this message at the end.
-                    _txnContext.deliver(_destinationQueues.get(i), message);
-                }
-            }
-
-            message.clearStoreContext();
-            return message;
-        }
-        finally
-        {
-            // Remove refence for routing process . Reference count should now == delivered queue count
-            if(message != null) message.decrementReference(_txnContext.getStoreContext());
-        }
 
+    public ArrayList<AMQQueue> getDestinationQueues()
+    {
+        return _destinationQueues;
     }
 
-    public void addContentBodyFrame(final ContentChunk contentChunk)
+    public int addContentBodyFrame(final ContentChunk contentChunk)
             throws AMQException
     {
-
+        _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
         _bodyLengthReceived += contentChunk.getSize();
+        _contentChunks.add(contentChunk);
+
 
-        _messageHandle.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
 
+        return _receivedChunkCount++;
     }
 
     public boolean allContentReceived()
     {
-        return (_bodyLengthReceived == getContentHeaderBody().bodySize);
+        return (_bodyLengthReceived == getContentHeader().bodySize);
     }
 
-    public AMQShortString getExchange() throws AMQException
+    public AMQShortString getExchange()
     {
         return _messagePublishInfo.getExchange();
     }
 
-    public AMQShortString getRoutingKey() throws AMQException
+    public String getRoutingKey()
     {
-        return _messagePublishInfo.getRoutingKey();
+        return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
     }
 
-    public boolean isMandatory() throws AMQException
+    public String getBinding()
+    {
+        return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
+    }
+
+
+    public boolean isMandatory()
     {
         return _messagePublishInfo.isMandatory();
     }
 
 
-    public boolean isImmediate() throws AMQException
+    public boolean isImmediate()
     {
         return _messagePublishInfo.isImmediate();
     }
 
-    public ContentHeaderBody getContentHeaderBody()
+    public ContentHeaderBody getContentHeader()
     {
         return _contentHeaderBody;
     }
 
 
+    public AMQMessageHeader getMessageHeader()
+    {
+        return _messageMetaData.getMessageHeader();
+    }
+
     public boolean isPersistent()
     {
-        return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
-             ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == 
+        return getContentHeader().properties instanceof BasicContentHeaderProperties &&
+             ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
                                                              BasicContentHeaderProperties.PERSISTENT;
     }
-    
+
     public boolean isRedelivered()
     {
         return false;
     }
 
-    public void setMessageStore(final MessageStore messageStore)
+
+    public long getSize()
     {
-        _messageStore = messageStore;
+        return getContentHeader().bodySize;
     }
 
-    public Long getMessageId()
+    public Long getMessageNumber()
     {
-        return _messageId;
+        return _storedMessageHandle.getMessageNumber();
     }
 
     public void setExchange(final Exchange e)
@@ -307,13 +219,82 @@
         _exchange = e;
     }
 
-    public void route() throws AMQException
+    public void route()
     {
-        _exchange.route(this);
+        enqueue(_exchange.route(this));
+
     }
 
     public void enqueue(final ArrayList<AMQQueue> queues)
     {
         _destinationQueues = queues;
     }
+
+    public MessagePublishInfo getMessagePublishInfo()
+    {
+        return _messagePublishInfo;
+    }
+
+    public long getExpiration()
+    {
+        return _expiration;
+    }
+
+    public int getReceivedChunkCount()
+    {
+        return _receivedChunkCount;
+    }
+
+
+    public int getBodyCount() throws AMQException
+    {
+        return _contentChunks.size();
+    }
+
+    public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+    {
+        return _contentChunks.get(index);
+    }
+
+
+    public int getContent(ByteBuffer buf, int offset)
+    {
+        int pos = 0;
+        int written = 0;
+        for(ContentChunk cb : _contentChunks)
+        {
+            ByteBuffer data = cb.getData().buf();
+            if(offset+written >= pos && offset < pos + data.limit())
+            {
+                ByteBuffer src = data.duplicate();
+                src.position(offset+written - pos);
+                src = src.slice();
+
+                if(buf.remaining() < src.limit())
+                {
+                    src.limit(buf.remaining());
+                }
+                int count = src.limit();
+                buf.put(src);
+                written += count;
+                if(buf.remaining() == 0)
+                {
+                    break;
+                }
+            }
+            pos+=data.limit();
+        }
+        return written;
+
+    }
+
+    public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
+    {
+        _storedMessageHandle = storedMessageHandle;
+    }
+
+    public StoredMessage<MessageMetaData> getStoredMessage()
+    {
+        return _storedMessageHandle;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Sun Oct 25 22:58:57 2009
@@ -21,13 +21,14 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
 
 public enum NotificationCheck
 {
 
     MESSAGE_COUNT_ALERT
     {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
             int msgCount;
             final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,26 +42,19 @@
     },
     MESSAGE_SIZE_ALERT(true)
     {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
             final long maximumMessageSize = queue.getMaximumMessageSize();
             if(maximumMessageSize != 0)
             {
                 // Check for threshold message size
                 long messageSize;
-                try
-                {
-                    messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
-                }
-                catch (AMQException e)
-                {
-                    messageSize = 0;
-                }
+                messageSize = (msg == null) ? 0 : msg.getSize();
 
 
                 if (messageSize >= maximumMessageSize)
                 {
-                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]");
                     return true;
                 }
             }
@@ -70,7 +64,7 @@
     },
     QUEUE_DEPTH_ALERT
     {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
             // Check for threshold queue depth in bytes
             final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -91,7 +85,7 @@
     },
     MESSAGE_AGE_ALERT
     {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
 
             final long maxMessageAge = queue.getMaximumMessageAge();
@@ -133,6 +127,6 @@
         return _messageSpecific;
     }
 
-    abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+    abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
 
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sun Oct 25 22:58:57 2009
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.framing.CommonContentHeaderProperties;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
 
 public class PriorityQueueList implements QueueEntryList
 {
@@ -52,26 +53,18 @@
         return _queue;
     }
 
-    public QueueEntry add(AMQMessage message)
+    public QueueEntry add(ServerMessage message)
     {
-        try
+        int index = message.getMessageHeader().getPriority() - _priorityOffset;
+        if(index >= _priorities)
         {
-            int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
-            if(index >= _priorities)
-            {
-                index = _priorities-1;
-            }
-            else if(index < 0)
-            {
-                index = 0;
-            }
-            return _priorityLists[index].add(message);
+            index = _priorities-1;
         }
-        catch (AMQException e)
+        else if(index < 0)
         {
-            // TODO - fix AMQ Exception
-            throw new RuntimeException(e);
+            index = 0;
         }
+        return _priorityLists[index].add(message);
 
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun Oct 25 22:58:57 2009
@@ -1,8 +1,8 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
 
 /*
 *
@@ -24,18 +24,19 @@
 * under the License.
 *
 */
-public interface QueueEntry extends Comparable<QueueEntry>
+public interface QueueEntry extends Comparable<QueueEntry>, Filterable
 {
 
 
-
     public static enum State
     {
         AVAILABLE,
         ACQUIRED,
         EXPIRED,
         DEQUEUED,
-        DELETED
+        DELETED;
+
+
     }
 
     public static interface StateChangeListener
@@ -121,6 +122,27 @@
         }
     }
 
+    public final class SubscriptionAssignedState extends EntryState
+    {
+        private final Subscription _subscription;
+
+        public SubscriptionAssignedState(Subscription subscription)
+        {
+            _subscription = subscription;
+        }
+
+
+        public State getState()
+        {
+            return State.AVAILABLE;
+        }
+
+        public Subscription getSubscription()
+        {
+            return _subscription;
+        }
+    }
+
 
     final static EntryState AVAILABLE_STATE = new AvailableState();
     final static EntryState DELETED_STATE = new DeletedState();
@@ -133,7 +155,7 @@
 
     AMQQueue getQueue();
 
-    AMQMessage getMessage();
+    ServerMessage getMessage();
 
     long getSize();
 
@@ -150,16 +172,17 @@
     boolean isDeleted();
 
     boolean acquiredBySubscription();
-
-    void setDeliveredToSubscription();
+    boolean isAcquiredBy(Subscription subscription);
 
     void release();
+    boolean releaseButRetain();
 
-    String debugIdentity();
 
     boolean immediateAndNotDelivered();
 
-    void setRedelivered(boolean b);
+    void setRedelivered();
+
+    boolean isRedelivered();
 
     Subscription getDeliveredSubscription();
 
@@ -169,13 +192,15 @@
 
     boolean isRejectedBy(Subscription subscription);
 
-    void requeue(StoreContext storeContext) throws AMQException;
+    void requeue(Subscription subscription);
+
+    void dequeue();
 
-    void dequeue(final StoreContext storeContext) throws FailedDequeueException;
+    void dispose();
 
-    void dispose(final StoreContext storeContext) throws MessageCleanupException;
+    void discard();
 
-    void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
+    void routeToAlternate();
 
     boolean isQueueDeleted();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun Oct 25 22:58:57 2009
@@ -21,12 +21,16 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.log4j.Logger;
 
-import java.util.Set;
-import java.util.HashSet;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -42,7 +46,7 @@
 
     private final SimpleQueueEntryList _queueEntryList;
 
-    private final AMQMessage _message;
+    private MessageReference _message;
 
     private Set<Subscription> _rejectedBy = null;
 
@@ -75,6 +79,11 @@
 
     volatile QueueEntryImpl _next;
 
+    private static final int DELIVERED_TO_CONSUMER = 1;
+    private static final int REDELIVERED = 2;
+
+    private volatile int _deliveryState;
+
 
     QueueEntryImpl(SimpleQueueEntryList queueEntryList)
     {
@@ -83,18 +92,19 @@
     }
 
 
-    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
+    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
     {
         _queueEntryList = queueEntryList;
-        _message = message;
+
+        _message = message == null ? null : message.newReference();
 
         _entryIdUpdater.set(this, entryId);
     }
 
-    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
+    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
     {
         _queueEntryList = queueEntryList;
-        _message = message;
+        _message = message == null ? null :  message.newReference();
     }
 
     protected void setEntryId(long entryId)
@@ -112,24 +122,36 @@
         return _queueEntryList.getQueue();
     }
 
-    public AMQMessage getMessage()
+    public ServerMessage getMessage()
     {
-        return _message;
+        return  _message == null ? null : _message.getMessage();
     }
 
     public long getSize()
     {
-        return getMessage().getSize();
+        return getMessage() == null ? 0 : getMessage().getSize();
     }
 
     public boolean getDeliveredToConsumer()
     {
-        return getMessage().getDeliveredToConsumer();
+        return (_deliveryState & DELIVERED_TO_CONSUMER) != 0;
     }
 
     public boolean expired() throws AMQException
     {
-        return getMessage().expired(getQueue());
+        ServerMessage message = getMessage();
+        if(message != null)
+        {
+            long expiration = message.getExpiration();
+            if (expiration != 0L)
+            {
+                long now = System.currentTimeMillis();
+
+                return (now > expiration);
+            }
+        }
+        return false;
+
     }
 
     public boolean isAcquired()
@@ -145,6 +167,24 @@
     private boolean acquire(final EntryState state)
     {
         boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
+
+        // deal with the case where the node has been assigned to a given subscription already
+        // including the case that the node is assigned to a closed subscription
+        if(!acquired)
+        {
+            if(state != NON_SUBSCRIPTION_ACQUIRED_STATE)
+            {
+                EntryState currentState = _state;
+                if(currentState.getState() == State.AVAILABLE
+                   && ((currentState == AVAILABLE_STATE)
+                       || (((SubscriptionAcquiredState)state).getSubscription() ==
+                           ((SubscriptionAssignedState)currentState).getSubscription())
+                       || ((SubscriptionAssignedState)currentState).getSubscription().isClosed() ))
+                {
+                    acquired = _stateUpdater.compareAndSet(this,currentState, state);
+                }
+            }
+        }
         if(acquired && _stateChangeListeners != null)
         {
             notifyStateChange(State.AVAILABLE, State.ACQUIRED);
@@ -155,7 +195,12 @@
 
     public boolean acquire(Subscription sub)
     {
-        return acquire(sub.getOwningState());
+        final boolean acquired = acquire(sub.getOwningState());
+        if(acquired)
+        {
+            _deliveryState |= DELIVERED_TO_CONSUMER;
+        }
+        return acquired;
     }
 
     public boolean acquiredBySubscription()
@@ -164,38 +209,89 @@
         return (_state instanceof SubscriptionAcquiredState);
     }
 
-    public void setDeliveredToSubscription()
+    public boolean isAcquiredBy(Subscription subscription)
     {
-        getMessage().setDeliveredToConsumer();
+        EntryState state = _state;
+        return state instanceof SubscriptionAcquiredState
+               && ((SubscriptionAcquiredState)state).getSubscription() == subscription;
     }
 
     public void release()
     {
         _stateUpdater.set(this,AVAILABLE_STATE);
-    }
+        if(!getQueue().isDeleted())
+        {
+            getQueue().requeue(this);
+            if(_stateChangeListeners != null)
+            {
+                notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+            }
 
-    public String debugIdentity()
-    {
-        AMQMessage message = getMessage();
-        if (message == null)
+        }
+        else if(acquire())
         {
-            return "null";
+            routeToAlternate();
         }
-        else
+
+
+    }
+
+    public boolean releaseButRetain()
+    {
+        EntryState state = _state;
+
+        boolean stateUpdated = false;
+
+        if(state instanceof SubscriptionAcquiredState)
         {
-            return message.debugIdentity();
+            Subscription sub = ((SubscriptionAcquiredState) state).getSubscription();
+            if(_stateUpdater.compareAndSet(this, state, sub.getAssignedState()))
+            {
+                System.err.println("Message released (and retained)" + getMessage().getMessageNumber());
+                getQueue().requeue(this);
+                if(_stateChangeListeners != null)
+                {
+                    notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+                }
+                stateUpdated = true;
+            }
         }
+
+        return stateUpdated;
+
     }
 
+    public boolean immediateAndNotDelivered()
+    {
+        return !getDeliveredToConsumer() && isImmediate();
+    }
+
+    private boolean isImmediate()
+    {
+        final ServerMessage message = getMessage();
+        return message != null && message.isImmediate();
+    }
 
-    public boolean immediateAndNotDelivered() 
+    public void setRedelivered()
     {
-        return getMessage().immediateAndNotDelivered();
+        _deliveryState |= REDELIVERED;
     }
 
-    public void setRedelivered(boolean b)
+    public AMQMessageHeader getMessageHeader()
     {
-        getMessage().setRedelivered(b);
+        final ServerMessage message = getMessage();
+        return message == null ? null : message.getMessageHeader();
+    }
+
+    public boolean isPersistent()
+    {
+        final ServerMessage message = getMessage();
+        return message != null && message.isPersistent();
+    }
+
+    public boolean isRedelivered()
+    {
+        return (_deliveryState & REDELIVERED) != 0;
     }
 
     public Subscription getDeliveredSubscription()
@@ -230,12 +326,12 @@
         }
         else
         {
-            _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+            _log.warn("Requesting rejection by null subscriber:" + this);
         }
     }
 
     public boolean isRejectedBy(Subscription subscription)
-    {        
+    {
 
         if (_rejectedBy != null) // We have subscriptions that rejected this message
         {
@@ -247,17 +343,16 @@
         }
     }
 
-
-    public void requeue(final StoreContext storeContext) throws AMQException
+    public void requeue(Subscription subscription)
     {
-        getQueue().requeue(storeContext, this);
+        getQueue().requeue(this, subscription);
         if(_stateChangeListeners != null)
         {
             notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
         }
     }
 
-    public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+    public void dequeue()
     {
         EntryState state = _state;
 
@@ -266,10 +361,10 @@
             if (state instanceof SubscriptionAcquiredState)
             {
                 Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
-                s.restoreCredit(this);
+                s.onDequeue(this);
             }
 
-            getQueue().dequeue(storeContext, this);
+            getQueue().dequeue(this);
             if(_stateChangeListeners != null)
             {
                 notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
@@ -287,23 +382,74 @@
         }
     }
 
-    public void dispose(final StoreContext storeContext) throws MessageCleanupException
+    public void dispose()
     {
         if(delete())
         {
-            getMessage().decrementReference(storeContext);
+            _message.release();
         }
     }
 
-    public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+    public void discard()
     {
         //if the queue is null then the message is waiting to be acked, but has been removed.
         if (getQueue() != null)
         {
-            dequeue(storeContext);
+            dequeue();
         }
 
-        dispose(storeContext);
+        dispose();
+    }
+
+    public void routeToAlternate()
+    {
+        final AMQQueue currentQueue = getQueue();
+            Exchange alternateExchange = currentQueue.getAlternateExchange();
+
+            if(alternateExchange != null)
+            {
+                final List<AMQQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+                final ServerMessage message = getMessage();
+                if(rerouteQueues != null && rerouteQueues.size() != 0)
+                {
+                    ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+
+                    txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() {
+                        public void postCommit()
+                        {
+                            try
+                            {
+                                for(AMQQueue queue : rerouteQueues)
+                                {
+                                    QueueEntry entry = queue.enqueue(message);
+                                }
+                            }
+                            catch (AMQException e)
+                            {
+                                throw new RuntimeException(e);
+                            }
+                        }
+
+                        public void onRollback()
+                        {
+
+                        }
+                    });
+                    txn.dequeue(currentQueue,message,
+                                new ServerTransaction.Action()
+                                {
+                                    public void postCommit()
+                                    {
+                                        discard();
+                                    }
+
+                                    public void onRollback()
+                                    {
+
+                                    }
+                                });
+                }
+            }
     }
 
     public boolean isQueueDeleted()
@@ -379,7 +525,7 @@
 
         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
-            _queueEntryList.advanceHead();            
+            _queueEntryList.advanceHead();
             return true;
         }
         else

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sun Oct 25 22:58:57 2009
@@ -20,11 +20,13 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.message.ServerMessage;
+
 public interface QueueEntryList
 {
     AMQQueue getQueue();
 
-    QueueEntry add(AMQMessage message);
+    QueueEntry add(ServerMessage message);
 
     QueueEntry next(QueueEntry node);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,9 +30,9 @@
 {
     VirtualHost getVirtualHost();
 
-    void registerQueue(AMQQueue queue) throws AMQException;
+    void registerQueue(AMQQueue queue);
 
-    void unregisterQueue(AMQShortString name) throws AMQException;
+    void unregisterQueue(AMQShortString name);
 
     AMQQueue getQueue(AMQShortString name);
 
@@ -40,4 +40,5 @@
 
     Collection<AMQQueue> getQueues();
 
+    AMQQueue getQueue(String queue);
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org