You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/01/22 18:16:45 UTC

svn commit: r1560424 [2/2] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/ bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/mes...

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Wed Jan 22 17:16:44 2014
@@ -21,35 +21,23 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 
-import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.StoredMessage;
 
-public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage
+public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage
 {
 
-
-    private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
-            AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
-
-    private volatile int _referenceCount = 0;
-
-    private final StoredMessage<MessageMetaData_1_0> _storedMessage;
     private List<ByteBuffer> _fragments;
-    private WeakReference<Session_1_0> _session;
     private long _arrivalTime;
 
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
     {
-        _storedMessage = storedMessage;
-        _session = null;
+        super(storedMessage, null);
         _fragments = restoreFragments(storedMessage);
     }
 
@@ -75,11 +63,10 @@ public class Message_1_0 implements Serv
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
                        final List<ByteBuffer> fragments,
-                       final Session_1_0 session)
+                       final Object connectionReference)
     {
-        _storedMessage = storedMessage;
+        super(storedMessage, connectionReference);
         _fragments = fragments;
-        _session = new WeakReference<Session_1_0>(session);
         _arrivalTime = System.currentTimeMillis();
     }
 
@@ -98,7 +85,7 @@ public class Message_1_0 implements Serv
 
     private MessageMetaData_1_0 getMessageMetaData()
     {
-        return _storedMessage.getMetaData();
+        return getStoredMessage().getMetaData();
     }
 
     public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader()
@@ -106,16 +93,6 @@ public class Message_1_0 implements Serv
         return getMessageMetaData().getMessageHeader();
     }
 
-    public StoredMessage getStoredMessage()
-    {
-        return _storedMessage;
-    }
-
-    public boolean isPersistent()
-    {
-        return getMessageMetaData().isPersistent();
-    }
-
     public boolean isRedelivered()
     {
         // TODO
@@ -136,121 +113,19 @@ public class Message_1_0 implements Serv
         return size;
     }
 
-    public boolean isImmediate()
-    {
-        return false;
-    }
-
     public long getExpiration()
     {
         return getMessageHeader().getExpiration();
     }
 
-    public MessageReference<Message_1_0> newReference()
-    {
-        return new Reference(this);
-    }
-
-    public long getMessageNumber()
-    {
-        return _storedMessage.getMessageNumber();
-    }
-
     public long getArrivalTime()
     {
         return _arrivalTime;
     }
 
-    public int getContent(final ByteBuffer buf, final int offset)
-    {
-        return _storedMessage.getContent(offset, buf);
-    }
-
-    public ByteBuffer getContent(int offset, int size)
-    {
-        ByteBuffer buf = ByteBuffer.allocate(size);
-        buf.limit(getContent(buf, offset));
-
-        return buf;
-    }
-
     public List<ByteBuffer> getFragments()
     {
         return _fragments;
     }
 
-    public Session_1_0 getSession()
-    {
-        return _session == null ? null : _session.get();
-    }
-
-
-    public boolean incrementReference()
-    {
-        if(_refCountUpdater.incrementAndGet(this) <= 0)
-        {
-            _refCountUpdater.decrementAndGet(this);
-            return false;
-        }
-        else
-        {
-            return true;
-        }
-    }
-
-    /**
-     * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
-     * message store.
-     */
-
-    public void decrementReference()
-    {
-        int count = _refCountUpdater.decrementAndGet(this);
-
-        // note that the operation of decrementing the reference count and then removing the message does not
-        // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
-        // the message has been passed to all queues. i.e. we are
-        // not relying on the all the increments having taken place before the delivery manager decrements.
-        if (count == 0)
-        {
-            // set the reference count way below 0 so that we can detect that the message has been deleted
-            // this is to guard against the message being spontaneously recreated (from the mgmt console)
-            // by copying from other queues at the same time as it is being removed.
-            _refCountUpdater.set(this,Integer.MIN_VALUE/2);
-
-            // must check if the handle is null since there may be cases where we decide to throw away a message
-            // and the handle has not yet been constructed
-            if (_storedMessage != null)
-            {
-                _storedMessage.remove();
-            }
-        }
-        else
-        {
-            if (count < 0)
-            {
-                throw new RuntimeException("Reference count for message id " + getMessageNumber()
-                                                  + " has gone below 0.");
-            }
-        }
-    }
-
-    public static class Reference extends MessageReference<Message_1_0>
-    {
-        public Reference(Message_1_0 message)
-        {
-            super(message);
-        }
-
-        protected void onReference(Message_1_0 message)
-        {
-            message.incrementReference();
-        }
-
-        protected void onRelease(Message_1_0 message)
-        {
-            message.decrementReference();
-        }
-
-    }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Wed Jan 22 17:16:44 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Detach;
 import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -160,8 +161,8 @@ public class ReceivingLink_1_0 implement
 
             storedMessage.flushToStore();
 
-            Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession());
-
+            Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
+            MessageReference<Message_1_0> reference = message.newReference();
 
             Binary transactionId = null;
             org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
@@ -231,6 +232,8 @@ public class ReceivingLink_1_0 implement
                     }
                 });
             }
+
+            reference.release();
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Jan 22 17:16:44 2014
@@ -541,8 +541,7 @@ public class Session_1_0 implements Sess
     @Override
     public boolean onSameConnection(InboundMessage inbound)
     {
-        // TODO
-        return false;
+        return inbound.getConnectionReference() == getConnection().getReference();
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Wed Jan 22 17:16:44 2014
@@ -149,7 +149,7 @@ class Subscription_1_0 implements Subscr
     {
         if(entry.getMessage() instanceof Message_1_0)
         {
-            if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession())
+            if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference())
             {
                 return false;
             }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Wed Jan 22 17:16:44 2014
@@ -225,7 +225,6 @@ public class MessageConverter_0_10_to_0_
     {
         return new MessageMetaData(convertPublishBody(message),
                 convertContentHeaderBody(message, vhost),
-                1,
                 message.getArrivalTime());
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Wed Jan 22 17:16:44 2014
@@ -121,7 +121,7 @@ public class MessageConverter_0_8_to_0_1
         body.flip();
 
         BasicContentHeaderProperties properties =
-                (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+                  message_0_8.getContentHeaderBody().getProperties();
 
         final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
         if(exchange != null)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java Wed Jan 22 17:16:44 2014
@@ -56,7 +56,7 @@ public class MessageConverter_0_8_to_1_0
         header.setDurable(serverMessage.isPersistent());
 
         BasicContentHeaderProperties contentHeader =
-                (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+                  serverMessage.getContentHeaderBody().getProperties();
 
         header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
         final long expiration = serverMessage.getExpiration();

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Wed Jan 22 17:16:44 2014
@@ -30,7 +30,6 @@ import org.apache.qpid.client.CustomJMSX
 import org.apache.qpid.client.JMSAMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderProperties;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 
@@ -73,7 +72,7 @@ public class AMQMessageDelegate_0_8 exte
     private static final boolean STRICT_AMQP_COMPLIANCE =
             Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
 
-    private ContentHeaderProperties _contentHeaderProperties;
+    private BasicContentHeaderProperties _contentHeaderProperties;
 
     // The base set of items that needs to be set. 
     private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
@@ -81,7 +80,7 @@ public class AMQMessageDelegate_0_8 exte
         super(deliveryTag);
         _contentHeaderProperties = properties;
         _readableProperties = (_contentHeaderProperties != null);
-        _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
+        _headerAdapter = new JMSHeaderAdapter(_readableProperties ? _contentHeaderProperties.getHeaders()
                                                                   : (new BasicContentHeaderProperties()).getHeaders() );
     }
 
@@ -90,7 +89,7 @@ public class AMQMessageDelegate_0_8 exte
     {
         this(new BasicContentHeaderProperties(), -1);
         _readableProperties = false;
-        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+        _headerAdapter = new JMSHeaderAdapter(_contentHeaderProperties.getHeaders());
 
     }
 
@@ -337,7 +336,7 @@ public class AMQMessageDelegate_0_8 exte
 
     public BasicContentHeaderProperties getContentHeaderProperties()
     {
-        return (BasicContentHeaderProperties) _contentHeaderProperties;
+        return _contentHeaderProperties;
     }
 
 
@@ -443,7 +442,7 @@ public class AMQMessageDelegate_0_8 exte
         //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
         if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
         {
-            return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+            return _contentHeaderProperties.getUserIdAsString();
         }
         else
         {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Wed Jan 22 17:16:44 2014
@@ -101,7 +101,7 @@ public abstract class AbstractJMSMessage
         }
 
         AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
-                                                                 (BasicContentHeaderProperties) contentHeader.getProperties(),
+                                                                 contentHeader.getProperties(),
                                                                  exchange, routingKey, queueDestinationCache, topicDestinationCache);
 
         return createMessage(delegate, data);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Wed Jan 22 17:16:44 2014
@@ -110,7 +110,7 @@ public class MessageFactoryRegistry
                                             AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
             throws AMQException, JMSException
     {
-        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
+        BasicContentHeaderProperties properties = contentHeader.getProperties();
 
         // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
         // AMQP. When the type is null, it can only be assumed that the message is a byte message.

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Wed Jan 22 17:16:44 2014
@@ -27,7 +27,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-public class BasicContentHeaderProperties implements CommonContentHeaderProperties
+public class BasicContentHeaderProperties
 {
     //persistent & non-persistent constants, values as per JMS DeliveryMode
     public static final int NON_PERSISTENT = 1;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Wed Jan 22 17:16:44 2014
@@ -39,7 +39,7 @@ public class ContentHeaderBody implement
     private long bodySize;
 
     /** must never be null */
-    private ContentHeaderProperties properties;
+    private BasicContentHeaderProperties properties;
 
     public ContentHeaderBody()
     {
@@ -57,13 +57,13 @@ public class ContentHeaderBody implement
     }
 
 
-    public ContentHeaderBody(ContentHeaderProperties props, int classId)
+    public ContentHeaderBody(BasicContentHeaderProperties props, int classId)
     {
         properties = props;
         this.classId = classId;
     }
 
-    public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize)
+    public ContentHeaderBody(int classId, int weight, BasicContentHeaderProperties props, long bodySize)
     {
         this(props, classId);
         this.weight = weight;
@@ -121,12 +121,12 @@ public class ContentHeaderBody implement
         return new AMQFrame(channelId, body);
     }
 
-    public ContentHeaderProperties getProperties()
+    public BasicContentHeaderProperties getProperties()
     {
         return properties;
     }
 
-    public void setProperties(ContentHeaderProperties props)
+    public void setProperties(BasicContentHeaderProperties props)
     {
         properties = props;
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Wed Jan 22 17:16:44 2014
@@ -38,11 +38,11 @@ public class ContentHeaderPropertiesFact
     {
     }
 
-    public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
+    public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
                                                                  DataInput buffer, int size)
              throws AMQFrameDecodingException, IOException
     {
-        ContentHeaderProperties properties;
+        BasicContentHeaderProperties properties;
         // AMQP version change: "Hardwired" version to major=8, minor=0
         // TODO: Change so that the actual version is obtained from
         // the ProtocolInitiation object for this session.

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Wed Jan 22 17:16:44 2014
@@ -21,14 +21,10 @@
 
 package org.apache.qpid.framing.abstraction;
 
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQMethodBody;
 
-public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+public interface ProtocolVersionMethodConverter
 {
-    AMQBody convertToBody(ContentChunk contentBody);
-    ContentChunk convertToContentChunk(AMQBody body);
-
-    void configure();
-
-    AMQBody convertToBody(byte[] input);
+    public MessagePublishInfo convertToInfo(AMQMethodBody body);
+    public AMQMethodBody convertToBody(MessagePublishInfo info);
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Wed Jan 22 17:16:44 2014
@@ -21,13 +21,10 @@
 
 package org.apache.qpid.framing.amqp_0_9;
 
-import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -35,48 +32,12 @@ import org.apache.qpid.framing.abstracti
 
 public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
 {
-    private int _basicPublishClassId;
-    private int _basicPublishMethodId;
 
     public MethodConverter_0_9()
     {
         super((byte)0,(byte)9);
-
-
-    }
-
-    public AMQBody convertToBody(ContentChunk contentChunk)
-    {
-        if(contentChunk instanceof ContentChunk_0_9)
-        {
-            return ((ContentChunk_0_9)contentChunk).toBody();
-        }
-        else
-        {
-            return new ContentBody(contentChunk.getData());
-        }
-    }
-
-    public ContentChunk convertToContentChunk(AMQBody body)
-    {
-        final ContentBody contentBodyChunk = (ContentBody) body;
-
-        return new ContentChunk_0_9(contentBodyChunk);
-
     }
 
-    public void configure()
-    {
-
-        _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID;
-        _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
-    }
-
-    public AMQBody convertToBody(byte[] data)
-    {
-        return new ContentBody(data);
-    }
 
     public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
     {
@@ -103,33 +64,4 @@ public class MethodConverter_0_9 extends
 
     }
 
-    private static class ContentChunk_0_9 implements ContentChunk
-    {
-        private final ContentBody _contentBodyChunk;
-
-        public ContentChunk_0_9(final ContentBody contentBodyChunk)
-        {
-            _contentBodyChunk = contentBodyChunk;
-        }
-
-        public int getSize()
-        {
-            return _contentBodyChunk.getSize();
-        }
-
-        public byte[] getData()
-        {
-            return _contentBodyChunk.getPayload();
-        }
-
-        public void reduceToFit()
-        {
-            _contentBodyChunk.reduceBufferToFit();
-        }
-
-        public AMQBody toBody()
-        {
-            return _contentBodyChunk;
-        }
-    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java Wed Jan 22 17:16:44 2014
@@ -21,61 +21,22 @@
 
 package org.apache.qpid.framing.amqp_0_91;
 
-import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 
 public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
 {
-    private int _basicPublishClassId;
-    private int _basicPublishMethodId;
 
     public MethodConverter_0_91()
     {
         super((byte)0,(byte)9);
-
-
-    }
-
-    public AMQBody convertToBody(ContentChunk contentChunk)
-    {
-        if(contentChunk instanceof ContentChunk_0_9)
-        {
-            return ((ContentChunk_0_9)contentChunk).toBody();
-        }
-        else
-        {
-            return new ContentBody(contentChunk.getData());
-        }
-    }
-
-    public ContentChunk convertToContentChunk(AMQBody body)
-    {
-        final ContentBody contentBodyChunk = (ContentBody) body;
-
-        return new ContentChunk_0_9(contentBodyChunk);
-
     }
 
-    public void configure()
-    {
-
-        _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
-        _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
-    }
-
-    public AMQBody convertToBody(byte[] data)
-    {
-        return new ContentBody(data);
-    }
 
     public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
     {
@@ -102,33 +63,4 @@ public class MethodConverter_0_91 extend
 
     }
 
-    private static class ContentChunk_0_9 implements ContentChunk
-    {
-        private final ContentBody _contentBodyChunk;
-
-        public ContentChunk_0_9(final ContentBody contentBodyChunk)
-        {
-            _contentBodyChunk = contentBodyChunk;
-        }
-
-        public int getSize()
-        {
-            return _contentBodyChunk.getSize();
-        }
-
-        public byte[] getData()
-        {
-            return _contentBodyChunk.getPayload();
-        }
-
-        public void reduceToFit()
-        {
-            _contentBodyChunk.reduceBufferToFit();
-        }
-
-        public AMQBody toBody()
-        {
-            return _contentBodyChunk;
-        }
-    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Wed Jan 22 17:16:44 2014
@@ -21,71 +21,21 @@
 
 package org.apache.qpid.framing.amqp_8_0;
 
-import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 
 public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
 {
-    private int _basicPublishClassId;
-    private int _basicPublishMethodId;
-
     public MethodConverter_8_0()
     {
         super((byte)8,(byte)0);
-
-
-    }
-
-    public AMQBody convertToBody(ContentChunk contentChunk)
-    {
-        return new ContentBody(contentChunk.getData());
     }
 
-    public ContentChunk convertToContentChunk(AMQBody body)
-    {
-        final ContentBody contentBodyChunk = (ContentBody) body;
-
-        return new ContentChunk()
-        {
-
-            public int getSize()
-            {
-                return contentBodyChunk.getSize();
-            }
-
-            public byte[] getData()
-            {
-                return contentBodyChunk.getPayload();
-            }
-
-            public void reduceToFit()
-            {
-                contentBodyChunk.reduceBufferToFit();
-            }
-        };
-
-    }
-
-    public void configure()
-    {
-
-        _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
-        _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
-    }
-
-    public AMQBody convertToBody(byte[] data)
-    {
-        return new ContentBody(data);
-    }
 
     public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
     {

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1560424&r1=1560423&r2=1560424&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java Wed Jan 22 17:16:44 2014
@@ -50,8 +50,6 @@ import org.apache.qpid.server.queue.AMQP
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.ConflationQueue;
-import org.apache.qpid.server.protocol.v0_8.IncomingMessage;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -617,61 +615,41 @@ public class MessageStoreTest extends Qp
 
         MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey);
 
-        final IncomingMessage currentMessage;
-
-
-        currentMessage = new IncomingMessage(messageInfo);
-
-        currentMessage.setExchange(exchange);
-
         ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l);
 
-        try
-        {
-            currentMessage.setContentHeaderBody(headerBody);
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
+        MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis());
 
-        currentMessage.setExpiration();
+        final StoredMessage<MessageMetaData> storedMessage = getVirtualHost().getMessageStore().addMessage(mmd);
+        storedMessage.flushToStore();
+        final AMQMessage currentMessage = new AMQMessage(storedMessage);
 
-        MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis());
-        currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd));
-        currentMessage.getStoredMessage().flushToStore();
-        currentMessage.route();
+        final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage);
 
 
-        // check and deliver if header says body length is zero
-        if (currentMessage.allContentReceived())
-        {
-            ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
-            final List<? extends BaseQueue> destinationQueues = currentMessage.getDestinationQueues();
-            trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
-                public void postCommit()
-                {
-                    try
-                    {
-                        AMQMessage message = new AMQMessage(currentMessage.getStoredMessage());
+        ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
 
-                        for(BaseQueue queue : destinationQueues)
-                        {
-                            queue.enqueue(message);
-                        }
-                    }
-                    catch (AMQException e)
+        trans.enqueue(destinationQueues, currentMessage, new ServerTransaction.Action() {
+            public void postCommit()
+            {
+                try
+                {
+                    for(BaseQueue queue : destinationQueues)
                     {
-                        _logger.error("Problem enqueing message", e);
+                        queue.enqueue(currentMessage);
                     }
                 }
-
-                public void onRollback()
+                catch (AMQException e)
                 {
-                    //To change body of implemented methods use File | Settings | File Templates.
+                    _logger.error("Problem enqueing message", e);
                 }
-            });
-        }
+            }
+
+            public void onRollback()
+            {
+                //To change body of implemented methods use File | Settings | File Templates.
+            }
+        });
+
     }
 
     private void createAllQueues()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org