You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/28 15:10:34 UTC
svn commit: r1698333 - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache...
Author: rgodfrey
Date: Fri Aug 28 13:10:34 2015
New Revision: 1698333
URL: http://svn.apache.org/r1698333
Log:
QPID-6712 : Applied tidied up version of WIP patch, with changes for QPID-6713 removed
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Aug 28 13:10:34 2015
@@ -1065,15 +1065,22 @@ public abstract class AbstractBDBMessage
@Override
public synchronized T getMetaData()
{
- T metaData = _messageDataRef.getMetaData();
-
- if(metaData == null)
+ if (_messageDataRef == null)
{
- checkMessageStoreOpen();
- metaData = (T) getMessageMetaData(_messageId);
- _messageDataRef = new MessageDataSoftRef<>(metaData,null);
+ return null;
+ }
+ else
+ {
+ T metaData = _messageDataRef.getMetaData();
+
+ if (metaData == null)
+ {
+ checkMessageStoreOpen();
+ metaData = (T) getMessageMetaData(_messageId);
+ _messageDataRef = new MessageDataSoftRef<>(metaData, null);
+ }
+ return metaData;
}
- return metaData;
}
@Override
@@ -1122,7 +1129,7 @@ public abstract class AbstractBDBMessage
private Collection<QpidByteBuffer> getContentAsByteBuffer()
{
- Collection<QpidByteBuffer> data = _messageDataRef.getData();
+ Collection<QpidByteBuffer> data = _messageDataRef == null ? Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData();
if(data == null)
{
if(stored())
@@ -1215,23 +1222,26 @@ public abstract class AbstractBDBMessage
synchronized FutureResult flushToStore()
{
- if(!stored())
+ if (_messageDataRef != null)
{
- checkMessageStoreOpen();
-
- Transaction txn;
- try
- {
- txn = getEnvironmentFacade().beginTransaction(null);
- }
- catch (RuntimeException e)
+ if (!stored())
{
- throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e);
- }
- store(txn);
- getEnvironmentFacade().commit(txn, true);
+ checkMessageStoreOpen();
- storedSizeChangeOccurred(getMetaData().getContentSize());
+ Transaction txn;
+ try
+ {
+ txn = getEnvironmentFacade().beginTransaction(null);
+ }
+ catch (RuntimeException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("failed to begin transaction", e);
+ }
+ store(txn);
+ getEnvironmentFacade().commit(txn, true);
+
+ storedSizeChangeOccurred(getMetaData().getContentSize());
+ }
}
return FutureResult.IMMEDIATE_FUTURE;
}
@@ -1244,8 +1254,11 @@ public abstract class AbstractBDBMessage
final T metaData = getMetaData();
int delta = metaData.getContentSize();
- removeMessage(_messageId, false);
- storedSizeChangeOccurred(-delta);
+ if(stored())
+ {
+ removeMessage(_messageId, false);
+ storedSizeChangeOccurred(-delta);
+ }
if(data != null)
{
_messageDataRef.setData(null);
@@ -1255,17 +1268,18 @@ public abstract class AbstractBDBMessage
}
}
metaData.dispose();
+ _messageDataRef = null;
}
@Override
public synchronized boolean isInMemory()
{
- return _messageDataRef.isHardRef() || _messageDataRef.getData() != null;
+ return _messageDataRef != null && (_messageDataRef.isHardRef() || _messageDataRef.getData() != null);
}
private boolean stored()
{
- return !_messageDataRef.isHardRef();
+ return _messageDataRef != null && !_messageDataRef.isHardRef();
}
@Override
@@ -1273,7 +1287,7 @@ public abstract class AbstractBDBMessage
{
flushToStore();
- if(!_messageDataRef.isHardRef())
+ if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
((MessageDataSoftRef)_messageDataRef).clear();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Fri Aug 28 13:10:34 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import java.nio.ByteBuffer;
import java.util.Collection;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
@@ -33,6 +32,8 @@ import org.apache.qpid.transport.Header;
public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10>
{
+ private final static MessageMetaData_0_10 DELETED_MESSAGE_METADATA = new MessageMetaData_0_10(null, 0, 0);
+
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
{
super(storeMessage, connectionRef);
@@ -40,7 +41,9 @@ public class MessageTransferMessage exte
private MessageMetaData_0_10 getMetaData()
{
- return getStoredMessage().getMetaData();
+ MessageMetaData_0_10 metaData = getStoredMessage().getMetaData();
+
+ return metaData == null ? DELETED_MESSAGE_METADATA : metaData;
}
public String getInitialRoutingAddress()
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Fri Aug 28 13:10:34 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -32,6 +33,7 @@ import org.apache.qpid.server.store.Stor
*/
public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData>
{
+ private static final MessageMetaData DELETED_MESSAGE_METADATA = new MessageMetaData(new MessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties()), 0);
private final long _size;
public AMQMessage(StoredMessage<MessageMetaData> handle)
@@ -47,7 +49,9 @@ public class AMQMessage extends Abstract
public MessageMetaData getMessageMetaData()
{
- return getStoredMessage().getMetaData();
+ MessageMetaData metaData = getStoredMessage().getMetaData();
+
+ return metaData == null ? DELETED_MESSAGE_METADATA : metaData;
}
public ContentHeaderBody getContentHeaderBody()
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Fri Aug 28 13:10:34 2015
@@ -41,7 +41,6 @@ import org.apache.qpid.server.plugin.Mes
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ByteBufferOutputStream;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.util.ByteBufferInputStream;
/**
* Encapsulates a publish body and a content header. In the context of the message store these are treated as a
@@ -155,7 +154,6 @@ public class MessageMetaData implements
public synchronized void dispose()
{
_contentHeaderBody.dispose();
- _contentHeaderBody = null;
}
public synchronized void clearEncodedForm()
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Fri Aug 28 13:10:34 2015
@@ -22,19 +22,26 @@ package org.apache.qpid.server.protocol.
import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.util.ByteBufferUtils;
public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
{
+ private static final AMQPDescribedTypeRegistry DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer();
+ public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY));
+
private volatile SoftReference<Collection<QpidByteBuffer>> _fragmentsRef;
private long _arrivalTime;
private final long _size;
@@ -93,7 +100,8 @@ public class Message_1_0 extends Abstrac
private MessageMetaData_1_0 getMessageMetaData()
{
- return getStoredMessage().getMetaData();
+ MessageMetaData_1_0 metaData = getStoredMessage().getMetaData();
+ return metaData == null ? DELETED_MESSAGE_METADATA : metaData;
}
public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader()
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Fri Aug 28 13:10:34 2015
@@ -956,7 +956,6 @@ public class BasicContentHeaderPropertie
if(_headers != null)
{
_headers.dispose();
- _headers = null;
}
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Aug 28 13:10:34 2015
@@ -39,7 +39,7 @@ public class ContentHeaderBody implement
private long _bodySize;
/** must never be null */
- private BasicContentHeaderProperties _properties;
+ private final BasicContentHeaderProperties _properties;
public ContentHeaderBody(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, IOException
{
@@ -132,11 +132,6 @@ public class ContentHeaderBody implement
return _properties;
}
- public void setProperties(BasicContentHeaderProperties props)
- {
- _properties = props;
- }
-
@Override
public String toString()
{
@@ -199,7 +194,6 @@ public class ContentHeaderBody implement
public void dispose()
{
_properties.dispose();
- _properties = null;
}
public void clearEncodedForm()
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1698333&r1=1698332&r2=1698333&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri Aug 28 13:10:34 2015
@@ -938,12 +938,43 @@ public class FieldTable
return map;
}
+ public void clearEncodedForm()
+ {
+ synchronized (this)
+ {
+ if (_properties == null)
+ {
+ if (_encodedForm != null)
+ {
+ populateFromBuffer();
+ }
+ }
+
+ if (_encodedForm != null)
+ {
+ _encodedForm.dispose();
+ _encodedForm = null;
+ }
+ }
+ }
+
public void dispose()
{
- if(_encodedForm != null)
+ synchronized (this)
{
- _encodedForm.dispose();
- _encodedForm = null;
+ if (_properties == null)
+ {
+ if (_encodedForm != null)
+ {
+ _properties = new LinkedHashMap<>();
+ }
+ }
+
+ if (_encodedForm != null)
+ {
+ _encodedForm.dispose();
+ _encodedForm = null;
+ }
}
}
@@ -1079,25 +1110,7 @@ public class FieldTable
_encodedSize = 0;
}
- public void clearEncodedForm()
- {
- synchronized (this)
- {
- if (_properties == null)
- {
- if (_encodedForm != null)
- {
- populateFromBuffer();
- }
- }
- if (_encodedForm != null)
- {
- _encodedForm.dispose();
- _encodedForm = null;
- }
- }
- }
public Set<AMQShortString> keySet()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org