You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/11/17 22:24:48 UTC
svn commit: r476325 - in /incubator/qpid/branches/new_persistence/java:
broker/src/org/apache/qpid/server/ broker/src/org/apache/qpid/server/handler/
broker/src/org/apache/qpid/server/queue/
broker/src/org/apache/qpid/server/store/ broker/test/src/org/...
Author: rgreig
Date: Fri Nov 17 13:24:47 2006
New Revision: 476325
URL: http://svn.apache.org/viewvc?view=rev&rev=476325
Log:
QPID-32 Work-in-progress commit.
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java (with props)
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java
incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java Fri Nov 17 13:24:47 2006
@@ -553,7 +553,7 @@
return _defaultQueue;
}
- public void processReturns(AMQProtocolSession session)
+ public void processReturns(AMQProtocolSession session) throws AMQException
{
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/TxCommitHandler.java Fri Nov 17 13:24:47 2006
@@ -46,12 +46,15 @@
AMQMethodEvent<TxCommitBody> evt) throws AMQException
{
- try{
+ try
+ {
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.commit();
protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
- channel.processReturns(protocolSession);
- }catch(AMQException e){
+ channel.processReturns(protocolSession);
+ }
+ catch(AMQException e)
+ {
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java Fri Nov 17 13:24:47 2006
@@ -95,7 +95,17 @@
public AMQDataBlock next()
{
- return ContentBody.createAMQFrame(_channel, _messageHandle.getContentBody(++_index));
+ try
+ {
+ ContentBody cb = _messageHandle.getContentBody(++_index);
+ return ContentBody.createAMQFrame(_channel, cb);
+ }
+ catch (AMQException e)
+ {
+ // have no choice but to throw a runtime exception
+ throw new RuntimeException("Error getting content body: " + e, e);
+ }
+
}
public void remove()
@@ -116,7 +126,14 @@
public ContentBody next()
{
- return _messageHandle.getContentBody(++_index);
+ try
+ {
+ return _messageHandle.getContentBody(++_index);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Error getting content body: " + e, e);
+ }
}
public void remove()
@@ -199,7 +216,7 @@
return _publishBody;
} */
- public ContentHeaderBody getContentHeaderBody()
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
{
return _messageHandle.getContentHeaderBody();
}
@@ -219,14 +236,11 @@
{
_txnContext.beginTranIfNecessary();
}
- _messageHandle.setPublishBody(_publishBody);
- _messageHandle.setContentHeaderBody(_contentHeaderBody);
+
if (_contentHeaderBody.bodySize == 0)
{
deliver();
}
- _publishBody = null;
- _contentHeaderBody = null;
}
public boolean addContentBodyFrame(ContentBody contentBody) throws AMQException
@@ -244,7 +258,7 @@
}
}
- public boolean isAllContentReceived()
+ public boolean isAllContentReceived() throws AMQException
{
return _bodyLengthReceived == _messageHandle.getBodySize();
}
@@ -401,7 +415,13 @@
private void deliver() throws AMQException
{
- // we allow the transactional context to do something with the message content
+ // first we allow the handle to know that the message has been fully received. This is useful if it is
+ // maintaining any calculated values based on content chunks
+ _messageHandle.setPublishAndContentHeaderBody(_publishBody, _contentHeaderBody);
+ _publishBody = null;
+ _contentHeaderBody = null;
+
+ // we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
for (AMQQueue q : _destinationQueues)
@@ -471,6 +491,7 @@
}
public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, String replyText)
+ throws AMQException
{
ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText);
Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java Fri Nov 17 13:24:47 2006
@@ -18,9 +18,7 @@
*/
public interface AMQMessageHandle
{
- ContentHeaderBody getContentHeaderBody();
-
- void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException;
+ ContentHeaderBody getContentHeaderBody() throws AMQException;
/**
* @return the number of body frames associated with this message
@@ -30,7 +28,7 @@
/**
* @return the size of the body
*/
- long getBodySize();
+ long getBodySize() throws AMQException;
/**
* Get a particular content body
@@ -38,7 +36,7 @@
* @return a content body
* @throws IllegalArgumentException if the index is invalid
*/
- ContentBody getContentBody(int index) throws IllegalArgumentException;
+ ContentBody getContentBody(int index) throws IllegalArgumentException, AMQException;
void addContentBodyFrame(ContentBody contentBody) throws AMQException;
@@ -48,6 +46,7 @@
boolean isPersistent() throws AMQException;
- void setPublishBody(BasicPublishBody publishBody)
- ;
+ void setPublishAndContentHeaderBody(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody)
+ throws AMQException;
+
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java Fri Nov 17 13:24:47 2006
@@ -264,7 +264,7 @@
}
// Returns the size of messages in the queue
- public Long getQueueSize()
+ public Long getQueueSize() throws AMQException
{
List<AMQMessage> list = _deliveryMgr.getMessages();
if (list.size() == 0)
@@ -280,7 +280,7 @@
// Operations
// calculates the size of an AMQMessage
- private long getMessageSize(AMQMessage msg)
+ private long getMessageSize(AMQMessage msg) throws AMQException
{
if (msg == null)
{
@@ -291,7 +291,7 @@
}
// Checks if there is any notification to be sent to the listeners
- private void checkForNotification(AMQMessage msg)
+ private void checkForNotification(AMQMessage msg) throws AMQException
{
// Check for message count
Integer msgCount = getMessageCount();
@@ -722,7 +722,7 @@
return _subscribers;
}
- protected void updateReceivedMessageCount(AMQMessage msg)
+ protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
{
_totalMessagesReceived++;
_managedObject.checkForNotification(msg);
Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java Fri Nov 17 13:24:47 2006
@@ -20,6 +20,7 @@
import org.apache.qpid.server.management.MBeanAttribute;
import org.apache.qpid.server.management.MBeanOperation;
import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.AMQException;
import javax.management.JMException;
import javax.management.MBeanOperationInfo;
@@ -147,7 +148,7 @@
* @throws IOException
*/
@MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
- Long getQueueSize() throws IOException;
+ Long getQueueSize() throws IOException, AMQException;
/**
* Tells the maximum size of all the messages combined together,
Added: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java?view=auto&rev=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java (added)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java Fri Nov 17 13:24:47 2006
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * Encapsulates a publish body and a content header. In the context of the message store these are treated as a
+ * single unit.
+ */
+public class MessageMetaData
+{
+ private BasicPublishBody _publishBody;
+
+ private ContentHeaderBody _contentHeaderBody;
+
+ private int _contentChunkCount;
+
+ public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ _publishBody = publishBody;
+ _contentChunkCount = contentChunkCount;
+ }
+
+ public int getContentChunkCount()
+ {
+ return _contentChunkCount;
+ }
+
+ public void setContentChunkCount(int contentChunkCount)
+ {
+ _contentChunkCount = contentChunkCount;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public BasicPublishBody getPublishBody()
+ {
+ return _publishBody;
+ }
+
+ public void setPublishBody(BasicPublishBody publishBody)
+ {
+ _publishBody = publishBody;
+ }
+}
Propchange: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageMetaData.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Fri Nov 17 13:24:47 2006
@@ -17,17 +17,18 @@
import java.util.List;
import java.util.LinkedList;
+import java.lang.ref.WeakReference;
/**
* @author Robert Greig (robert.j.greig@jpmorgan.com)
*/
public class WeakReferenceMessageHandle implements AMQMessageHandle
{
- private ContentHeaderBody _contentHeaderBody;
+ private WeakReference<ContentHeaderBody> _contentHeaderBody;
- private BasicPublishBody _publishBody;
+ private WeakReference<BasicPublishBody> _publishBody;
- private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
+ private List<WeakReference<ContentBody>> _contentBodies = new LinkedList<WeakReference<ContentBody>>();
private boolean _redelivered;
@@ -41,16 +42,17 @@
_messageId = messageId;
}
- public ContentHeaderBody getContentHeaderBody()
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- return _contentHeaderBody;
- }
-
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
- {
- _contentHeaderBody = contentHeaderBody;
- _messageStore.storePublishBody(_messageId, _publishBody);
- _messageStore.storeContentHeader(_messageId, contentHeaderBody);
+ ContentHeaderBody chb = _contentHeaderBody.get();
+ if (chb == null)
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(_messageId);
+ chb = mmd.getContentHeaderBody();
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb);
+ _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+ }
+ return chb;
}
public int getBodyCount()
@@ -58,30 +60,45 @@
return _contentBodies.size();
}
- public long getBodySize()
+ public long getBodySize() throws AMQException
{
- return _contentHeaderBody.bodySize;
+ return getContentHeaderBody().bodySize;
}
- public ContentBody getContentBody(int index) throws IllegalArgumentException
+ public ContentBody getContentBody(int index) throws AMQException, IllegalArgumentException
{
- return _contentBodies.get(index);
+ if (index > _contentBodies.size() - 1)
+ {
+ throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+ (_contentBodies.size() - 1));
+ }
+ WeakReference<ContentBody> wr = _contentBodies.get(index);
+ ContentBody cb = wr.get();
+ if (cb == null)
+ {
+ cb = _messageStore.getContentBodyChunk(_messageId, index);
+ _contentBodies.set(index, new WeakReference<ContentBody>(cb));
+ }
+ return cb;
}
public void addContentBodyFrame(ContentBody contentBody) throws AMQException
{
- _contentBodies.add(contentBody);
+ _contentBodies.add(new WeakReference<ContentBody>(contentBody));
_messageStore.storeContentBodyChunk(_messageId, _contentBodies.size() - 1, contentBody);
}
- public void setPublishBody(BasicPublishBody publishBody)
- {
- _publishBody = publishBody;
- }
-
public BasicPublishBody getPublishBody() throws AMQException
{
- return _publishBody;
+ BasicPublishBody bpb = _publishBody.get();
+ if (bpb == null)
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(_messageId);
+ bpb = mmd.getPublishBody();
+ _publishBody = new WeakReference<BasicPublishBody>(bpb);
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
+ }
+ return bpb;
}
public boolean isRedelivered()
@@ -91,13 +108,24 @@
public boolean isPersistent() throws AMQException
{
- if (_contentHeaderBody == null)
- {
- throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
- }
-
//todo move literal values to a constants file such as AMQConstants in common
- return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
- && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ ContentHeaderBody chb = getContentHeaderBody();
+ return chb.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+ }
+
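+ /**
+ * This is called when all the content has been received. Stores both bodies in the message store as one MessageMetaData.
+ * @param publishBody the publish body to store; a weak reference to it is kept by this handle
+ * @param contentHeaderBody the content header body to store; a weak reference to it is kept by this handle
+ * @throws AMQException if thrown by the message store while storing the message metadata
+ */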
+ public void setPublishAndContentHeaderBody(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+ _messageStore.storeMessageMetaData(_messageId, new MessageMetaData(publishBody, contentHeaderBody,
+ _contentBodies.size()));
+ _publishBody = new WeakReference<BasicPublishBody>(publishBody);
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java Fri Nov 17 13:24:47 2006
@@ -20,13 +20,12 @@
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -43,9 +42,7 @@
private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
- protected ConcurrentMap<Long, BasicPublishBody> _publishBodyMap;
-
- protected ConcurrentMap<Long, ContentHeaderBody> _contentHeaderMap;
+ protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap;
@@ -53,18 +50,16 @@
public void configure()
{
- _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table");
- _publishBodyMap = new ConcurrentHashMap<Long, BasicPublishBody>(DEFAULT_HASHTABLE_CAPACITY);
- _contentHeaderMap = new ConcurrentHashMap<Long, ContentHeaderBody>(DEFAULT_HASHTABLE_CAPACITY);
+ _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY);
}
public void configure(String base, Configuration config)
{
int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
- _log.info("Using capacity " + hashtableCapacity + " for hash table");
- _publishBodyMap = new ConcurrentHashMap<Long, BasicPublishBody>(hashtableCapacity);
- _contentHeaderMap = new ConcurrentHashMap<Long, ContentHeaderBody>(hashtableCapacity);
+ _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
}
@@ -75,10 +70,15 @@
public void close() throws Exception
{
- if (_publishBodyMap != null)
+ if (_metaDataMap != null)
+ {
+ _metaDataMap.clear();
+ _metaDataMap = null;
+ }
+ if (_contentBodyMap != null)
{
- _publishBodyMap.clear();
- _publishBodyMap = null;
+ _contentBodyMap.clear();
+ _contentBodyMap = null;
}
}
@@ -88,7 +88,8 @@
{
_log.debug("Removing message with id " + messageId);
}
- _publishBodyMap.remove(messageId);
+ _metaDataMap.remove(messageId);
+ _contentBodyMap.remove(messageId);
}
public void createQueue(AMQQueue queue) throws AMQException
@@ -141,25 +142,31 @@
return _messageId.getAndIncrement();
}
- public void storePublishBody(long messageId, BasicPublishBody publishBody) throws AMQException
- {
- _publishBodyMap.put(messageId, publishBody);
- }
-
- public void storeContentHeader(long messageId, ContentHeaderBody contentHeaderBody) throws AMQException
- {
- _contentHeaderMap.put(messageId, contentHeaderBody);
- }
-
public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException
{
List<ContentBody> bodyList = _contentBodyMap.get(messageId);
if (bodyList == null)
{
- bodyList = new LinkedList<ContentBody>();
+ bodyList = new ArrayList<ContentBody>();
_contentBodyMap.put(messageId, bodyList);
}
bodyList.add(index, contentBody);
+ }
+
+ public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ _metaDataMap.put(messageId, messageMetaData);
+ }
+
+ public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+ {
+ return _metaDataMap.get(messageId);
+ }
+
+ public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
+ {
+ List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ return bodyList.get(index);
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java Fri Nov 17 13:24:47 2006
@@ -19,10 +19,9 @@
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
import java.util.List;
@@ -77,11 +76,12 @@
*/
long getNewMessageId();
- void storePublishBody(long messageId, BasicPublishBody publishBody) throws AMQException;
+ void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException;
- void storeContentHeader(long messageId, ContentHeaderBody contentHeaderBody) throws AMQException;
+ void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException;
- void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException;
-}
+ MessageMetaData getMessageMetaData(long messageId) throws AMQException;
+ ContentBody getContentBodyChunk(long messageId, int index) throws AMQException;
+}
Modified: incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java Fri Nov 17 13:24:47 2006
@@ -111,7 +111,7 @@
}
});
- assertTrue(_messageStore.gePublishBodyMap().size() == msgCount);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
}
/**
@@ -127,7 +127,7 @@
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getContentHeaderMap().size() == 0);
+ assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
}
/**
Modified: incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/SkeletonMessageStore.java Fri Nov 17 13:24:47 2006
@@ -24,6 +24,7 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.MessageMetaData;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,7 +35,7 @@
*/
public class SkeletonMessageStore implements MessageStore
{
- private final AtomicLong _messageId = new AtomicLong(1);
+ private final AtomicLong _messageId = new AtomicLong(1);
public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
{
@@ -101,5 +102,19 @@
public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException
{
+ }
+
+ public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ }
+
+ public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+ {
+ return null;
+ }
+
+ public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
+ {
+ return null;
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Nov 17 13:24:47 2006
@@ -17,9 +17,8 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.queue.MessageMetaData;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,19 +31,13 @@
{
public TestableMemoryMessageStore()
{
- _publishBodyMap = new ConcurrentHashMap<Long, BasicPublishBody>();
- _contentHeaderMap = new ConcurrentHashMap<Long, ContentHeaderBody>();
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>();
}
- public ConcurrentMap<Long, BasicPublishBody> gePublishBodyMap()
+ public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
{
- return _publishBodyMap;
- }
-
- public ConcurrentMap<Long, ContentHeaderBody> getContentHeaderMap()
- {
- return _contentHeaderMap;
+ return _metaDataMap;
}
public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap()
Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java?view=diff&rev=476325&r1=476324&r2=476325
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java Fri Nov 17 13:24:47 2006
@@ -141,7 +141,7 @@
_logger.info("No messages redelivered as is expected");
con.close();
- _logger.info("Actually:" + store.gePublishBodyMap().size());
+ _logger.info("Actually:" + store.getMessageMetaDataMap().size());
// Assert.assertTrue(store.getMessageMap().size() == 0);
}
@@ -197,8 +197,8 @@
Assert.assertNull(tm);
_logger.info("No messages redelivered as is expected");
- _logger.info("Actually:" + store.gePublishBodyMap().size());
- Assert.assertTrue(store.gePublishBodyMap().size() == 0);
+ _logger.info("Actually:" + store.getMessageMetaDataMap().size());
+ Assert.assertTrue(store.getMessageMetaDataMap().size() == 0);
con.close();
}