You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/11/17 22:24:48 UTC

svn commit: r476325 - in /incubator/qpid/branches/new_persistence/java: broker/src/org/apache/qpid/server/ broker/src/org/apache/qpid/server/handler/ broker/src/org/apache/qpid/server/queue/ broker/src/org/apache/qpid/server/store/ broker/test/src/org/...

Author: rgreig
Date: Fri Nov 17 13:24:47 2006
New Revision: 476325

URL: http://svn.apache.org/viewvc?view=rev&rev=476325
Log:
QPID-32 Work-in-progress commit.

Added:
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java   (with props)
Modified:
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
    incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
    incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java
    incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java Fri Nov 17 13:24:47 2006
@@ -553,7 +553,7 @@
         return _defaultQueue;
     }
 
-    public void processReturns(AMQProtocolSession session)
+    public void processReturns(AMQProtocolSession session) throws AMQException
     {
         for (RequiredDeliveryException bouncedMessage : _returnMessages)
         {

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java Fri Nov 17 13:24:47 2006
@@ -46,12 +46,15 @@
                                AMQMethodEvent<TxCommitBody> evt) throws AMQException
     {
 
-        try{
+        try
+        {
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.commit();
             protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
-            channel.processReturns(protocolSession);            
-        }catch(AMQException e){
+            channel.processReturns(protocolSession);
+        }
+        catch(AMQException e)
+        {
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
         }
     }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java Fri Nov 17 13:24:47 2006
@@ -95,7 +95,17 @@
 
         public AMQDataBlock next()
         {
-            return ContentBody.createAMQFrame(_channel, _messageHandle.getContentBody(++_index));
+            try
+            {
+                ContentBody cb = _messageHandle.getContentBody(++_index);
+                return ContentBody.createAMQFrame(_channel, cb);
+            }
+            catch (AMQException e)
+            {
+                // have no choice but to throw a runtime exception
+                throw new RuntimeException("Error getting content body: " + e, e);
+            }
+
         }
 
         public void remove()
@@ -116,7 +126,14 @@
 
         public ContentBody next()
         {
-            return _messageHandle.getContentBody(++_index);
+            try
+            {
+                return _messageHandle.getContentBody(++_index);
+            }
+            catch (AMQException e)
+            {
+                throw new RuntimeException("Error getting content body: " + e, e);
+            }
         }
 
         public void remove()
@@ -199,7 +216,7 @@
         return _publishBody;
     } */
 
-    public ContentHeaderBody getContentHeaderBody()
+    public ContentHeaderBody getContentHeaderBody() throws AMQException
     {
         return _messageHandle.getContentHeaderBody();
     }
@@ -219,14 +236,11 @@
         {
             _txnContext.beginTranIfNecessary();
         }
-        _messageHandle.setPublishBody(_publishBody);
-        _messageHandle.setContentHeaderBody(_contentHeaderBody);
+
         if (_contentHeaderBody.bodySize == 0)
         {
             deliver();
         }
-        _publishBody = null;
-        _contentHeaderBody = null;
     }
 
     public boolean addContentBodyFrame(ContentBody contentBody) throws AMQException
@@ -244,7 +258,7 @@
         }
     }
 
-    public boolean isAllContentReceived()
+    public boolean isAllContentReceived() throws AMQException
     {
         return _bodyLengthReceived == _messageHandle.getBodySize();
     }
@@ -401,7 +415,13 @@
 
     private void deliver() throws AMQException
     {
-        // we allow the transactional context to do something with the message content
+        // first we allow the handle to know that the message has been fully received. This is useful if it is
+        // maintaining any calculated values based on content chunks
+        _messageHandle.setPublishAndContentHeaderBody(_publishBody, _contentHeaderBody);
+        _publishBody = null;
+        _contentHeaderBody = null;
+
+        // we then allow the transactional context to do something with the message content
         // now that it has all been received, before we attempt delivery
         _txnContext.messageFullyReceived(isPersistent());
         for (AMQQueue q : _destinationQueues)
@@ -471,6 +491,7 @@
     }
 
     public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, String replyText)
+            throws AMQException
     {
         ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText);
 

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java Fri Nov 17 13:24:47 2006
@@ -18,9 +18,7 @@
  */
 public interface AMQMessageHandle
 {
-    ContentHeaderBody getContentHeaderBody();
-
-    void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException;
+    ContentHeaderBody getContentHeaderBody() throws AMQException;
 
     /**
      * @return the number of body frames associated with this message
@@ -30,7 +28,7 @@
     /**
      * @return the size of the body
      */
-    long getBodySize();
+    long getBodySize() throws AMQException;
 
     /**
      * Get a particular content body
@@ -38,7 +36,7 @@
      * @return a content body
      * @throws IllegalArgumentException if the index is invalid
      */
-    ContentBody getContentBody(int index) throws IllegalArgumentException;
+    ContentBody getContentBody(int index) throws IllegalArgumentException, AMQException;
 
     void addContentBodyFrame(ContentBody contentBody) throws AMQException;
 
@@ -48,6 +46,7 @@
 
     boolean isPersistent() throws AMQException;
 
-    void setPublishBody(BasicPublishBody publishBody)
-            ;
+    void setPublishAndContentHeaderBody(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody)
+            throws AMQException;    
+
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java Fri Nov 17 13:24:47 2006
@@ -264,7 +264,7 @@
         }
 
         // Returns the size of messages in the queue
-        public Long getQueueSize()
+        public Long getQueueSize() throws AMQException
         {
             List<AMQMessage> list = _deliveryMgr.getMessages();
             if (list.size() == 0)
@@ -280,7 +280,7 @@
         // Operations
 
         // calculates the size of an AMQMessage
-        private long getMessageSize(AMQMessage msg)
+        private long getMessageSize(AMQMessage msg) throws AMQException
         {
             if (msg == null)
             {
@@ -291,7 +291,7 @@
         }
 
         // Checks if there is any notification to be send to the listeners
-        private void checkForNotification(AMQMessage msg)
+        private void checkForNotification(AMQMessage msg) throws AMQException
         {
             // Check for message count
             Integer msgCount = getMessageCount();
@@ -722,7 +722,7 @@
         return _subscribers;
     }
 
-    protected void updateReceivedMessageCount(AMQMessage msg)
+    protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
     {
         _totalMessagesReceived++;
         _managedObject.checkForNotification(msg);

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java Fri Nov 17 13:24:47 2006
@@ -20,6 +20,7 @@
 import org.apache.qpid.server.management.MBeanAttribute;
 import org.apache.qpid.server.management.MBeanOperation;
 import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.AMQException;
 
 import javax.management.JMException;
 import javax.management.MBeanOperationInfo;
@@ -147,7 +148,7 @@
      * @throws IOException
      */
     @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
-    Long getQueueSize() throws IOException;
+    Long getQueueSize() throws IOException, AMQException;
 
     /**
      * Tells the maximum size of all the messages combined together,

Added: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java?view=auto&rev=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java (added)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java Fri Nov 17 13:24:47 2006
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * Encapsulates a publish body and a content header. In the context of the message store these are treated as a
+ * single unit.
+ */
+public class MessageMetaData
+{
+    private BasicPublishBody _publishBody;
+
+    private ContentHeaderBody _contentHeaderBody;
+
+    private int _contentChunkCount;
+
+    public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+    {
+        _contentHeaderBody = contentHeaderBody;
+        _publishBody = publishBody;
+        _contentChunkCount = contentChunkCount;
+    }
+
+    public int getContentChunkCount()
+    {
+        return _contentChunkCount;
+    }
+
+    public void setContentChunkCount(int contentChunkCount)
+    {
+        _contentChunkCount = contentChunkCount;
+    }
+
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        return _contentHeaderBody;
+    }
+
+    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+    {
+        _contentHeaderBody = contentHeaderBody;
+    }
+
+    public BasicPublishBody getPublishBody()
+    {
+        return _publishBody;
+    }
+
+    public void setPublishBody(BasicPublishBody publishBody)
+    {
+        _publishBody = publishBody;
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Fri Nov 17 13:24:47 2006
@@ -17,17 +17,18 @@
 
 import java.util.List;
 import java.util.LinkedList;
+import java.lang.ref.WeakReference;
 
 /**
  * @author Robert Greig (robert.j.greig@jpmorgan.com)
  */
 public class WeakReferenceMessageHandle implements AMQMessageHandle
 {
-    private ContentHeaderBody _contentHeaderBody;
+    private WeakReference<ContentHeaderBody> _contentHeaderBody;
 
-    private BasicPublishBody _publishBody;
+    private WeakReference<BasicPublishBody> _publishBody;
 
-    private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
+    private List<WeakReference<ContentBody>> _contentBodies = new LinkedList<WeakReference<ContentBody>>();
 
     private boolean _redelivered;
 
@@ -41,16 +42,17 @@
         _messageId = messageId;
     }
 
-    public ContentHeaderBody getContentHeaderBody()
+    public ContentHeaderBody getContentHeaderBody() throws AMQException
     {
-        return _contentHeaderBody;
-    }
-
-    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
-    {
-        _contentHeaderBody = contentHeaderBody;
-        _messageStore.storePublishBody(_messageId, _publishBody);
-        _messageStore.storeContentHeader(_messageId, contentHeaderBody);
+        ContentHeaderBody chb = _contentHeaderBody.get();
+        if (chb == null)
+        {
+            MessageMetaData mmd = _messageStore.getMessageMetaData(_messageId);
+            chb = mmd.getContentHeaderBody();
+            _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb);
+            _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+        }
+        return chb;
     }
 
     public int getBodyCount()
@@ -58,30 +60,45 @@
         return _contentBodies.size();
     }
 
-    public long getBodySize()
+    public long getBodySize() throws AMQException
     {
-        return _contentHeaderBody.bodySize;
+        return getContentHeaderBody().bodySize;
     }
 
-    public ContentBody getContentBody(int index) throws IllegalArgumentException
+    public ContentBody getContentBody(int index) throws AMQException, IllegalArgumentException
     {
-        return _contentBodies.get(index);
+        if (index > _contentBodies.size() - 1)
+        {
+            throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+                                               (_contentBodies.size() - 1));
+        }
+        WeakReference<ContentBody> wr = _contentBodies.get(index);
+        ContentBody cb = wr.get();
+        if (cb == null)
+        {
+            cb = _messageStore.getContentBodyChunk(_messageId, index);
+            _contentBodies.set(index, new WeakReference<ContentBody>(cb));
+        }
+        return cb;
     }
 
     public void addContentBodyFrame(ContentBody contentBody) throws AMQException
     {
-        _contentBodies.add(contentBody);
+        _contentBodies.add(new WeakReference<ContentBody>(contentBody));
         _messageStore.storeContentBodyChunk(_messageId, _contentBodies.size() - 1, contentBody);
     }
 
-    public void setPublishBody(BasicPublishBody publishBody)
-    {
-        _publishBody = publishBody;
-    }
-
     public BasicPublishBody getPublishBody() throws AMQException
     {
-        return _publishBody;
+        BasicPublishBody bpb = _publishBody.get();
+        if (bpb == null)
+        {
+            MessageMetaData mmd = _messageStore.getMessageMetaData(_messageId);
+            bpb = mmd.getPublishBody();
+            _publishBody = new WeakReference<BasicPublishBody>(bpb);
+            _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
+        }
+        return bpb;
     }
 
     public boolean isRedelivered()
@@ -91,13 +108,24 @@
 
     public boolean isPersistent() throws AMQException
     {
-        if (_contentHeaderBody == null)
-        {
-            throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
-        }
-
         //todo remove literal values to a constant file such as AMQConstants in common
-        return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
-                && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+        ContentHeaderBody chb = getContentHeaderBody();
+        return chb.properties instanceof BasicContentHeaderProperties &&
+               ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+    }
+
+    /**
+     * This is called when all the content has been received.
+     * @param publishBody
+     * @param contentHeaderBody
+     * @throws AMQException
+     */
+    public void setPublishAndContentHeaderBody(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody)
+            throws AMQException
+    {
+        _messageStore.storeMessageMetaData(_messageId, new MessageMetaData(publishBody, contentHeaderBody,
+                                                                           _contentBodies.size()));
+        _publishBody = new WeakReference<BasicPublishBody>(publishBody);
+        _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java Fri Nov 17 13:24:47 2006
@@ -20,13 +20,12 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.QueueRegistry;
 
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -43,9 +42,7 @@
 
     private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
 
-    protected ConcurrentMap<Long, BasicPublishBody> _publishBodyMap;
-
-    protected ConcurrentMap<Long, ContentHeaderBody> _contentHeaderMap;
+    protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
 
     protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap;
 
@@ -53,18 +50,16 @@
 
     public void configure()
     {
-        _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table");
-        _publishBodyMap = new ConcurrentHashMap<Long, BasicPublishBody>(DEFAULT_HASHTABLE_CAPACITY);
-        _contentHeaderMap = new ConcurrentHashMap<Long, ContentHeaderBody>(DEFAULT_HASHTABLE_CAPACITY);
+        _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
+        _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
         _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY);
     }
 
     public void configure(String base, Configuration config)
     {
         int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
-        _log.info("Using capacity " + hashtableCapacity + " for hash table");
-        _publishBodyMap = new ConcurrentHashMap<Long, BasicPublishBody>(hashtableCapacity);
-        _contentHeaderMap = new ConcurrentHashMap<Long, ContentHeaderBody>(hashtableCapacity);
+        _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+        _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
         _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
     }
 
@@ -75,10 +70,15 @@
 
     public void close() throws Exception
     {
-        if (_publishBodyMap != null)
+        if (_metaDataMap != null)
+        {
+            _metaDataMap.clear();
+            _metaDataMap = null;
+        }
+        if (_contentBodyMap != null)
         {
-            _publishBodyMap.clear();
-            _publishBodyMap = null;
+            _contentBodyMap.clear();
+            _contentBodyMap = null;
         }
     }
 
@@ -88,7 +88,8 @@
         {
             _log.debug("Removing message with id " + messageId);
         }
-        _publishBodyMap.remove(messageId);
+        _metaDataMap.remove(messageId);
+        _contentBodyMap.remove(messageId);
     }
 
     public void createQueue(AMQQueue queue) throws AMQException
@@ -141,25 +142,31 @@
         return _messageId.getAndIncrement();
     }
 
-    public void storePublishBody(long messageId, BasicPublishBody publishBody) throws AMQException
-    {
-        _publishBodyMap.put(messageId, publishBody);
-    }
-
-    public void storeContentHeader(long messageId, ContentHeaderBody contentHeaderBody) throws AMQException
-    {
-        _contentHeaderMap.put(messageId, contentHeaderBody);
-    }
-
     public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException
     {
         List<ContentBody> bodyList = _contentBodyMap.get(messageId);
         if (bodyList == null)
         {
-            bodyList = new LinkedList<ContentBody>();
+            bodyList = new ArrayList<ContentBody>();
             _contentBodyMap.put(messageId, bodyList);
         }
 
         bodyList.add(index, contentBody);
+    }
+
+    public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException
+    {
+        _metaDataMap.put(messageId, messageMetaData);
+    }
+
+    public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+    {
+        return _metaDataMap.get(messageId);
+    }
+
+    public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
+    {
+        List<ContentBody> bodyList = _contentBodyMap.get(messageId);        
+        return bodyList.get(index);
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java Fri Nov 17 13:24:47 2006
@@ -19,10 +19,9 @@
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.QueueRegistry;
 
 import java.util.List;
@@ -77,11 +76,12 @@
      */
     long getNewMessageId();
 
-    void storePublishBody(long messageId, BasicPublishBody publishBody) throws AMQException;
+    void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException;
 
-    void storeContentHeader(long messageId, ContentHeaderBody contentHeaderBody) throws AMQException;
+    void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException;
 
-    void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException;            
-}
+    MessageMetaData getMessageMetaData(long messageId) throws AMQException;
 
+    ContentBody getContentBodyChunk(long messageId, int index) throws AMQException;
 
+}

Modified: incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java Fri Nov 17 13:24:47 2006
@@ -111,7 +111,7 @@
             }
         });
 
-        assertTrue(_messageStore.gePublishBodyMap().size() == msgCount);
+        assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
     }
 
     /**
@@ -127,7 +127,7 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue(_messageStore.getContentHeaderMap().size() == 0);
+        assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
     }
 
     /**

Modified: incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java Fri Nov 17 13:24:47 2006
@@ -24,6 +24,7 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.MessageMetaData;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,7 +35,7 @@
  */
 public class SkeletonMessageStore implements MessageStore
 {
-    private final AtomicLong _messageId = new AtomicLong(1);    
+    private final AtomicLong _messageId = new AtomicLong(1);
 
     public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
     {
@@ -101,5 +102,19 @@
 
     public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException
     {
+    }
+
+    public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException
+    {        
+    }
+
+    public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+    {
+        return null;
+    }
+
+    public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
+    {
+        return null;
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Nov 17 13:24:47 2006
@@ -17,9 +17,8 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.queue.MessageMetaData;
 
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,19 +31,13 @@
 {
     public TestableMemoryMessageStore()
     {
-        _publishBodyMap = new ConcurrentHashMap<Long, BasicPublishBody>();
-        _contentHeaderMap = new ConcurrentHashMap<Long, ContentHeaderBody>();
+        _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
         _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>();
     }
 
-    public ConcurrentMap<Long, BasicPublishBody> gePublishBodyMap()
+    public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
     {
-        return _publishBodyMap;
-    }
-
-    public ConcurrentMap<Long, ContentHeaderBody> getContentHeaderMap()
-    {
-        return _contentHeaderMap;
+        return _metaDataMap;
     }
 
     public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap()

Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java Fri Nov 17 13:24:47 2006
@@ -141,7 +141,7 @@
         _logger.info("No messages redelivered as is expected");
         con.close();
 
-        _logger.info("Actually:" + store.gePublishBodyMap().size());
+        _logger.info("Actually:" + store.getMessageMetaDataMap().size());
         //  Assert.assertTrue(store.getMessageMap().size() == 0);
     }
 
@@ -197,8 +197,8 @@
         Assert.assertNull(tm);
         _logger.info("No messages redelivered as is expected");
 
-        _logger.info("Actually:" + store.gePublishBodyMap().size());
-        Assert.assertTrue(store.gePublishBodyMap().size() == 0);
+        _logger.info("Actually:" + store.getMessageMetaDataMap().size());
+        Assert.assertTrue(store.getMessageMetaDataMap().size() == 0);
         con.close();
     }