You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC

svn commit: r827724 [7/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apach...

Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java&r1=824494&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Tue Oct 20 16:23:01 2009
@@ -21,14 +21,16 @@
 package org.apache.qpid.server.virtualhost;
 
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.AMQBrokerManagerMBean;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.configuration.ExchangeConfiguration;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -45,16 +47,15 @@
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.access.Accessable;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 
 import javax.management.NotCompliantMBeanException;
 import java.util.Collections;
@@ -63,9 +64,9 @@
 import java.util.Timer;
 import java.util.TimerTask;
 
-public class VirtualHost implements Accessable
+public class VirtualHostImpl implements Accessable, VirtualHost
 {
-    private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+    private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
 
     private final String _name;
 
@@ -133,9 +134,9 @@
             return _name.toString();
         }
 
-        public VirtualHost getVirtualHost()
+        public VirtualHostImpl getVirtualHost()
         {
-            return VirtualHost.this;
+            return VirtualHostImpl.this;
         }
 
     } // End of MBean class
@@ -147,17 +148,17 @@
      *
      * @throws Exception
      */
-    public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception
+    public VirtualHostImpl(VirtualHostConfiguration hostConfig) throws Exception
     {
         this(hostConfig, null);
     }
 
-    public VirtualHost(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+    public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
     {
         _configuration = hostConfig;
         _name = hostConfig.getName();
 
-        CurrentActor.get().message(VirtualHostMessages.VHT_1001(_name));        
+        CurrentActor.get().message(VirtualHostMessages.VHT_1001(_name));
 
         if (_name == null || _name.length() == 0)
         {
@@ -196,7 +197,7 @@
         // Derby being one.
         // todo this can be removed with the resolution fo QPID-2096
         configFileRT.exchange.clear();
-        
+
         initialiseModel(hostConfig);
 
         //todo REMOVE Work Around for QPID-2096
@@ -279,7 +280,7 @@
             _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
                                                    period / 2,
                                                    period);
-            
+
             class ForceChannelClosuresTask extends TimerTask
             {
                 public void run()
@@ -289,7 +290,7 @@
             }
         }
     }
-    
+
     private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
     {
         String messageStoreClass = hostConfig.getMessageStoreClass();
@@ -303,7 +304,24 @@
                                          " does not.");
         }
         MessageStore messageStore = (MessageStore) o;
-        messageStore.configure(this, "store", hostConfig);
+        VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+
+        MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
+
+        messageStore.configureConfigStore(this.getName(),
+                                          recoveryHandler,
+                                          hostConfig.getStoreConfiguration(),
+                                          storeLogSubject);
+
+        messageStore.configureMessageStore(this.getName(),
+                                           recoveryHandler,
+                                           hostConfig.getStoreConfiguration(),
+                                           storeLogSubject);
+        messageStore.configureTransactionLog(this.getName(),
+                                           recoveryHandler,
+                                           hostConfig.getStoreConfiguration(),
+                                           storeLogSubject);
+
         _messageStore = messageStore;
         _durableConfigurationStore = messageStore;
     }
@@ -450,13 +468,13 @@
             {
                 queue.stop();
             }
-        }        
+        }
 
         //Stop Housekeeping
         if (_houseKeepingTimer != null)
         {
             _houseKeepingTimer.cancel();
-        }        
+        }
 
         //Close MessageStore
         if (_messageStore != null)
@@ -503,6 +521,14 @@
             //To change body of implemented methods use File | Settings | File Templates.
         }
 
+        public void configureConfigStore(String name,
+                                         ConfigurationRecoveryHandler recoveryHandler,
+                                         Configuration config,
+                                         LogSubject logSubject) throws Exception
+        {
+            //To change body of implemented methods use File | Settings | File Templates.
+        }
+
         public void createExchange(Exchange exchange) throws AMQException
         {
             if (exchange.isDurable())

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Tue Oct 20 16:23:01 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.virtualhost;
 
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,7 +30,7 @@
 
 public class VirtualHostRegistry
 {
-    private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+    private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>();
 
 
     private String _defaultVirtualHostName;

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java Tue Oct 20 16:23:01 2009
@@ -14,14 +14,16 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.tools.messagestore.commands;
 
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 
 public class Copy extends Move
 {
@@ -49,7 +51,9 @@
 
     protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
     {
-        fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), _storeContext);
+        ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
+        fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), txn);
+        txn.commit();
     }
 
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Tue Oct 20 16:23:01 2009
@@ -21,9 +21,6 @@
 package org.apache.qpid.tools.messagestore.commands;
 
 import org.apache.commons.codec.binary.Hex;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntryImpl;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.message.ServerMessage;
@@ -31,7 +28,6 @@
 import org.apache.qpid.tools.utils.Console;
 
 import java.io.UnsupportedEncodingException;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -137,114 +133,116 @@
             hex.add(Console.ROW_DIVIDER);
             ascii.add(Console.ROW_DIVIDER);
 
-            if(msg instanceof AMQMessage)
+
+            final int messageSize = (int) msg.getSize();
+            if (messageSize != 0)
             {
+                hex.add("Hex");
+                hex.add(Console.ROW_DIVIDER);
 
-                Iterator bodies = ((AMQMessage)msg).getContentBodyIterator();
-                if (bodies.hasNext())
-                {
 
-                    hex.add("Hex");
-                    hex.add(Console.ROW_DIVIDER);
+                ascii.add("ASCII");
+                ascii.add(Console.ROW_DIVIDER);
 
+                java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(64 * 1024);
 
-                    ascii.add("ASCII");
-                    ascii.add(Console.ROW_DIVIDER);
+                int position = 0;
 
-                    while (bodies.hasNext())
-                    {
-                        ContentChunk chunk = (ContentChunk) bodies.next();
+                while(position < messageSize)
+                {
+
+                    position += msg.getContent(buf, position);
+                    buf.flip();
+                    //Duplicate so we don't destroy original data :)
+                    java.nio.ByteBuffer hexBuffer = buf;
 
-                        //Duplicate so we don't destroy original data :)
-                        ByteBuffer hexBuffer = chunk.getData().duplicate();
+                    java.nio.ByteBuffer charBuffer = hexBuffer.duplicate();
 
-                        ByteBuffer charBuffer = hexBuffer.duplicate();
+                    Hex hexencoder = new Hex();
 
-                        Hex hexencoder = new Hex();
+                    while (hexBuffer.hasRemaining())
+                    {
+                        byte[] line = new byte[LINE_SIZE];
 
-                        while (hexBuffer.hasRemaining())
+                        int bufsize = hexBuffer.remaining();
+                        if (bufsize < LINE_SIZE)
+                        {
+                            hexBuffer.get(line, 0, bufsize);
+                        }
+                        else
                         {
-                            byte[] line = new byte[LINE_SIZE];
+                            bufsize = line.length;
+                            hexBuffer.get(line);
+                        }
 
-                            int bufsize = hexBuffer.remaining();
-                            if (bufsize < LINE_SIZE)
-                            {
-                                hexBuffer.get(line, 0, bufsize);
-                            }
-                            else
-                            {
-                                bufsize = line.length;
-                                hexBuffer.get(line);
-                            }
+                        byte[] encoded = hexencoder.encode(line);
 
-                            byte[] encoded = hexencoder.encode(line);
+                        try
+                        {
+                            String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
+                            String hexLine = "";
 
-                            try
+                            int strKength = encStr.length();
+                            for (int c = 0; c < strKength; c++)
                             {
-                                String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
-                                String hexLine = "";
+                                hexLine += encStr.charAt(c);
 
-                                int strKength = encStr.length();
-                                for (int c = 0; c < strKength; c++)
+                                if (c % 2 == 1 && SPACE_BYTES)
                                 {
-                                    hexLine += encStr.charAt(c);
-
-                                    if (c % 2 == 1 && SPACE_BYTES)
-                                    {
-                                        hexLine += BYTE_SPACER;
-                                    }
+                                    hexLine += BYTE_SPACER;
                                 }
-
-                                hex.add(hexLine);
                             }
-                            catch (UnsupportedEncodingException e)
-                            {
-                                _console.println(e.getMessage());
-                                return null;
-                            }
-                        }
 
-                        while (charBuffer.hasRemaining())
+                            hex.add(hexLine);
+                        }
+                        catch (UnsupportedEncodingException e)
                         {
-                            String asciiLine = "";
+                            _console.println(e.getMessage());
+                            return null;
+                        }
+                    }
+
+                    while (charBuffer.hasRemaining())
+                    {
+                        String asciiLine = "";
 
-                            for (int pos = 0; pos < LINE_SIZE; pos++)
+                        for (int pos = 0; pos < LINE_SIZE; pos++)
+                        {
+                            if (charBuffer.hasRemaining())
                             {
-                                if (charBuffer.hasRemaining())
-                                {
-                                    byte ch = charBuffer.get();
+                                byte ch = charBuffer.get();
 
-                                    if (isPrintable(ch))
-                                    {
-                                        asciiLine += (char) ch;
-                                    }
-                                    else
-                                    {
-                                        asciiLine += NON_PRINTING_ASCII_CHAR;
-                                    }
-
-                                    if (SPACE_BYTES)
-                                    {
-                                        asciiLine += BYTE_SPACER;
-                                    }
+                                if (isPrintable(ch))
+                                {
+                                    asciiLine += (char) ch;
                                 }
                                 else
                                 {
-                                    break;
+                                    asciiLine += NON_PRINTING_ASCII_CHAR;
                                 }
-                            }
 
-                            ascii.add(asciiLine);
+                                if (SPACE_BYTES)
+                                {
+                                    asciiLine += BYTE_SPACER;
+                                }
+                            }
+                            else
+                            {
+                                break;
+                            }
                         }
+
+                        ascii.add(asciiLine);
                     }
+                    buf.clear();
                 }
-                else
-                {
-                    List<String> result = new LinkedList<String>();
+            }
+            else
+            {
+                List<String> result = new LinkedList<String>();
 
-                    display.add(result);
-                    result.add("No ContentBodies");
-                }
+                display.add(result);
+                result.add("No ContentBodies");
             }
         }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java Tue Oct 20 16:23:01 2009
@@ -14,9 +14,9 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.tools.messagestore.commands;
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Tue Oct 20 16:23:01 2009
@@ -14,17 +14,17 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.tools.messagestore.commands;
 
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.QueueEntryImpl;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 
 import java.util.LinkedList;
@@ -33,12 +33,6 @@
 public class Move extends AbstractCommand
 {
 
-    /**
-     * Since the Coopy command is not associated with a real channel we can safely create our own store context
-     * for use in the few methods that require one.
-     */
-    protected StoreContext _storeContext = new StoreContext();
-
     public Move(MessageStoreTool tool)
     {
         super(tool);
@@ -201,6 +195,8 @@
 
     protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue)
     {
-        fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext);
+        ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
+        fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), txn);
+        txn.commit();
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Tue Oct 20 16:23:01 2009
@@ -25,7 +25,7 @@
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.message.ServerMessage;

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java Tue Oct 20 16:23:01 2009
@@ -27,6 +27,7 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class AMQBrokerManagerMBeanTest extends TestCase
@@ -46,7 +47,7 @@
         assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
 
 
-        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
+        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
         mbean.createNewExchange(exchange1, "direct", false);
         mbean.createNewExchange(exchange2, "topic", false);
         mbean.createNewExchange(exchange3, "headers", false);
@@ -68,7 +69,7 @@
     {
         String queueName = "testQueue_" + System.currentTimeMillis();
 
-        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
+        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
 
         assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Tue Oct 20 16:23:01 2009
@@ -27,7 +27,7 @@
 import org.apache.qpid.server.queue.MockAMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntryIterator;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.MockSubscription;

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java Tue Oct 20 16:23:01 2009
@@ -14,21 +14,20 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
- * 
+ *  under the License.
+ *
  */
 package org.apache.qpid.server.configuration;
 
 
 import junit.framework.TestCase;
-import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQPriorityQueue;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class VirtualHostConfigurationTest extends TestCase
 {
@@ -55,50 +54,50 @@
 
         super.tearDown();
     }
-    
+
     public void testQueuePriority() throws Exception
     {
         // Set up queue with 5 priorities
-        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", 
+        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
                               "atest");
-        configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", 
+        configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange",
                               "amq.direct");
-        configXml.addProperty("virtualhost.test.queues.queue.atest.priorities", 
+        configXml.addProperty("virtualhost.test.queues.queue.atest.priorities",
                               "5");
 
         // Set up queue with JMS style priorities
-        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", 
+        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
                               "ptest");
-        configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange", 
+        configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange",
                               "amq.direct");
-        configXml.addProperty("virtualhost.test.queues.queue.ptest.priority", 
+        configXml.addProperty("virtualhost.test.queues.queue.ptest.priority",
                                "true");
-        
+
         // Set up queue with no priorities
-        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", 
+        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
                               "ntest");
-        configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange", 
+        configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange",
                               "amq.direct");
-        configXml.addProperty("virtualhost.test.queues.queue.ntest.priority", 
+        configXml.addProperty("virtualhost.test.queues.queue.ntest.priority",
                               "false");
-        
-        VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
-        
+
+        VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
+
         // Check that atest was a priority queue with 5 priorities
         AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
         assertTrue(atest instanceof AMQPriorityQueue);
         assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
-        
+
         // Check that ptest was a priority queue with 10 priorities
         AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest"));
         assertTrue(ptest instanceof AMQPriorityQueue);
         assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
-        
+
         // Check that ntest wasn't a priority queue
         AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest"));
         assertFalse(ntest instanceof AMQPriorityQueue);
     }
-    
+
     public void testQueueAlerts() throws Exception
     {
         // Set up queue with 5 priorities
@@ -106,7 +105,7 @@
         configXml.addProperty("virtualhost.test.queues.maximumQueueDepth", "1");
         configXml.addProperty("virtualhost.test.queues.maximumMessageSize", "2");
         configXml.addProperty("virtualhost.test.queues.maximumMessageAge", "3");
-        
+
         configXml.addProperty("virtualhost.test.queues(-1).queue(1).name(1)", "atest");
         configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct");
         configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumQueueDepth", "4");
@@ -114,21 +113,21 @@
         configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageAge", "6");
 
         configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "btest");
-        
-        VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
-        
+
+        VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
+
         // Check specifically configured values
         AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
         assertEquals(4, aTest.getMaximumQueueDepth());
         assertEquals(5, aTest.getMaximumMessageSize());
         assertEquals(6, aTest.getMaximumMessageAge());
-        
-        // Check default values        
+
+        // Check default values
         AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest"));
         assertEquals(1, bTest.getMaximumQueueDepth());
         assertEquals(2, bTest.getMaximumMessageSize());
         assertEquals(3, bTest.getMaximumMessageAge());
-        
+
     }
-    
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Oct 20 16:23:01 2009
@@ -27,15 +27,18 @@
 import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
 import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.log4j.Logger;
 
 import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AbstractHeadersExchangeTestBase extends TestCase
 {
@@ -49,8 +52,6 @@
      */
     private MessageStore _store = new MemoryMessageStore();
 
-    private MessageHandleFactory _handleFactory = new MessageHandleFactory();
-
     private int count;
 
     public void testDoNothing()
@@ -88,8 +89,8 @@
 
     protected int route(Message m) throws AMQException
     {
+        m.getIncomingMessage().headersReceived();
         m.route(exchange);
-        m.getIncomingMessage().routingComplete(_store, _handleFactory);
         if(m.getIncomingMessage().allContentReceived())
         {
             for(AMQQueue q : m.getIncomingMessage().getDestinationQueues())
@@ -343,7 +344,7 @@
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void setRedelivered(boolean b)
+                public void setRedelivered()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
@@ -452,6 +453,8 @@
      */
     static class Message extends AMQMessage
     {
+        private static AtomicLong _messageId = new AtomicLong();
+
         private class TestIncomingMessage extends IncomingMessage
         {
 
@@ -459,7 +462,7 @@
                                        final MessagePublishInfo info,
                                        final AMQProtocolSession publisher)
             {
-                super(messageId, info, publisher);
+                super(info);
             }
 
 
@@ -484,7 +487,6 @@
 
         private IncomingMessage _incoming;
 
-        private static MessageStore _messageStore = new SkeletonMessageStore();
 
         Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
         {
@@ -493,7 +495,7 @@
 
         Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException
         {
-            this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+            this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST);
         }
 
         public IncomingMessage getIncomingMessage()
@@ -506,32 +508,27 @@
                         ContentHeaderBody header,
                         List<ContentBody> bodies) throws AMQException
         {
-            super(createMessageHandle(messageId, publish, header), header, header.bodySize, publish);
+            super(new MockStoredMessage(messageId, publish, header));
+
+            StoredMessage<MessageMetaData> storedMessage = getStoredMessage();
 
+            int pos = 0;
+            for(ContentBody body : bodies)
+            {
+                storedMessage.addContent(pos, body.payload.duplicate().buf());
+                pos += body.payload.limit();
+            }
 
-            
             _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
             _incoming.setContentHeaderBody(header);
 
 
         }
 
-        private static AMQMessageHandle createMessageHandle(final long messageId,
-                                                            final MessagePublishInfo publish,
-                                                            final ContentHeaderBody header)
-        {
-
-            final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
-                                                                                                       _messageStore,
-                                                                                                       true);
-
-            amqMessageHandle.setPublishAndContentHeaderBody(publish,header);
-            return amqMessageHandle;
-        }
 
         private Message(AMQMessage msg) throws AMQException
         {
-            super(msg);
+            super(msg.getStoredMessage());
         }
 
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java Tue Oct 20 16:23:01 2009
@@ -29,6 +29,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Tue Oct 20 16:23:01 2009
@@ -53,6 +53,16 @@
             return null;
         }
 
+        public String getMimeType()
+        {
+            return null;  //To change body of implemented methods use File | Settings | File Templates.
+        }
+
+        public String getEncoding()
+        {
+            return null;  //To change body of implemented methods use File | Settings | File Templates.
+        }
+
         public byte getPriority()
         {
             return 0;

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,8 @@
         super.setUp();
         // AR will use the NullAR by default
         // Just use the first vhost.
-        VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+        VirtualHost
+                virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
         _protocolSession = new InternalTestProtocolSession(virtualHost);
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Tue Oct 20 16:23:01 2009
@@ -14,9 +14,9 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.server.exchange;
 
@@ -28,9 +28,12 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 
 public class TopicExchangeTest extends TestCase
@@ -54,19 +57,19 @@
 
     public void tearDown()
     {
-        ApplicationRegistry.remove(); 
+        ApplicationRegistry.remove();
     }
 
 
     public void testNoRoute() throws AMQException
     {
-        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);        
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
         _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
 
 
         MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
 
-        IncomingMessage message = new IncomingMessage(0L, info, _protocolSession);
+        IncomingMessage message = new IncomingMessage(info);
 
         message.enqueue(_exchange.route(message));
 
@@ -349,9 +352,11 @@
     private int routeMessage(final IncomingMessage message)
             throws AMQException
     {
+        MessageMetaData mmd = message.headersReceived();
+        message.setStoredMessage(_store.addMessage(mmd));
+
         message.enqueue(_exchange.route(message));
-        message.routingComplete(_store, new MessageHandleFactory());
-        AMQMessage msg = new AMQMessage(message.getMessageHandle(), message.getContentHeader(), message.getSize(), message.getMessagePublishInfo());
+        AMQMessage msg = new AMQMessage(message.getStoredMessage());
         for(AMQQueue q : message.getDestinationQueues())
         {
             q.enqueue(msg);
@@ -393,8 +398,11 @@
     {
         MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
 
-        IncomingMessage message = new IncomingMessage(0L, info, _protocolSession);
-        message.setContentHeaderBody( new ContentHeaderBody());
+        IncomingMessage message = new IncomingMessage(info);
+        final ContentHeaderBody chb = new ContentHeaderBody();
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+        chb.properties = props;
+        message.setContentHeaderBody(chb);
 
 
         return message;
@@ -417,7 +425,7 @@
 
         public void setExchange(AMQShortString exchange)
         {
-                        
+
         }
 
         public boolean isImmediate()

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java Tue Oct 20 16:23:01 2009
@@ -119,7 +119,7 @@
         // Verify that the message has the correct type
         assertTrue("Message contains the [con: prefix",
                    logs.get(0).toString().contains("[con:"));
-        
+
 
         // Verify that all the values were presented to the MessageFormatter
         // so we will not end up with '{n}' entries in the log.

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java Tue Oct 20 16:23:01 2009
@@ -73,7 +73,7 @@
         // Correctly Close the AR we created
         ApplicationRegistry.remove();
 
-        super.tearDown();        
+        super.tearDown();
     }
 
     private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java Tue Oct 20 16:23:01 2009
@@ -59,7 +59,7 @@
         validateLogMessage(log, "MST-1003", expected);
     }
 
-    public void testMessage1004()
+  /*  public void testMessage1004()
     {
         _logMessage = MessageStoreMessages.MST_1004(null,false);
         List<Object> log = performLog();
@@ -91,7 +91,7 @@
 
         // Here we use MessageFormat to ensure the messasgeCount of 2000 is
         // reformated for display as '2,000'
-        String[] expected = {"Recovered ", 
+        String[] expected = {"Recovered ",
                              MessageFormat.format("{0,number}", messasgeCount),
                              "messages for queue", queueName};
 
@@ -119,5 +119,5 @@
 
         validateLogMessage(log, "MST-1006", expected);
     }
-
+    */
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Tue Oct 20 16:23:01 2009
@@ -23,19 +23,16 @@
 import junit.framework.TestCase;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
 
 import javax.management.JMException;
-import java.security.Principal;
 
 /** Test class to test MBean operations for AMQMinaProtocolSession. */
 public class AMQProtocolSessionMBeanTest extends TestCase

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Tue Oct 20 16:23:01 2009
@@ -27,13 +27,13 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.transport.TestNetworkDriver;
 
 public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
@@ -68,7 +68,17 @@
     public byte getProtocolMajorVersion()
     {
         return (byte) 8;
-    }    
+    }
+
+    public void writeReturn(MessagePublishInfo messagePublishInfo,
+                            ContentHeaderBody header,
+                            MessageContentSource msgContent,
+                            int channelId,
+                            int replyCode,
+                            AMQShortString replyText) throws AMQException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
 
     public byte getProtocolMinorVersion()
     {
@@ -82,12 +92,12 @@
         synchronized (_channelDelivers)
         {
             List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
-            
+
             if (all == null)
             {
                 return new ArrayList<DeliveryPair>(0);
             }
-            
+
             List<DeliveryPair> msgs = all.subList(0, count);
 
             List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
@@ -99,15 +109,6 @@
         }
     }
 
-    public void writeReturn(MessagePublishInfo messagePublishInfo,
-                            ContentHeaderBody header,
-                            Iterator<AMQDataBlock> bodyFrameIterator,
-                            int channelId,
-                            int replyCode,
-                            AMQShortString replyText) throws AMQException
-    {
-
-    }
     // *** ProtocolOutputConverter Implementation
     public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Tue Oct 20 16:23:01 2009
@@ -22,17 +22,10 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-
-import java.security.Principal;
 
 /** Test class to test MBean operations for AMQMinaProtocolSession. */
 public class MaxChannelsTest extends TestCase
@@ -66,14 +59,14 @@
         }
         assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
     }
-    
+
     @Override
     public void setUp()
     {
         //Highlight that this test will cause a new AR to be created
         ApplicationRegistry.getInstance();
     }
-    
+
     @Override
     public void tearDown() throws Exception
     {
@@ -87,7 +80,7 @@
         {
             // Correctly Close the AR we created
             ApplicationRegistry.remove();
-        }        
+        }
     }
 
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Tue Oct 20 16:23:01 2009
@@ -1,6 +1,6 @@
 package org.apache.qpid.server.queue;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,21 +8,22 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 import java.util.ArrayList;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import junit.framework.AssertionFailedError;
@@ -45,21 +46,21 @@
         _queue.enqueue(createMessage(1L, (byte) 10));
         _queue.enqueue(createMessage(2L, (byte) 4));
         _queue.enqueue(createMessage(3L, (byte) 0));
-        
+
         // Enqueue messages in reverse order
         _queue.enqueue(createMessage(4L, (byte) 0));
         _queue.enqueue(createMessage(5L, (byte) 4));
         _queue.enqueue(createMessage(6L, (byte) 10));
-        
+
         // Enqueue messages out of order
         _queue.enqueue(createMessage(7L, (byte) 4));
         _queue.enqueue(createMessage(8L, (byte) 10));
         _queue.enqueue(createMessage(9L, (byte) 0));
-        
+
         // Register subscriber
         _queue.registerSubscription(_subscription, false);
         Thread.sleep(150);
-        
+
         ArrayList<QueueEntry> msgs = _subscription.getMessages();
         try
         {
@@ -98,10 +99,10 @@
         msg.getContentHeaderBody().properties = props;
         return msg;
     }
-    
+
     protected AMQMessage createMessage(Long id) throws AMQException
     {
         return createMessage(id, (byte) 0);
     }
-    
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue Oct 20 16:23:01 2009
@@ -25,9 +25,12 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.protocol.AMQProtocolEngine;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
@@ -44,7 +47,7 @@
 
 /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
 public class AMQQueueAlertTest extends TestCase
-{                                                         
+{
     private final static long MAX_MESSAGE_COUNT = 50;
     private final static long MAX_MESSAGE_AGE = 250;   // 0.25 sec
     private final static long MAX_MESSAGE_SIZE = 2000;  // 2 KB
@@ -232,7 +235,7 @@
 
         _queue.registerSubscription(
                 subscription2, false);
-        
+
         while (_queue.getUndeliveredMessageCount()!= 0)
         {
             Thread.sleep(100);
@@ -251,7 +254,7 @@
         _queueMBean.clearQueue();
         assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
     }
-    
+
     protected IncomingMessage message(final boolean immediate, long size) throws AMQException
     {
         MessagePublishInfo publish = new MessagePublishInfo()
@@ -284,8 +287,10 @@
         };
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+        contentHeaderBody.properties = props;
         contentHeaderBody.bodySize = size;   // in bytes
-        IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _protocolSession);
+        IncomingMessage message = new IncomingMessage(publish);
         message.setContentHeaderBody(contentHeaderBody);
 
         return message;
@@ -312,13 +317,16 @@
     private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException
     {
         IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
+        MessageMetaData[] metaData = new MessageMetaData[(int) messageCount];
         for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message(false, size);
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
+            metaData[i] = messages[i].headersReceived();
+            messages[i].setStoredMessage(_messageStore.addMessage(metaData[i]));
+
             messages[i].enqueue(qs);
-            messages[i].routingComplete(_messageStore, new MessageHandleFactory());
 
         }
 
@@ -328,6 +336,10 @@
 
                 ByteBuffer _data = ByteBuffer.allocate((int)size);
 
+                {
+                    _data.limit((int)size);
+                }
+
                 public int getSize()
                 {
                     return (int) size;
@@ -340,13 +352,11 @@
 
                 public void reduceToFit()
                 {
-                    
+
                 }
             });
-            _queue.enqueue(new AMQMessage(messages[i].getMessageHandle(),
-                                          messages[i].getContentHeader(),
-                                          messages[i].getSize(),
-                                          messages[i].getMessagePublishInfo()));
+
+            _queue.enqueue(new AMQMessage(messages[i].getStoredMessage()));
 
         }
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Tue Oct 20 16:23:01 2009
@@ -25,7 +25,6 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
 
 public class AMQQueueFactoryTest extends TestCase
 {
@@ -55,31 +54,18 @@
         FieldTable fieldTable = new FieldTable();
         fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
 
-        try
-        {
-            AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
-                                               _virtualHost, fieldTable);
-
-            assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());            
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
+
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
+                                           _virtualHost, fieldTable);
+
+        assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
     }
 
 
     public void testSimpleQueueRegistration()
     {
-        try
-        {
-            AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
-                                               _virtualHost, null);
-            assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
+                                           _virtualHost, null);
+        assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Oct 20 16:23:01 2009
@@ -29,6 +29,10 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactory;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -41,6 +45,7 @@
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.commons.configuration.PropertiesConfiguration;
 
 import javax.management.JMException;
 
@@ -151,13 +156,11 @@
     private void verifyBrokerState()
     {
 
-        TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+        TestableMemoryMessageStore store = (TestableMemoryMessageStore)_virtualHost.getMessageStore();
 
         // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
-        assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
-        assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
-        assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
-        assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+
+        assertEquals("Store should have no messages:" + store.getMessageCount(), 0, store.getMessageCount());
     }
 
     public void testConsumerCount() throws AMQException
@@ -266,16 +269,22 @@
         }
 
         IncomingMessage msg = message(false, false);
-        long id = msg.getMessageNumber();
         _queue.clearQueue();
         ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
         qs.add(_queue);
         msg.enqueue(qs);
-        msg.routingComplete(_messageStore, new MessageHandleFactory());
+        MessageMetaData mmd = msg.headersReceived();
+        msg.setStoredMessage(_messageStore.addMessage(mmd));
+        long id = msg.getMessageNumber();
+
         msg.addContentBodyFrame(new ContentChunk()
         {
             ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
 
+            {
+                _data.limit((int)MESSAGE_SIZE);
+            }
+
             public int getSize()
             {
                 return (int) MESSAGE_SIZE;
@@ -292,7 +301,7 @@
             }
         });
 
-        AMQMessage m = new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo());
+        AMQMessage m = new AMQMessage(msg.getStoredMessage());
         for(AMQQueue q : msg.getDestinationQueues())
         {
             q.enqueue(m);
@@ -345,7 +354,7 @@
         contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
         contentHeaderBody.properties = new BasicContentHeaderProperties();
         ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
-        IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _protocolSession);
+        IncomingMessage msg = new IncomingMessage(publish);
         msg.setContentHeaderBody(contentHeaderBody);
         return msg;
 
@@ -355,7 +364,14 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+        IApplicationRegistry  applicationRegistry  = new TestApplicationRegistry(new ServerConfiguration(configuration));
+        ApplicationRegistry.initialise(applicationRegistry );
+
+        configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+
         _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
         _messageStore = _virtualHost.getMessageStore();
 
@@ -381,7 +397,8 @@
             currentMessage.enqueue(qs);
 
             // route header
-            currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
+            MessageMetaData mmd = currentMessage.headersReceived();
+            currentMessage.setStoredMessage(_messageStore.addMessage(mmd));
 
             // Add the body so we have somthing to test later
             currentMessage.addContentBodyFrame(
@@ -391,7 +408,7 @@
                                                        new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
                                                                        MESSAGE_SIZE)));
 
-            AMQMessage m = new AMQMessage(currentMessage.getMessageHandle(), currentMessage.getContentHeader(), currentMessage.getSize(), currentMessage.getMessagePublishInfo());
+            AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
             for(AMQQueue q : currentMessage.getDestinationQueues())
             {
                 q.enqueue(m);

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Tue Oct 20 16:23:01 2009
@@ -28,7 +28,9 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
@@ -93,7 +95,6 @@
     private void publishMessages(int count, boolean persistent) throws AMQException
     {
         _queue.registerSubscription(_subscription,false);
-        MessageHandleFactory factory = new MessageHandleFactory();
         for (int i = 1; i <= count; i++)
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -126,37 +127,37 @@
                     return new AMQShortString("rk");
                 }
             };
-            final IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, _protocolSession);
+            final IncomingMessage msg = new IncomingMessage(publishBody);
             //IncomingMessage msg2 = null;
+            BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+            ContentHeaderBody cb = new ContentHeaderBody();
+            cb.properties = b;
+
             if (persistent)
             {
-                BasicContentHeaderProperties b = new BasicContentHeaderProperties();
                 //This is DeliveryMode.PERSISTENT
                 b.setDeliveryMode((byte) 2);
-                ContentHeaderBody cb = new ContentHeaderBody();
-                cb.properties = b;
-                msg.setContentHeaderBody(cb);
-            }
-            else
-            {
-                msg.setContentHeaderBody(new ContentHeaderBody());
             }
+
+            msg.setContentHeaderBody(cb);
+
             // we increment the reference here since we are not delivering the messaging to any queues, which is where
             // the reference is normally incremented. The test is easier to construct if we have direct access to the
             // subscription
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
             msg.enqueue(qs);
-            msg.routingComplete(_messageStore, factory);
+            MessageMetaData mmd = msg.headersReceived();
+            msg.setStoredMessage(_messageStore.addMessage(mmd));
             if(msg.allContentReceived())
             {
-                Transaction txn = new AutoCommitTransaction(_messageStore);
-                txn.enqueue(_queue, msg, new Transaction.Action() {
+                ServerTransaction txn = new AutoCommitTransaction(_messageStore);
+                txn.enqueue(_queue, msg, new ServerTransaction.Action() {
                     public void postCommit()
                     {
                         try
                         {
-                            _queue.enqueue(new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo()));
+                            _queue.enqueue(new AMQMessage(msg.getStoredMessage()));
                         }
                         catch (AMQException e)
                         {
@@ -213,8 +214,8 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
-        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+        assertTrue(_messageStore.getMessageCount() == 0);
+
 
     }
 
@@ -230,8 +231,8 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
-        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+        assertTrue(_messageStore.getMessageCount() == 0);
+
 
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Tue Oct 20 16:23:01 2009
@@ -21,22 +21,17 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.AMQMessage;
 
 public class MockAMQMessage extends AMQMessage
 {
     public MockAMQMessage(long messageId)
             throws AMQException
     {
-       super(new MockAMQMessageHandle(messageId) ,
-             (MessagePublishInfo)new MockMessagePublishInfo());
+       super(new MockStoredMessage(messageId));
     }
 
-    protected MockAMQMessage(AMQMessage msg)
-            throws AMQException
-    {
-        super(msg);
-    }
+
 
 
     @Override
@@ -44,4 +39,5 @@
     {
         return 0l;
     }
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Tue Oct 20 16:23:01 2009
@@ -25,12 +25,12 @@
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.AMQException;
 
 import java.util.List;
@@ -57,6 +57,11 @@
         return _name;
     }
 
+    public void setNoLocal(boolean b)
+    {
+        // no-op: this mock queue does not record the no-local flag
+    }
+
     public boolean isDurable()
     {
         return false;  //To change body of implemented methods use File | Settings | File Templates.
@@ -216,18 +221,18 @@
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
-    
+
     public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
     {
         return null;
     }
 
-    public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+    public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+    public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -352,7 +357,7 @@
     }
 
     public void checkCapacity(AMQChannel channel)
-    {               
+    {
     }
 
     public ManagedObject getManagedObject()
@@ -367,7 +372,7 @@
 
     public void setMinimumAlertRepeatGap(long value)
     {
-        
+
     }
 
     public long getCapacity()
@@ -392,7 +397,7 @@
 
     public void configure(QueueConfiguration config)
     {
-        
+
     }
 
     public PrincipalHolder getPrincipalHolder()
@@ -416,4 +421,8 @@
     }
 
 
+    public String getResourceName()
+    {
+        return _name.toString();
+    }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Tue Oct 20 16:23:01 2009
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
 
 public class MockQueueEntry implements QueueEntry
 {
@@ -71,7 +72,7 @@
 
     public void routeToAlternate()
     {
-        
+
     }
 
     public void dispose()
@@ -124,35 +125,35 @@
         return false;
     }
 
-    
+
     public boolean isQueueDeleted()
     {
 
         return false;
     }
 
-    
+
     public boolean isRejectedBy(Subscription subscription)
     {
 
         return false;
     }
 
-    
+
     public void reject()
     {
 
 
     }
 
-    
+
     public void reject(Subscription subscription)
     {
 
 
     }
 
-    
+
     public void release()
     {
 
@@ -171,7 +172,7 @@
         return false;
     }
 
-    
+
     public void requeue()
     {
 
@@ -190,8 +191,8 @@
 
     }
 
-    
-    public void setRedelivered(boolean b)
+
+    public void setRedelivered()
     {
 
 
@@ -209,7 +210,7 @@
 
     public boolean isRedelivered()
     {
-        return false;  
+        return false;
     }
 
 

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,92 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+import java.nio.ByteBuffer;
+/** Test stub of StoredMessage: keeps metadata and body bytes in memory; flushToStore() and remove() do no work. */
+public class MockStoredMessage implements StoredMessage<MessageMetaData>
+{
+    private long _messageId;
+    private MessageMetaData _metaData;
+    private final ByteBuffer _content;
+
+    /** Builds a mock with a fresh MockMessagePublishInfo and default header; 60 is presumably the Basic class id - confirm. */
+    public MockStoredMessage(long messageId)
+    {
+        this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60));
+    }
+    /** Wraps the given publish info and content header in a MessageMetaData and sizes the content buffer from it. */
+    public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb)
+    {
+        _messageId = messageId;
+        _metaData = new MessageMetaData(info, chb, 0);
+        _content = ByteBuffer.allocate(_metaData.getContentSize());
+        // NOTE(review): capacity is fixed at getContentSize(); addContent() writing past it will fail in ByteBuffer.
+    }
+    /** Returns the metadata built in the constructor. */
+    public MessageMetaData getMetaData()
+    {
+        return _metaData;
+    }
+    /** Returns the message id passed to the constructor. */
+    public long getMessageNumber()
+    {
+        return _messageId;
+    }
+    /** Copies src's remaining bytes into the content buffer at offsetInMessage; works on duplicates, so src's position is untouched. */
+    public void addContent(int offsetInMessage, ByteBuffer src)
+    {
+        src = src.duplicate();
+        ByteBuffer dst = _content.duplicate();
+        dst.position(offsetInMessage);
+        dst.put(src);
+    }
+    /** Copies stored content starting at offset into dst, at most dst.remaining() bytes; returns the number of bytes copied. */
+    public int getContent(int offset, ByteBuffer dst)
+    {
+        ByteBuffer src = _content.duplicate();
+        src.position(offset);
+        src = src.slice();
+        if(dst.remaining() < src.limit())
+        {
+            src.limit(dst.remaining());
+        }
+        dst.put(src);
+        return src.limit();
+    }
+    /** Performs no flush; simply returns MessageStore.IMMEDIATE_FUTURE. */
+    public TransactionLog.StoreFuture flushToStore()
+    {
+        return MessageStore.IMMEDIATE_FUTURE;
+    }
+    /** No-op in this mock. */
+    public void remove()
+    {
+    }
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org