You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/28 15:10:34 UTC

svn commit: r1698333 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache...

Author: rgodfrey
Date: Fri Aug 28 13:10:34 2015
New Revision: 1698333

URL: http://svn.apache.org/r1698333
Log:
QPID-6712 : Applied tidied up version of WIP patch, with changes for QPID-6713 removed

Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Aug 28 13:10:34 2015
@@ -1065,15 +1065,22 @@ public abstract class AbstractBDBMessage
         @Override
         public synchronized T getMetaData()
         {
-            T metaData = _messageDataRef.getMetaData();
-
-            if(metaData == null)
+            if (_messageDataRef == null)
             {
-                checkMessageStoreOpen();
-                metaData = (T) getMessageMetaData(_messageId);
-                _messageDataRef = new MessageDataSoftRef<>(metaData,null);
+                return null;
+            }
+            else
+            {
+                T metaData = _messageDataRef.getMetaData();
+
+                if (metaData == null)
+                {
+                    checkMessageStoreOpen();
+                    metaData = (T) getMessageMetaData(_messageId);
+                    _messageDataRef = new MessageDataSoftRef<>(metaData, null);
+                }
+                return metaData;
             }
-            return metaData;
         }
 
         @Override
@@ -1122,7 +1129,7 @@ public abstract class AbstractBDBMessage
 
         private Collection<QpidByteBuffer> getContentAsByteBuffer()
         {
-            Collection<QpidByteBuffer> data = _messageDataRef.getData();
+            Collection<QpidByteBuffer> data = _messageDataRef == null ? Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData();
             if(data == null)
             {
                 if(stored())
@@ -1215,23 +1222,26 @@ public abstract class AbstractBDBMessage
 
         synchronized FutureResult flushToStore()
         {
-            if(!stored())
+            if (_messageDataRef != null)
             {
-                checkMessageStoreOpen();
-
-                Transaction txn;
-                try
-                {
-                    txn = getEnvironmentFacade().beginTransaction(null);
-                }
-                catch (RuntimeException e)
+                if (!stored())
                 {
-                    throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e);
-                }
-                store(txn);
-                getEnvironmentFacade().commit(txn, true);
+                    checkMessageStoreOpen();
 
-                storedSizeChangeOccurred(getMetaData().getContentSize());
+                    Transaction txn;
+                    try
+                    {
+                        txn = getEnvironmentFacade().beginTransaction(null);
+                    }
+                    catch (RuntimeException e)
+                    {
+                        throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e);
+                    }
+                    store(txn);
+                    getEnvironmentFacade().commit(txn, true);
+
+                    storedSizeChangeOccurred(getMetaData().getContentSize());
+                }
             }
             return FutureResult.IMMEDIATE_FUTURE;
         }
@@ -1244,8 +1254,11 @@ public abstract class AbstractBDBMessage
 
             final T metaData = getMetaData();
             int delta = metaData.getContentSize();
-            removeMessage(_messageId, false);
-            storedSizeChangeOccurred(-delta);
+            if(stored())
+            {
+                removeMessage(_messageId, false);
+                storedSizeChangeOccurred(-delta);
+            }
             if(data != null)
             {
                 _messageDataRef.setData(null);
@@ -1255,17 +1268,18 @@ public abstract class AbstractBDBMessage
                 }
             }
             metaData.dispose();
+            _messageDataRef = null;
         }
 
         @Override
         public synchronized boolean isInMemory()
         {
-            return _messageDataRef.isHardRef() || _messageDataRef.getData() != null;
+            return _messageDataRef != null && (_messageDataRef.isHardRef() || _messageDataRef.getData() != null);
         }
 
         private boolean stored()
         {
-            return !_messageDataRef.isHardRef();
+            return _messageDataRef != null && !_messageDataRef.isHardRef();
         }
 
         @Override
@@ -1273,7 +1287,7 @@ public abstract class AbstractBDBMessage
         {
 
             flushToStore();
-            if(!_messageDataRef.isHardRef())
+            if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
                 ((MessageDataSoftRef)_messageDataRef).clear();
             }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Fri Aug 28 13:10:34 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import java.nio.ByteBuffer;
 import java.util.Collection;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
@@ -33,6 +32,8 @@ import org.apache.qpid.transport.Header;
 public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10>
 {
 
+    private final static MessageMetaData_0_10 DELETED_MESSAGE_METADATA = new MessageMetaData_0_10(null, 0, 0);
+
     public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
     {
         super(storeMessage, connectionRef);
@@ -40,7 +41,9 @@ public class MessageTransferMessage exte
 
     private MessageMetaData_0_10 getMetaData()
     {
-        return getStoredMessage().getMetaData();
+        MessageMetaData_0_10 metaData = getStoredMessage().getMetaData();
+
+        return metaData == null ? DELETED_MESSAGE_METADATA : metaData;
     }
 
     public String getInitialRoutingAddress()

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Fri Aug 28 13:10:34 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.message.AMQMessageHeader;
@@ -32,6 +33,7 @@ import org.apache.qpid.server.store.Stor
  */
 public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData>
 {
+    private static final MessageMetaData DELETED_MESSAGE_METADATA = new MessageMetaData(new MessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties()), 0);
     private final long _size;
 
     public AMQMessage(StoredMessage<MessageMetaData> handle)
@@ -47,7 +49,9 @@ public class AMQMessage extends Abstract
 
     public MessageMetaData getMessageMetaData()
     {
-        return getStoredMessage().getMetaData();
+        MessageMetaData metaData = getStoredMessage().getMetaData();
+
+        return metaData == null ? DELETED_MESSAGE_METADATA : metaData;
     }
 
     public ContentHeaderBody getContentHeaderBody()

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Fri Aug 28 13:10:34 2015
@@ -41,7 +41,6 @@ import org.apache.qpid.server.plugin.Mes
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.util.ByteBufferOutputStream;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.util.ByteBufferInputStream;
 
 /**
  * Encapsulates a publish body and a content header. In the context of the message store these are treated as a
@@ -155,7 +154,6 @@ public class MessageMetaData implements
     public synchronized void dispose()
     {
         _contentHeaderBody.dispose();
-        _contentHeaderBody = null;
     }
 
     public synchronized void clearEncodedForm()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Fri Aug 28 13:10:34 2015
@@ -22,19 +22,26 @@ package org.apache.qpid.server.protocol.
 
 
 import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
 
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.util.ByteBufferUtils;
 
 public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
 {
 
+    private static final AMQPDescribedTypeRegistry DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+            .registerTransportLayer()
+            .registerMessagingLayer()
+            .registerTransactionLayer()
+            .registerSecurityLayer();
+    public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY));
+
     private volatile SoftReference<Collection<QpidByteBuffer>> _fragmentsRef;
     private long _arrivalTime;
     private final long _size;
@@ -93,7 +100,8 @@ public class Message_1_0 extends Abstrac
 
     private MessageMetaData_1_0 getMessageMetaData()
     {
-        return getStoredMessage().getMetaData();
+        MessageMetaData_1_0 metaData = getStoredMessage().getMetaData();
+        return metaData == null ? DELETED_MESSAGE_METADATA : metaData;
     }
 
     public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader()

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Fri Aug 28 13:10:34 2015
@@ -956,7 +956,6 @@ public class BasicContentHeaderPropertie
         if(_headers != null)
         {
             _headers.dispose();
-            _headers = null;
         }
     }
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Aug 28 13:10:34 2015
@@ -39,7 +39,7 @@ public class ContentHeaderBody implement
     private long _bodySize;
 
     /** must never be null */
-    private BasicContentHeaderProperties _properties;
+    private final BasicContentHeaderProperties _properties;
 
     public ContentHeaderBody(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, IOException
     {
@@ -132,11 +132,6 @@ public class ContentHeaderBody implement
         return _properties;
     }
 
-    public void setProperties(BasicContentHeaderProperties props)
-    {
-        _properties = props;
-    }
-
     @Override
     public String toString()
     {
@@ -199,7 +194,6 @@ public class ContentHeaderBody implement
     public void dispose()
     {
         _properties.dispose();
-        _properties = null;
     }
 
     public void clearEncodedForm()

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri Aug 28 13:10:34 2015
@@ -938,12 +938,43 @@ public class FieldTable
         return map;
     }
 
+    public void clearEncodedForm()
+    {
+        synchronized (this)
+        {
+            if (_properties == null)
+            {
+                if (_encodedForm != null)
+                {
+                    populateFromBuffer();
+                }
+            }
+
+            if (_encodedForm != null)
+            {
+                _encodedForm.dispose();
+                _encodedForm = null;
+            }
+        }
+    }
+
     public void dispose()
     {
-        if(_encodedForm != null)
+        synchronized (this)
         {
-            _encodedForm.dispose();
-            _encodedForm = null;
+            if (_properties == null)
+            {
+                if (_encodedForm != null)
+                {
+                    _properties = new LinkedHashMap<>();
+                }
+            }
+
+            if (_encodedForm != null)
+            {
+                _encodedForm.dispose();
+                _encodedForm = null;
+            }
         }
     }
 
@@ -1079,25 +1110,7 @@ public class FieldTable
         _encodedSize = 0;
     }
 
-    public void clearEncodedForm()
-    {
-        synchronized (this)
-        {
-            if (_properties == null)
-            {
-                if (_encodedForm != null)
-                {
-                    populateFromBuffer();
-                }
-            }
 
-            if (_encodedForm != null)
-            {
-                _encodedForm.dispose();
-                _encodedForm = null;
-            }
-        }
-    }
 
     public Set<AMQShortString> keySet()
     {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org