You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/02/20 17:20:46 UTC

svn commit: r509628 [2/2] - in /incubator/qpid/trunk/qpid: gentools/templ.java/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/configuration/ java/broker/src/main/java/org/apache/qpid/server/exchange/...

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 20 08:20:41 2007
@@ -1156,6 +1156,41 @@
         protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
     }
 
+
+    /**
+     * Declares a queue with the given name and flags. The declare frame is handed to the
+     * protocol handler's {@code syncWrite} with {@code QueueDeclareOkBody} named as the
+     * expected reply.
+     * <p>
+     * The declare always carries no arguments table and has both {@code nowait} and
+     * {@code passive} set to false.
+     *
+     * @param name       name of the queue to declare
+     * @param autoDelete value of the auto-delete flag in the declare
+     * @param durable    value of the durable flag in the declare
+     * @param exclusive  value of the exclusive flag in the declare
+     * @throws AMQException propagated from building or writing the declare frame
+     */
+    public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException
+    {
+        final boolean nowait = false;
+        final boolean passive = false;
+
+        // NOTE: the argument order is dictated by the generated QueueDeclareBody factory.
+        final AMQFrame declareFrame =
+                QueueDeclareBody.createAMQFrame(_channelId,
+                                                getProtocolMajorVersion(),
+                                                getProtocolMinorVersion(),
+                                                null,          // arguments: none supplied
+                                                autoDelete,
+                                                durable,
+                                                exclusive,
+                                                nowait,
+                                                passive,
+                                                name,
+                                                getTicket());
+
+        getProtocolHandler().syncWrite(declareFrame, QueueDeclareOkBody.class);
+    }
+
+
+    /**
+     * Binds a queue to an exchange. The bind frame is handed to the protocol handler's
+     * {@code syncWrite} with {@code QueueBindOkBody} named as the expected reply; the bind is
+     * always sent with {@code nowait} set to false.
+     *
+     * @param queueName    name of the queue to bind
+     * @param routingKey   routing key for the binding
+     * @param arguments    arguments table passed through unchanged to the bind frame
+     * @param exchangeName name of the exchange to bind the queue to
+     * @throws AMQException propagated from building or writing the bind frame
+     */
+    public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException
+    {
+        final boolean nowait = false;
+
+        // TODO: Be aware of possible changes to parameter order as versions change.
+        final AMQFrame bindFrame =
+                QueueBindBody.createAMQFrame(_channelId,
+                                             getProtocolMajorVersion(),
+                                             getProtocolMinorVersion(),
+                                             arguments,
+                                             exchangeName,
+                                             nowait,
+                                             queueName,
+                                             routingKey,
+                                             getTicket());
+
+        getProtocolHandler().syncWrite(bindFrame, QueueBindOkBody.class);
+    }
+
     /**
      * Declare the queue.
      *

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java Tue Feb 20 08:20:41 2007
@@ -22,13 +22,19 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.MethodConverter_8_0;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.server.queue.AMQMessage;
 
 import java.util.Iterator;
 
 public class SimpleSendable implements Sendable
 {
+
+    //todo fixme - remove 0-8 hard coding
+    ProtocolVersionMethodConverter _methodConverter = new MethodConverter_8_0();
+
     private final AMQMessage _message;
 
     public SimpleSendable(AMQMessage message)
@@ -38,12 +44,12 @@
 
     public void send(int channel, Member member) throws AMQException
     {
-        member.send(new AMQFrame(channel, _message.getPublishBody()));
+        member.send(new AMQFrame(channel, _methodConverter.convertToBody(_message.getMessagePublishInfo())));
         member.send(new AMQFrame(channel, _message.getContentHeaderBody()));
-        Iterator<ContentBody> it = _message.getContentBodyIterator();
+        Iterator<ContentChunk> it = _message.getContentBodyIterator();
         while (it.hasNext())
         {
-            member.send(new AMQFrame(channel, it.next()));
+            member.send(new AMQFrame(channel, _methodConverter.convertToBody(it.next())));
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java Tue Feb 20 08:20:41 2007
@@ -31,8 +31,6 @@
 import org.apache.qpid.server.cluster.util.LogMessage;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.concurrent.Executor;
-
 /**
  * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does
  * not require all the functionality currently in AMQQueue.
@@ -81,8 +79,11 @@
 
     void relay(AMQMessage msg) throws AMQException
     {
-        BasicPublishBody publish = msg.getPublishBody();
-        publish.immediate = false; //can't as yet handle the immediate flag in a cluster
+        // TODO FIXME - can no longer update the publish body as it is an opaque wrapper object
+        // if cluster can handle immediate then it should wrap the wrapper...
+        
+//        BasicPublishBody publish = msg.getMessagePublishInfo();
+//        publish.immediate = false; //can't as yet handle the immediate flag in a cluster
 
         // send this on to the broker for which it is acting as proxy:
         _groupMgr.send(_target, new SimpleSendable(msg));

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Tue Feb 20 08:20:41 2007
@@ -96,6 +96,8 @@
         }
     }
 
+
+
     public static AMQFrame createAMQFrame(int channelId, ContentBody body)
     {
         final AMQFrame frame = new AMQFrame(channelId, body);

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Tue Feb 20 08:20:41 2007
@@ -641,8 +641,7 @@
 
     public static void writeTimestamp(ByteBuffer buffer, long timestamp)
     {
-        writeUnsignedInteger(buffer, 0/*timestamp msb*/);
-        writeUnsignedInteger(buffer, timestamp);
+        writeLong(buffer, timestamp);
     }
 
     public static boolean[] readBooleans(ByteBuffer buffer)
@@ -765,8 +764,8 @@
     public static long readTimestamp(ByteBuffer buffer)
     {
         // Discard msb from AMQ timestamp
-        buffer.getUnsignedInt();
-        return buffer.getUnsignedInt();
+        // NOTE: the msb is no longer discarded; the timestamp is read as a single 64-bit long
+        return buffer.getLong();
     }
 
 

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,104 @@
+package org.apache.qpid.framing;
+
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Converts between the version-neutral abstractions ({@link ContentChunk},
+ * {@link MessagePublishInfo}) and the AMQP 0-8 frame bodies ({@link ContentBody},
+ * {@link BasicPublishBody}).
+ * <p>
+ * The class and method ids placed in generated {@link BasicPublishBody} instances are resolved
+ * by {@link #configure()}. If {@link #convertToBody(MessagePublishInfo)} is called on an
+ * instance that has not been configured yet, the ids are resolved on first use instead of
+ * silently producing a body whose ids are both 0.
+ */
+public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+    private int _basicPublishClassId;
+    private int _basicPublishMethodId;
+
+    // Set once configure() has resolved the two ids above. It is volatile and written after
+    // the ids, so a thread that reads true also sees the resolved ids.
+    private volatile boolean _configured;
+
+    /**
+     * Creates a converter fixed to protocol version major 8, minor 0.
+     */
+    public MethodConverter_8_0()
+    {
+        super((byte) 8, (byte) 0);
+    }
+
+    /**
+     * Wraps the data of a content chunk in a {@link ContentBody}.
+     *
+     * @param contentChunk the chunk whose data buffer is wrapped
+     * @return a new content body built from {@code contentChunk.getData()}
+     */
+    public AMQBody convertToBody(ContentChunk contentChunk)
+    {
+        return new ContentBody(contentChunk.getData());
+    }
+
+    /**
+     * Exposes a {@link ContentBody} through the {@link ContentChunk} abstraction. The returned
+     * chunk is a live view: every call is delegated to the wrapped body.
+     *
+     * @param body the body to wrap; must be a {@link ContentBody}
+     * @return a chunk view over {@code body}
+     * @throws ClassCastException if {@code body} is not a {@link ContentBody}
+     */
+    public ContentChunk convertToContentChunk(AMQBody body)
+    {
+        final ContentBody contentBodyChunk = (ContentBody) body;
+
+        return new ContentChunk()
+        {
+
+            public int getSize()
+            {
+                return contentBodyChunk.getSize();
+            }
+
+            public ByteBuffer getData()
+            {
+                return contentBodyChunk.payload;
+            }
+
+            public void reduceToFit()
+            {
+                contentBodyChunk.reduceBufferToFit();
+            }
+        };
+
+    }
+
+    /**
+     * Resolves the class and method ids of basic.publish for this converter's protocol version.
+     * Calling it more than once simply resolves the same ids again.
+     */
+    public void configure()
+    {
+        _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(), getProtocolMinorVersion());
+        _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(), getProtocolMinorVersion());
+        _configured = true;
+    }
+
+    /**
+     * Exposes a {@link BasicPublishBody} through the {@link MessagePublishInfo} abstraction.
+     * The returned info is a live view: every call is delegated to the wrapped body.
+     *
+     * @param methodBody the method body to wrap; must be a {@link BasicPublishBody}
+     * @return a publish-info view over {@code methodBody}
+     * @throws ClassCastException if {@code methodBody} is not a {@link BasicPublishBody}
+     */
+    public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+    {
+        final BasicPublishBody body = (BasicPublishBody) methodBody;
+
+        return new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return body.getExchange();
+            }
+
+            public boolean isImmediate()
+            {
+                return body.getImmediate();
+            }
+
+            public boolean isMandatory()
+            {
+                return body.getMandatory();
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return body.getRoutingKey();
+            }
+        };
+
+    }
+
+    /**
+     * Builds a {@link BasicPublishBody} carrying the exchange, flags and routing key of the
+     * given publish info. The ticket is always 0.
+     *
+     * @param info the publish information to convert
+     * @return a new basic.publish body for this converter's protocol version
+     */
+    public AMQMethodBody convertToBody(MessagePublishInfo info)
+    {
+        // Some callers instantiate the converter directly and never call configure();
+        // resolve the ids here rather than emit a body with class/method id 0.
+        if (!_configured)
+        {
+            configure();
+        }
+
+        return new BasicPublishBody(getProtocolMajorVersion(),
+                                    getProtocolMinorVersion(),
+                                    _basicPublishClassId,
+                                    _basicPublishMethodId,
+                                    info.getExchange(),
+                                    info.isImmediate(),
+                                    info.isMandatory(),
+                                    info.getRoutingKey(),
+                                    0) ; // ticket
+
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Tue Feb 20 08:20:41 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.framing;
 
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 
@@ -36,10 +38,53 @@
 
     private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
 
+    private ProtocolVersionMethodConverter _protocolVersionConverter;
+
     public VersionSpecificRegistry(byte major, byte minor)
     {
         _protocolMajorVersion = major;
         _protocolMinorVersion = minor;
+
+        _protocolVersionConverter = loadProtocolVersionConverters(major, minor);
+    }
+
+    /**
+     * Finds and instantiates the converter class for a protocol version. Converter classes are
+     * looked up by name as {@code org.apache.qpid.framing.MethodConverter_<major>_<minor>}.
+     * <p>
+     * If no class exists for the requested version, a warning is logged and the search steps
+     * down: first the minor version is decremented until it reaches 0, then the major version.
+     *
+     * @param protocolMajorVersion major version to start the search from
+     * @param protocolMinorVersion minor version to start the search from
+     * @return an instance of the first converter class found, or null if the search reaches
+     *         version 0-0 without finding one
+     * @throws IllegalStateException if a converter class is found but cannot be instantiated
+     */
+    private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, byte protocolMinorVersion)
+    {
+        byte major = protocolMajorVersion;
+        byte minor = protocolMinorVersion;
+
+        while (true)
+        {
+            try
+            {
+                Class<ProtocolVersionMethodConverter> converterClass =
+                        (Class<ProtocolVersionMethodConverter>) Class.forName("org.apache.qpid.framing.MethodConverter_" + major + "_" + minor);
+                return converterClass.newInstance();
+            }
+            catch (ClassNotFoundException e)
+            {
+                _log.warn("Could not find protocol conversion classes for " + major + "-" + minor);
+
+                // Step down to the next candidate version, or give up at 0-0.
+                if (minor != 0)
+                {
+                    minor--;
+                }
+                else if (major != 0)
+                {
+                    major--;
+                }
+                else
+                {
+                    return null;
+                }
+            }
+            catch (IllegalAccessException e)
+            {
+                throw new IllegalStateException("Unable to load protocol version converter: ", e);
+            }
+            catch (InstantiationException e)
+            {
+                throw new IllegalStateException("Unable to load protocol version converter: ", e);
+            }
+        }
     }
 
     public byte getProtocolMajorVersion()
@@ -137,5 +182,15 @@
         return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
 
 
+    }
+
+    /**
+     * Returns the converter that was looked up for this registry's protocol version when the
+     * registry was constructed.
+     *
+     * @return the converter, or null if the lookup found no converter class
+     */
+    public ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+    {
+        return _protocolVersionConverter;
+    }
+
+    /**
+     * Configures the protocol version converter held by this registry, if there is one.
+     */
+    public void configure()
+    {
+        // The converter lookup returns null when no converter class exists for any candidate
+        // version; in that case there is nothing to configure, so do not dereference it.
+        if (_protocolVersionConverter != null)
+        {
+            _protocolVersionConverter.configure();
+        }
     }
 }

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,26 @@
+package org.apache.qpid.framing.abstraction;
+
+/**
+ * Base class for {@link ProtocolVersionMethodConverter} implementations. It does nothing more
+ * than hold the protocol version (major and minor) that the concrete converter was created for.
+ */
+public abstract class AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+    private final byte _protocolMajorVersion;
+    private final byte _protocolMinorVersion;
+
+    /**
+     * @param major protocol major version handled by the converter
+     * @param minor protocol minor version handled by the converter
+     */
+    public AbstractMethodConverter(byte major, byte minor)
+    {
+        _protocolMajorVersion = major;
+        _protocolMinorVersion = minor;
+    }
+
+    /**
+     * @return the major version passed to the constructor
+     */
+    public final byte getProtocolMajorVersion()
+    {
+        return _protocolMajorVersion;
+    }
+
+    /**
+     * @return the minor version passed to the constructor
+     */
+    public final byte getProtocolMinorVersion()
+    {
+        return _protocolMinorVersion;
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Protocol-version-neutral view of one chunk of message content.
+ */
+public interface ContentChunk
+{
+    /**
+     * @return the size of this chunk (presumably in bytes; confirm against implementations)
+     */
+    int getSize();
+
+    /**
+     * @return the buffer holding this chunk's data
+     */
+    ByteBuffer getData();
+
+    /**
+     * Asks the chunk to reduce its buffer to fit its data. The 0-8 converter implements this
+     * by calling {@code ContentBody.reduceBufferToFit()}.
+     */
+    void reduceToFit();
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Protocol-version-neutral, read-only description of how a message was published: the
+ * exchange and routing key it was published with, and its immediate and mandatory flags.
+ */
+public interface MessagePublishInfo
+{
+    /**
+     * @return the exchange the message was published to; may be null
+     */
+    AMQShortString getExchange();
+
+    /**
+     * @return the routing key the message was published with; may be null
+     */
+    AMQShortString getRoutingKey();
+
+    /**
+     * @return the value of the publish's immediate flag
+     */
+    boolean isImmediate();
+
+    /**
+     * @return the value of the publish's mandatory flag
+     */
+    boolean isMandatory();
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+
+/**
+ * Converts between a protocol-specific publish method body and the version-neutral
+ * {@link MessagePublishInfo}.
+ */
+public interface MessagePublishInfoConverter
+{
+    /**
+     * @param body the protocol-specific publish method body
+     * @return the publish information carried by {@code body}
+     */
+    MessagePublishInfo convertToInfo(AMQMethodBody body);
+
+    /**
+     * @param info the version-neutral publish information
+     * @return a protocol-specific method body carrying {@code info}
+     */
+    AMQMethodBody convertToBody(MessagePublishInfo info);
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQBody;
+
+/**
+ * Converter for one protocol version: on top of the publish-info conversions inherited from
+ * {@link MessagePublishInfoConverter}, it converts between protocol-specific content bodies
+ * and the version-neutral {@link ContentChunk}.
+ */
+public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+{
+    /**
+     * Performs set-up that is deferred until after construction; invoked through
+     * {@code VersionSpecificRegistry.configure()}.
+     */
+    void configure();
+
+    /**
+     * @param contentBody the version-neutral content chunk
+     * @return a protocol-specific body carrying the chunk's data
+     */
+    AMQBody convertToBody(ContentChunk contentBody);
+
+    /**
+     * @param body the protocol-specific content body
+     * @return a version-neutral chunk for {@code body}
+     */
+    ContentChunk convertToContentChunk(AMQBody body);
+}

Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java Tue Feb 20 08:20:41 2007
@@ -154,7 +154,7 @@
 
     public void testSetGetTimestamp()
     {
-        long timestamp = 999999999;
+        long timestamp = System.currentTimeMillis();
         _testProperties.setTimestamp(timestamp);
         assertEquals(timestamp, _testProperties.getTimestamp());
     }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Tue Feb 20 08:20:41 2007
@@ -23,6 +23,8 @@
 import junit.framework.TestCase;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -103,16 +105,32 @@
             for(int i = 0; i < messageCount; i++)
             {
                 long deliveryTag = i + 1;
-                // TODO: fix hardcoded protocol version data
-                TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
-                                                                                           (byte)0,
-                                                                                           BasicPublishBody.getClazz((byte)8,(byte)0),
-                                                                                           BasicPublishBody.getMethod((byte)8,(byte)0),
-                                                                                           null,
-                                                                                           false,
-                                                                                           false,
-                                                                                           null,
-                                                                                           0), txnContext);
+
+                MessagePublishInfo info = new MessagePublishInfo()
+                {
+
+                    public AMQShortString getExchange()
+                    {
+                        return null;
+                    }
+
+                    public boolean isImmediate()
+                    {
+                        return false;
+                    }
+
+                    public boolean isMandatory()
+                    {
+                        return false;
+                    }
+
+                    public AMQShortString getRoutingKey()
+                    {
+                        return null;
+                    }
+                };
+
+                TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
                 _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
             }
             _acked = acked;
@@ -174,7 +192,7 @@
         private final long _tag;
         private int _count;
 
-        TestMessage(long tag, long messageId, BasicPublishBody publishBody, TransactionalContext txnContext)
+        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
         {
             super(messageId, publishBody, txnContext);
             _tag = tag;

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Feb 20 08:20:41 2007
@@ -23,6 +23,7 @@
 import junit.framework.TestCase;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -149,15 +150,97 @@
         return headers;
     }
 
-    static BasicPublishBody getPublishRequest(String id)
+
+    /**
+     * Simple mutable {@link MessagePublishInfo} for tests: plain fields with getters and
+     * setters, so a test can change the publish details after a message has been created.
+     */
+    static final class MessagePublishInfoImpl implements MessagePublishInfo
+    {
+        private AMQShortString _exchange;
+        private AMQShortString _routingKey;
+        private boolean _immediate;
+        private boolean _mandatory;
+
+        /**
+         * Creates publish info with only a routing key: no exchange, and both flags false.
+         */
+        public MessagePublishInfoImpl(AMQShortString routingKey)
+        {
+            this(null, false, false, routingKey);
+        }
+
+        /**
+         * Creates publish info with every property given explicitly.
+         */
+        public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
+        {
+            _exchange = exchange;
+            _immediate = immediate;
+            _mandatory = mandatory;
+            _routingKey = routingKey;
+        }
+
+        // ---- MessagePublishInfo ----
+
+        public AMQShortString getExchange()
+        {
+            return _exchange;
+        }
+
+        public AMQShortString getRoutingKey()
+        {
+            return _routingKey;
+        }
+
+        public boolean isImmediate()
+        {
+            return _immediate;
+        }
+
+        public boolean isMandatory()
+        {
+            return _mandatory;
+        }
+
+        // ---- mutators used by tests ----
+
+        public void setExchange(AMQShortString exchange)
+        {
+            _exchange = exchange;
+        }
+
+        public void setRoutingKey(AMQShortString routingKey)
+        {
+            _routingKey = routingKey;
+        }
+
+        public void setImmediate(boolean immediate)
+        {
+            _immediate = immediate;
+        }
+
+        public void setMandatory(boolean mandatory)
+        {
+            _mandatory = mandatory;
+        }
+    }
+
+    /**
+     * Builds the publish information for a test message: no exchange, immediate and mandatory
+     * both false, and a routing key made from the given id.
+     *
+     * @param id text used as the routing key
+     * @return a {@link MessagePublishInfoImpl} carrying the routing key
+     */
+    static MessagePublishInfo getPublishRequest(final String id)
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Establish some way to determine the version for the test.
-        BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,
-                                                       BasicPublishBody.getClazz((byte)8,(byte)0),
-                                                       BasicPublishBody.getMethod((byte)8,(byte)0),
-                                                       null,false,false,new AMQShortString(id),0);
-        
+        // Hand out the mutable implementation rather than an anonymous read-only one:
+        // HeadersExchangeTest downcasts a message's publish info to MessagePublishInfoImpl
+        // to set the mandatory flag, which would otherwise fail with a ClassCastException.
+        MessagePublishInfo request = new MessagePublishInfoImpl(new AMQShortString(id));
+
         return request;
     }
 
@@ -221,7 +304,7 @@
             this(getPublishRequest(id), getContentHeader(headers), null);
         }
 
-        private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+        private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
         {
             super(_messageStore.getNewMessageId(), publish, _txnContext, header);
         }
@@ -265,7 +348,7 @@
         {
             try
             {
-                return getPublishBody().routingKey;
+                return getMessagePublishInfo().getRoutingKey();
             }
             catch (AMQException e)
             {

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Tue Feb 20 08:20:41 2007
@@ -22,7 +22,6 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 import org.apache.qpid.framing.BasicPublishBody;
 
@@ -55,13 +54,13 @@
 
         Message m7 = new Message("Message7", "XXXXX");
 
-        BasicPublishBody pb7 = m7.getPublishBody();
-        pb7.mandatory = true;
+        MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
+        pb7.setMandatory(true);
         routeAndTest(m7,true);
 
         Message m8 = new Message("Message8", "F0000");
-        BasicPublishBody pb8 = m8.getPublishBody();
-        pb8.mandatory = true;
+        MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
+        pb8.setMandatory(true);
         routeAndTest(m8,false,q1);
 
 
@@ -88,10 +87,10 @@
         bindDefault("F0000");
         Message m1 = new Message("Message1", "XXXXX");
         Message m2 = new Message("Message2", "F0000");
-        BasicPublishBody pb1 = m1.getPublishBody();
-        pb1.mandatory = true;
-        BasicPublishBody pb2 = m2.getPublishBody();
-        pb2.mandatory = true;
+        MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
+        pb1.setMandatory(true);
+        MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
+        pb2.setMandatory(true);
         routeAndTest(m1,true);
     }
 

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Feb 20 08:20:41 2007
@@ -22,6 +22,7 @@
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -164,20 +165,32 @@
         }
     }
 
-    private AMQMessage message(boolean immediate) throws AMQException
+    private AMQMessage message(final boolean immediate) throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Establish some way to determine the version for the test.
-        BasicPublishBody publish = new BasicPublishBody((byte)8,
-                                                        (byte)0,
-                                                        BasicPublishBody.getClazz((byte)8,(byte)0),
-                                                        BasicPublishBody.getMethod((byte)8,(byte)0),
-                                                        null,
-                                                        immediate,
-                                                        false,
-                                                        null,
-                                                        0);
-        
+        MessagePublishInfo publish = new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return null;
+            }
+
+            public boolean isImmediate()
+            {
+                return immediate;
+            }
+
+            public boolean isMandatory()
+            {
+                return false;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return null;
+            }
+        };
+                              
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.bodySize = 1000;   // in bytes
         return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue Feb 20 08:20:41 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
@@ -98,15 +99,29 @@
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Establish some way to determine the version for the test.
-            BasicPublishBody publishBody = new BasicPublishBody((byte)8,
-                                                                (byte)0,
-                                                                BasicPublishBody.getClazz((byte)8,(byte)0),
-                                                                BasicPublishBody.getMethod((byte)8,(byte)0),
-                                                                new AMQShortString("someExchange"),
-                                                                false,
-                                                                false,
-                                                                new AMQShortString("rk"),
-                                                                0);
+            MessagePublishInfo publishBody = new MessagePublishInfo()
+            {
+
+                public AMQShortString getExchange()
+                {
+                    return new AMQShortString("someExchange");
+                }
+
+                public boolean isImmediate()
+                {
+                    return false;
+                }
+
+                public boolean isMandatory()
+                {
+                    return false;
+                }
+
+                public AMQShortString getRoutingKey()
+                {
+                    return new AMQShortString("rk");
+                }
+            };
             AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext);
             if (persistent)
             {

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Tue Feb 20 08:20:41 2007
@@ -22,6 +22,8 @@
 
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
 import org.apache.qpid.server.store.StoreContext;
@@ -57,20 +59,32 @@
         return message(false);
     }
 
-    AMQMessage message(boolean immediate) throws AMQException
+    AMQMessage message(final boolean immediate) throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Establish some way to determine the version for the test.
-        BasicPublishBody publish = new BasicPublishBody((byte)8,
-                                                        (byte)0,
-                                                        BasicPublishBody.getClazz((byte)8,(byte)0),
-                                                        BasicPublishBody.getMethod((byte)8,(byte)0),
-                                                        null,
-                                                        immediate,
-                                                        false,
-                                                        null,
-                                                        0);
-        
+        MessagePublishInfo publish = new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return null;
+            }
+
+            public boolean isImmediate()
+            {
+                return immediate;
+            }
+
+            public boolean isMandatory()
+            {
+                return false;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return null;
+            }
+        };
+                              
         return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
                               new ContentHeaderBody());
     }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Tue Feb 20 08:20:41 2007
@@ -24,9 +24,12 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -56,6 +59,26 @@
     {
     }
 
+    public void createExchange(Exchange exchange) throws AMQException
+    {
+        // No-op: this skeleton store does nothing when an exchange is created.
+    }
+
+    public void removeExchange(Exchange exchange) throws AMQException
+    {
+        // No-op: this skeleton store does nothing when an exchange is removed.
+    }
+
+    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        // No-op: this skeleton store does nothing when a queue is bound to an exchange.
+    }
+
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        // No-op: this skeleton store does nothing when a queue is unbound from an exchange.
+    }
+
     public void createQueue(AMQQueue queue) throws AMQException
     {
     }        
@@ -87,7 +110,7 @@
         return _messageId.getAndIncrement();
     }
 
-    public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
+    public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
     {
 
     }
@@ -102,7 +125,7 @@
         return null;
     }
 
-    public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
+    public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
     {
         return null;
     }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Tue Feb 20 08:20:41 2007
@@ -25,6 +25,8 @@
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -50,16 +52,32 @@
     public void testMessageGetsRemoved() throws AMQException
     {
         createPersistentContentHeader();
-        // TODO: fix hardcoded protocol version data
-        AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
-                                                                                           (byte)0,
-                                                                                           BasicPublishBody.getClazz((byte)8,(byte)0),
-                                                                                           BasicPublishBody.getMethod((byte)8,(byte)0),
-                                                                                           null,
-                                                                                           false,
-                                                                                           false,
-                                                                                           null,
-                                                                                           0),
+
+        MessagePublishInfo info = new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return null;
+            }
+
+            public boolean isImmediate()
+            {
+                return false;
+            }
+
+            public boolean isMandatory()
+            {
+                return false;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return null;
+            }
+        };
+
+        AMQMessage message = new AMQMessage(_store.getNewMessageId(), info,
                                             new NonTransactionalContext(_store, _storeContext, null, null, null),
                                             createPersistentContentHeader());
         message.incrementReference();
@@ -81,16 +99,33 @@
 
     public void testMessageRemains() throws AMQException
     {
-        // TODO: fix hardcoded protocol version data
-        AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
-                                                                                           (byte)0,
-                                                                                           BasicPublishBody.getClazz((byte)8,(byte)0),
-                                                                                           BasicPublishBody.getMethod((byte)8,(byte)0),
-                                                                                           null,
-                                                                                           false,
-                                                                                           false,
-                                                                                           null,
-                                                                                           0),
+
+        MessagePublishInfo info = new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return null;
+            }
+
+            public boolean isImmediate()
+            {
+                return false;
+            }
+
+            public boolean isMandatory()
+            {
+                return false;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return null;
+            }
+        };
+
+        AMQMessage message = new AMQMessage(_store.getNewMessageId(),
+                                            info,
                                             new NonTransactionalContext(_store, _storeContext, null, null, null),
                                             createPersistentContentHeader());
         message.incrementReference();

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Tue Feb 20 08:20:41 2007
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -35,7 +36,7 @@
     public TestableMemoryMessageStore()
     {
         _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
-        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>();
+        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
     }
 
     public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
@@ -43,7 +44,7 @@
         return _metaDataMap;
     }
 
-    public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap()
+    public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
     {
         return _contentBodyMap;
     }