You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/07/28 12:41:15 UTC
svn commit: r1613950 - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berke...
Author: rgodfrey
Date: Mon Jul 28 10:41:14 2014
New Revision: 1613950
URL: http://svn.apache.org/r1613950
Log:
QPID-5930 : [Java Broker] Minimize memory footprint for persistent messages
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Mon Jul 28 10:41:14 2014
@@ -558,6 +558,43 @@ public abstract class AbstractBDBMessage
}
}
+ byte[] getAllContent(long messageId) throws StoreException
+ {
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ DatabaseEntry value = new DatabaseEntry();
+ ContentBinding contentTupleBinding = ContentBinding.getInstance();
+
+
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Message Id: " + messageId + " Getting content body");
+ }
+
+ try
+ {
+
+ int written = 0;
+ OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+ if (status == OperationStatus.SUCCESS)
+ {
+ return contentTupleBinding.entryToObject(value);
+ }
+ else
+ {
+ throw new StoreException("Unable to find message with id " + messageId);
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id "
+ + messageId
+ + " to database: "
+ + e.getMessage(), e);
+ }
+ }
+
private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
{
Cursor cursor = null;
@@ -810,12 +847,12 @@ public abstract class AbstractBDBMessage
}
}
- private void recordXid(Transaction txn,
- long format,
- byte[] globalId,
- byte[] branchId,
- org.apache.qpid.server.store.Transaction.Record[] enqueues,
- org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
+ private List<Runnable> recordXid(Transaction txn,
+ long format,
+ byte[] globalId,
+ byte[] branchId,
+ org.apache.qpid.server.store.Transaction.Record[] enqueues,
+ org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
Xid xid = new Xid(format, globalId, branchId);
@@ -826,10 +863,20 @@ public abstract class AbstractBDBMessage
PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
valueBinding.objectToEntry(preparedTransaction, value);
+ List<Runnable> postActions = new ArrayList<>();
+ for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues)
+ {
+ StoredMessage storedMessage = enqueue.getMessage().getStoredMessage();
+ if(storedMessage instanceof StoredBDBMessage)
+ {
+ postActions.add(((StoredBDBMessage) storedMessage).store(txn));
+ }
+ }
try
{
getXidDb().put(txn, key, value);
+ return postActions;
}
catch (DatabaseException e)
{
@@ -1041,17 +1088,127 @@ public abstract class AbstractBDBMessage
protected abstract Logger getLogger();
- class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ static interface MessageDataRef<T extends StorableMessageMetaData>
{
+ T getMetaData();
+ byte[] getData();
+ void setData(byte[] data);
+ boolean isHardRef();
+ }
- private final long _messageId;
- private final boolean _isRecovered;
+ private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
+ {
+ private final T _metaData;
+ private byte[] _data;
+
+ private MessageDataHardRef(final T metaData)
+ {
+ _metaData = metaData;
+ }
+
+ @Override
+ public T getMetaData()
+ {
+ return _metaData;
+ }
+ @Override
+ public byte[] getData()
+ {
+ return _data;
+ }
+
+ @Override
+ public void setData(final byte[] data)
+ {
+ _data = data;
+ }
+
+ @Override
+ public boolean isHardRef()
+ {
+ return true;
+ }
+ }
+
+ private static final class MessageData<T extends StorableMessageMetaData>
+ {
private T _metaData;
- private volatile SoftReference<T> _metaDataRef;
+ private SoftReference<byte[]> _data;
- private byte[] _data;
- private volatile SoftReference<byte[]> _dataRef;
+ private MessageData(final T metaData, final byte[] data)
+ {
+ _metaData = metaData;
+
+ if(data != null)
+ {
+ _data = new SoftReference<>(data);
+ }
+ }
+
+ public T getMetaData()
+ {
+ return _metaData;
+ }
+
+ public byte[] getData()
+ {
+ return _data == null ? null : _data.get();
+ }
+
+ public void setData(final byte[] data)
+ {
+ _data = new SoftReference<>(data);
+ }
+
+
+ }
+ private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T>
+ {
+
+ public MessageDataSoftRef(final T metadata, byte[] data)
+ {
+ super(new MessageData<T>(metadata, data));
+ }
+
+ @Override
+ public T getMetaData()
+ {
+ MessageData<T> ref = get();
+ return ref == null ? null : ref.getMetaData();
+ }
+
+ @Override
+ public byte[] getData()
+ {
+ MessageData<T> ref = get();
+
+ return ref == null ? null : ref.getData();
+ }
+
+ @Override
+ public void setData(final byte[] data)
+ {
+ MessageData<T> ref = get();
+ if(ref != null)
+ {
+ ref.setData(data);
+ }
+ }
+
+ @Override
+ public boolean isHardRef()
+ {
+ return false;
+ }
+ }
+
+ final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ {
+
+ private final long _messageId;
+
+ private volatile MessageDataRef<T> _messageDataRef;
StoredBDBMessage(long messageId, T metaData)
{
@@ -1061,27 +1218,28 @@ public abstract class AbstractBDBMessage
StoredBDBMessage(long messageId, T metaData, boolean isRecovered)
{
_messageId = messageId;
- _isRecovered = isRecovered;
- if(!_isRecovered)
+ if(!isRecovered)
{
- _metaData = metaData;
+ _messageDataRef = new MessageDataHardRef<>(metaData);
+ }
+ else
+ {
+ _messageDataRef = new MessageDataSoftRef<>(metaData, null);
}
- _metaDataRef = new SoftReference<T>(metaData);
}
@Override
public T getMetaData()
{
- T metaData = _metaDataRef.get();
+ T metaData = _messageDataRef.getMetaData();
+
if(metaData == null)
{
checkMessageStoreOpen();
-
metaData = (T) getMessageMetaData(_messageId);
- _metaDataRef = new SoftReference<T>(metaData);
+ _messageDataRef = new MessageDataSoftRef<>(metaData,null);
}
-
return metaData;
}
@@ -1095,21 +1253,23 @@ public abstract class AbstractBDBMessage
public void addContent(int offsetInMessage, ByteBuffer src)
{
src = src.slice();
-
- if(_data == null)
+ byte[] data = _messageDataRef.getData();
+ if(data == null)
{
- _data = new byte[src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
- src.duplicate().get(_data);
+ data = new byte[src.remaining()];
+ src.duplicate().get(data);
+ _messageDataRef.setData(data);
}
else
{
- byte[] oldData = _data;
- _data = new byte[oldData.length + src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
+ byte[] oldData = data;
+ data = new byte[oldData.length + src.remaining()];
- System.arraycopy(oldData, 0, _data, 0, oldData.length);
- src.duplicate().get(_data, oldData.length, src.remaining());
+
+ System.arraycopy(oldData, 0, data, 0, oldData.length);
+ src.duplicate().get(data, oldData.length, src.remaining());
+
+ _messageDataRef.setData(data);
}
}
@@ -1117,55 +1277,116 @@ public abstract class AbstractBDBMessage
@Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
+ byte[] data = _messageDataRef.getData();
+ if(data == null)
{
- int length = Math.min(dst.remaining(), data.length - offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
+ if(stored())
+ {
+ checkMessageStoreOpen();
+ data = AbstractBDBMessageStore.this.getAllContent(_messageId);
+ T metaData = _messageDataRef.getMetaData();
+ if (metaData == null)
+ {
+ metaData = (T) getMessageMetaData(_messageId);
+ _messageDataRef = new MessageDataSoftRef<>(metaData, data);
+ }
+ else
+ {
+ _messageDataRef.setData(data);
+ }
+ }
+ else
+ {
+ data = new byte[0];
+ }
}
- else
- {
- checkMessageStoreOpen();
- return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
- }
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
}
@Override
public ByteBuffer getContent(int offsetInMessage, int size)
{
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- return ByteBuffer.wrap(data,offsetInMessage,size);
- }
- else
+ byte[] data = _messageDataRef.getData();
+ if(data == null)
{
- ByteBuffer buf = ByteBuffer.allocate(size);
- int length = getContent(offsetInMessage, buf);
- buf.limit(length);
- buf.position(0);
- return buf;
+ if(stored())
+ {
+ checkMessageStoreOpen();
+ data = AbstractBDBMessageStore.this.getAllContent(_messageId);
+ T metaData = _messageDataRef.getMetaData();
+ if (metaData == null)
+ {
+ metaData = (T) getMessageMetaData(_messageId);
+ _messageDataRef = new MessageDataSoftRef<>(metaData, data);
+ }
+ else
+ {
+ _messageDataRef.setData(data);
+ }
+ }
+ else
+ {
+ data = new byte[0];
+ }
}
+ return ByteBuffer.wrap(data,offsetInMessage,size);
+
}
- synchronized void store(Transaction txn)
+ synchronized Runnable store(Transaction txn)
{
if (!stored())
{
- try
+
+ AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _messageDataRef.getMetaData());
+ AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
+ _messageDataRef.getData() == null
+ ? ByteBuffer.allocate(0)
+ : ByteBuffer.wrap(_messageDataRef.getData()));
+
+
+ MessageDataRef<T> hardRef = _messageDataRef;
+ MessageDataSoftRef<T> messageDataSoftRef;
+ MessageData<T> ref;
+ do
{
- _dataRef = new SoftReference<byte[]>(_data);
- AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
- AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
+ ref = messageDataSoftRef.get();
}
- finally
+ while (ref == null);
+
+ _messageDataRef = messageDataSoftRef;
+
+ class Pointer implements Runnable
{
- _metaData = null;
- _data = null;
+ private MessageData<T> _ref;
+
+ Pointer(final MessageData<T> ref)
+ {
+ _ref = ref;
+ }
+
+ @Override
+ public void run()
+ {
+ _ref = null;
+ }
}
+ return new Pointer(ref);
+ }
+ else
+ {
+ return new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ }
+ };
}
}
@@ -1205,7 +1426,7 @@ public abstract class AbstractBDBMessage
private boolean stored()
{
- return _metaData == null || _isRecovered;
+ return !_messageDataRef.isHardRef();
}
@Override
@@ -1220,7 +1441,8 @@ public abstract class AbstractBDBMessage
{
private Transaction _txn;
private int _storeSizeIncrease;
- private final List<Runnable> _onCommitActions = new ArrayList<>();
+ private final List<Runnable> _preCommitActions = new ArrayList<>();
+ private final List<Runnable> _postCommitActions = new ArrayList<>();
private BDBTransaction() throws StoreException
{
@@ -1242,13 +1464,14 @@ public abstract class AbstractBDBMessage
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- _onCommitActions.add(new Runnable()
+ final long contentSize = storedMessage.getMetaData().getContentSize();
+ _preCommitActions.add(new Runnable()
{
@Override
public void run()
{
- storedMessage.store(_txn);
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ _postCommitActions.add(storedMessage.store(_txn));
+ _storeSizeIncrease += contentSize;
}
});
@@ -1271,16 +1494,26 @@ public abstract class AbstractBDBMessage
checkMessageStoreOpen();
doPreCommitActions();
AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
+ doPostCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
}
private void doPreCommitActions()
{
- for(Runnable action : _onCommitActions)
+ for(Runnable action : _preCommitActions)
+ {
+ action.run();
+ }
+ _preCommitActions.clear();
+ }
+
+ private void doPostCommitActions()
+ {
+ for(Runnable action : _postCommitActions)
{
action.run();
}
- _onCommitActions.clear();
+ _postCommitActions.clear();
}
@Override
@@ -1289,14 +1522,17 @@ public abstract class AbstractBDBMessage
checkMessageStoreOpen();
doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
- return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+ StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+ doPostCommitActions();
+ return storeFuture;
}
@Override
public void abortTran() throws StoreException
{
checkMessageStoreOpen();
- _onCommitActions.clear();
+ _preCommitActions.clear();
+ _postCommitActions.clear();
AbstractBDBMessageStore.this.abortTran(_txn);
}
@@ -1314,7 +1550,7 @@ public abstract class AbstractBDBMessage
{
checkMessageStoreOpen();
- AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues);
+ _postCommitActions.addAll(AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues));
}
}
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java Mon Jul 28 10:41:14 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.ber
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import com.sleepycat.je.config.ConfigParam;
import com.sleepycat.je.config.EnvironmentParams;
@@ -65,16 +66,15 @@ public class StandardEnvironmentFacadeFa
private Map<String, String> buildEnvironmentConfiguration(ConfiguredObject<?> parent)
{
- final Map<String, String> context = parent.getContext();
Map<String, String> envConfigMap = new HashMap<>();
for (ConfigParam cp : EnvironmentParams.SUPPORTED_PARAMS.values())
{
final String parameterName = cp.getName();
- if (context.containsKey(parameterName) && !cp.isForReplication())
+ Set<String> contextKeys = parent.getContextKeys();
+ if (!cp.isForReplication() && contextKeys.contains(parameterName))
{
- String contextValue = context.get(parameterName);
- envConfigMap.put(parameterName, contextValue);
+ envConfigMap.put(parameterName, parent.getContextValue(String.class, parameterName));
}
}
return Collections.unmodifiableMap(envConfigMap);
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java Mon Jul 28 10:41:14 2014
@@ -126,11 +126,11 @@ public class ReplicatedEnvironmentFacade
private Map<String, String> buildConfig(ConfiguredObject<?> parent, Pattern paramName)
{
Map<String, String> targetMap = new HashMap<>();
- for (String name : parent.getContext().keySet())
+ for (String name : parent.getContextKeys())
{
if (paramName.matcher(name).matches())
{
- String contextValue = parent.getContext().get(name);
+ String contextValue = parent.getContextValue(String.class,name);
targetMap.put(name, contextValue);
}
}
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java Mon Jul 28 10:41:14 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.virtualho
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.SizeMonitoringSettings;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -31,6 +32,12 @@ public interface BDBVirtualHost<X extend
String STORE_PATH = "storePath";
+ // Default the JE cache to 5% of total memory, but no less than 10Mb and no more than 200Mb
+ @ManagedContextDefault(name="je.maxMemory")
+ long DEFAULT_JE_CACHE_SIZE = Math.max(10l*1024l*1024l,
+ Math.min(200l*1024l*1024l,
+ Runtime.getRuntime().maxMemory()/20l));
+
@ManagedAttribute(mandatory = true)
String getStorePath();
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Mon Jul 28 10:41:14 2014
@@ -418,7 +418,7 @@ public class BDBHAVirtualHostNodeImpl ex
LOGGER.debug("Creating new virtualhost with name : " + getGroupName());
}
- boolean hasBlueprint = getContext().containsKey(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
+ boolean hasBlueprint = getContextKeys().contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)
&& Boolean.parseBoolean(String.valueOf(getContext().get(
VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)));
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java Mon Jul 28 10:41:14 2014
@@ -20,16 +20,18 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.Collections;
+import java.util.Map;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
{
/*
@@ -60,7 +62,10 @@ public class BDBMessageStoreQuotaEventsT
protected VirtualHost createVirtualHost(String storeLocation)
{
final BDBVirtualHost parent = mock(BDBVirtualHost.class);
- when(parent.getContext()).thenReturn(Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE));
+ Map<String, String> contextMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE);
+ when(parent.getContext()).thenReturn(contextMap);
+ when(parent.getContextKeys()).thenReturn(contextMap.keySet());
+ when(parent.getContextValue(eq(String.class),eq("je.log.fileMax"))).thenReturn(MAX_BDB_LOG_SIZE);
when(parent.getStorePath()).thenReturn(storeLocation);
when(parent.getStoreOverfullSize()).thenReturn(OVERFULL_SIZE);
when(parent.getStoreUnderfullSize()).thenReturn(UNDERFULL_SIZE);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Mon Jul 28 10:41:14 2014
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.server.message;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
{
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+
private volatile int _referenceCount = 0;
private final StoredMessage<T> _handle;
private final Object _connectionReference;
@@ -113,7 +113,7 @@ public abstract class AbstractServerMess
@Override
final public MessageReference<X> newReference()
{
- return new Reference();
+ return new Reference(this);
}
@Override
@@ -148,26 +148,32 @@ public abstract class AbstractServerMess
return "Message[" + debugIdentity() + "]";
}
- private final class Reference implements MessageReference<X>
+ private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData>
+ implements MessageReference<X>
{
- private final AtomicBoolean _released = new AtomicBoolean(false);
+ private static final AtomicIntegerFieldUpdater<Reference> _releasedUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released");
+
+ private AbstractServerMessageImpl<X, T> _message;
+ private volatile int _released;
- private Reference()
+ private Reference(final AbstractServerMessageImpl<X, T> message)
{
- incrementReference();
+ _message = message;
+ _message.incrementReference();
}
public X getMessage()
{
- return (X) AbstractServerMessageImpl.this;
+ return (X) _message;
}
- public void release()
+ public synchronized void release()
{
- if(!_released.getAndSet(true))
+ if(_releasedUpdater.compareAndSet(this,0,1))
{
- decrementReference();
+ _message.decrementReference();
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java Mon Jul 28 10:41:14 2014
@@ -92,5 +92,7 @@ public interface InstanceProperties
return _props.get(prop);
}
}
+
+
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Mon Jul 28 10:41:14 2014
@@ -1339,6 +1339,14 @@ public abstract class AbstractConfigured
return converter.convert("${" + propertyName + "}", this);
}
+ @Override
+ public Set<String> getContextKeys()
+ {
+ Map<String,String> inheritedContext = new HashMap<>();
+ generateInheritedContext(getModel(), this, inheritedContext);
+ return Collections.unmodifiableSet(inheritedContext.keySet());
+ }
+
private OwnAttributeResolver getOwnAttributeResolver()
{
return _attributeResolver;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Mon Jul 28 10:41:14 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.model;
import java.security.AccessControlException;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -78,6 +79,8 @@ public interface ConfiguredObject<X exte
<T> T getContextValue(Class<T> clazz, String propertyName);
+ Set<String> getContextKeys();
+
@DerivedAttribute( persist = true )
String getLastUpdatedBy();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Jul 28 10:41:14 2014
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.util.EnumMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -80,13 +79,17 @@ public abstract class QueueEntryImpl imp
private volatile long _entryId;
- private final EntryInstanceProperties _instanceProperties = new EntryInstanceProperties();
+ private static int REDELIVERED_FLAG = 1;
+ private static int PERSISTENT_FLAG = 2;
+ private static int MANDATORY_FLAG = 4;
+ private static int IMMEDIATE_FLAG = 8;
+ private int _flags;
+ private long _expiration;
/** Number of times this message has been delivered */
- private volatile int _deliveryCount = 0;
+ private volatile int _deliveryCount = -1;
private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater
.newUpdater(QueueEntryImpl.class, "_deliveryCount");
- private boolean _deliveredToConsumer;
public QueueEntryImpl(QueueEntryList queueEntryList)
@@ -117,14 +120,17 @@ public abstract class QueueEntryImpl imp
{
if(_message != null)
{
- _instanceProperties.setProperty(InstanceProperties.Property.PERSISTENT, _message.getMessage().isPersistent());
- _instanceProperties.setProperty(InstanceProperties.Property.EXPIRATION, _message.getMessage().getExpiration());
+ if(_message.getMessage().isPersistent())
+ {
+ setPersistent();
+ }
+ _expiration = _message.getMessage().getExpiration();
}
}
public InstanceProperties getInstanceProperties()
{
- return _instanceProperties;
+ return new EntryInstanceProperties();
}
protected void setEntryId(long entryId)
@@ -154,21 +160,17 @@ public abstract class QueueEntryImpl imp
public boolean getDeliveredToConsumer()
{
- return _deliveredToConsumer;
+ return _deliveryCountUpdater.get(this) != -1;
}
public boolean expired()
{
- ServerMessage message = getMessage();
- if(message != null)
+ long expiration = _expiration;
+ if (expiration != 0L)
{
- long expiration = message.getExpiration();
- if (expiration != 0L)
- {
- long now = System.currentTimeMillis();
+ long now = System.currentTimeMillis();
- return (now > expiration);
- }
+ return (now > expiration);
}
return false;
@@ -206,7 +208,7 @@ public abstract class QueueEntryImpl imp
final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
if(acquired)
{
- _deliveredToConsumer = true;
+ _deliveryCountUpdater.compareAndSet(this,-1,0);
}
return acquired;
}
@@ -253,15 +255,6 @@ public abstract class QueueEntryImpl imp
}
- public void setRedelivered()
- {
- _instanceProperties.setProperty(InstanceProperties.Property.REDELIVERED, Boolean.TRUE);
- }
-
- public boolean isRedelivered()
- {
- return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
- }
public QueueConsumer getDeliveredConsumer()
{
@@ -459,7 +452,7 @@ public abstract class QueueEntryImpl imp
public int getDeliveryCount()
{
- return _deliveryCount;
+ return _deliveryCount == -1 ? 0 : _deliveryCount;
}
@Override
@@ -470,6 +463,7 @@ public abstract class QueueEntryImpl imp
public void incrementDeliveryCount()
{
+ _deliveryCountUpdater.compareAndSet(this,-1,0);
_deliveryCountUpdater.incrementAndGet(this);
}
@@ -509,20 +503,45 @@ public abstract class QueueEntryImpl imp
return getQueue();
}
- private static class EntryInstanceProperties implements InstanceProperties
+ public void setRedelivered()
+ {
+ _flags |= REDELIVERED_FLAG;
+ }
+
+ private void setPersistent()
+ {
+ _flags |= PERSISTENT_FLAG;
+ }
+
+ public boolean isRedelivered()
+ {
+ return (_flags & REDELIVERED_FLAG) != 0;
+ }
+
+ private class EntryInstanceProperties implements InstanceProperties
{
- private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class);
@Override
public Object getProperty(final Property prop)
{
- return _properties.get(prop);
- }
+ switch(prop)
+ {
- private void setProperty(Property prop, Object value)
- {
- _properties.put(prop, value);
+ case REDELIVERED:
+ return (_flags & REDELIVERED_FLAG) != 0;
+ case PERSISTENT:
+ return (_flags & PERSISTENT_FLAG) != 0;
+ case MANDATORY:
+ return (_flags & MANDATORY_FLAG) != 0;
+ case IMMEDIATE:
+ return (_flags & IMMEDIATE_FLAG) != 0;
+ case EXPIRATION:
+ return _expiration;
+ default:
+ throw new IllegalArgumentException("Unknown property " + prop);
+ }
}
+
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Mon Jul 28 10:41:14 2014
@@ -716,8 +716,8 @@ public abstract class AbstractJDBCMessag
}
- private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
- Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException
+ private List<Runnable> recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
+ Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException
{
Connection conn = connWrapper.getConnection();
@@ -738,6 +738,17 @@ public abstract class AbstractJDBCMessag
stmt.close();
}
+ List<Runnable> postActions = new ArrayList<>();
+ for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues)
+ {
+ StoredMessage storedMessage = enqueue.getMessage().getStoredMessage();
+ if(storedMessage instanceof StoredJDBCMessage)
+ {
+ postActions.add(((StoredJDBCMessage) storedMessage).store(conn));
+ }
+ }
+
+
stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS);
try
@@ -773,7 +784,7 @@ public abstract class AbstractJDBCMessag
{
stmt.close();
}
-
+ return postActions;
}
catch (SQLException e)
{
@@ -1105,6 +1116,47 @@ public abstract class AbstractJDBCMessag
}
+
+ private byte[] getAllContent(long messageId)
+ {
+ Connection conn = null;
+ PreparedStatement stmt = null;
+
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ ResultSet rs = stmt.executeQuery();
+
+ int written = 0;
+
+ if (rs.next())
+ {
+
+ byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+ return dataAsBytes;
+ }
+
+ throw new StoreException("No such message, id: " + messageId);
+
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error retrieving content for message " + messageId + ": " + e.getMessage(), e);
+ }
+ finally
+ {
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
+
+
+ }
+
+
+
@Override
public boolean isPersistent()
{
@@ -1116,7 +1168,8 @@ public abstract class AbstractJDBCMessag
{
private final ConnectionWrapper _connWrapper;
private int _storeSizeIncrease;
- private final List<Runnable> _onCommitActions = new ArrayList<>();
+ private final List<Runnable> _preCommitActions = new ArrayList<>();
+ private final List<Runnable> _postCommitActions = new ArrayList<>();
protected JDBCTransaction()
{
@@ -1138,19 +1191,20 @@ public abstract class AbstractJDBCMessag
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
- _onCommitActions.add(new Runnable()
+ _preCommitActions.add(new Runnable()
{
@Override
public void run()
{
try
{
- ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+ _postCommitActions.add(((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()));
_storeSizeIncrease += storedMessage.getMetaData().getContentSize();
}
catch (SQLException e)
{
- throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
+ throw new StoreException("Exception on enqueuing message into message store" + _messageId,
+ e);
}
}
});
@@ -1174,6 +1228,7 @@ public abstract class AbstractJDBCMessag
doPreCommitActions();
AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
+ doPostCommitActions();
}
@Override
@@ -1183,23 +1238,33 @@ public abstract class AbstractJDBCMessag
doPreCommitActions();
StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
+ doPostCommitActions();
return storeFuture;
}
private void doPreCommitActions()
{
- for(Runnable action : _onCommitActions)
+ for(Runnable action : _preCommitActions)
+ {
+ action.run();
+ }
+ _preCommitActions.clear();
+ }
+
+ private void doPostCommitActions()
+ {
+ for(Runnable action : _postCommitActions)
{
action.run();
}
- _onCommitActions.clear();
+ _postCommitActions.clear();
}
@Override
public void abortTran()
{
checkMessageStoreOpen();
- _onCommitActions.clear();
+ _preCommitActions.clear();
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@@ -1216,56 +1281,171 @@ public abstract class AbstractJDBCMessag
{
checkMessageStoreOpen();
- AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
+ _postCommitActions.addAll(AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues));
}
}
- private class StoredJDBCMessage implements StoredMessage
+
+ static interface MessageDataRef<T extends StorableMessageMetaData>
{
+ T getMetaData();
+ byte[] getData();
+ void setData(byte[] data);
+ boolean isHardRef();
+ }
- private final long _messageId;
- private final boolean _isRecovered;
- private StorableMessageMetaData _metaData;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
+ {
+ private final T _metaData;
private byte[] _data;
- private volatile SoftReference<byte[]> _dataRef;
+
+ private MessageDataHardRef(final T metaData)
+ {
+ _metaData = metaData;
+ }
+
+ @Override
+ public T getMetaData()
+ {
+ return _metaData;
+ }
+
+ @Override
+ public byte[] getData()
+ {
+ return _data;
+ }
+
+ @Override
+ public void setData(final byte[] data)
+ {
+ _data = data;
+ }
+
+ @Override
+ public boolean isHardRef()
+ {
+ return true;
+ }
+ }
+
+ private static final class MessageData<T extends StorableMessageMetaData>
+ {
+ private T _metaData;
+ private SoftReference<byte[]> _data;
+
+ private MessageData(final T metaData, final byte[] data)
+ {
+ _metaData = metaData;
+
+ if(data != null)
+ {
+ _data = new SoftReference<>(data);
+ }
+ }
+
+ public T getMetaData()
+ {
+ return _metaData;
+ }
+
+ public byte[] getData()
+ {
+ return _data == null ? null : _data.get();
+ }
+
+ public void setData(final byte[] data)
+ {
+ _data = new SoftReference<>(data);
+ }
+
+
+ }
+ private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T>
+ {
+
+ public MessageDataSoftRef(final T metadata, byte[] data)
+ {
+ super(new MessageData<T>(metadata, data));
+ }
+
+ @Override
+ public T getMetaData()
+ {
+ MessageData<T> ref = get();
+ return ref == null ? null : ref.getMetaData();
+ }
+
+ @Override
+ public byte[] getData()
+ {
+ MessageData<T> ref = get();
+
+ return ref == null ? null : ref.getData();
+ }
+
+ @Override
+ public void setData(final byte[] data)
+ {
+ MessageData<T> ref = get();
+ if(ref != null)
+ {
+ ref.setData(data);
+ }
+ }
+
+ @Override
+ public boolean isHardRef()
+ {
+ return false;
+ }
+ }
+
+ private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ {
+
+ private final long _messageId;
+
+ private volatile MessageDataRef<T> _messageDataRef;
- StoredJDBCMessage(long messageId, StorableMessageMetaData metaData)
+ StoredJDBCMessage(long messageId, T metaData)
{
this(messageId, metaData, false);
}
StoredJDBCMessage(long messageId,
- StorableMessageMetaData metaData, boolean isRecovered)
+ T metaData, boolean isRecovered)
{
_messageId = messageId;
- _isRecovered = isRecovered;
- if(!_isRecovered)
+ if(!isRecovered)
{
- _metaData = metaData;
+ _messageDataRef = new MessageDataHardRef<>(metaData);
+ }
+ else
+ {
+ _messageDataRef = new MessageDataSoftRef<>(metaData, null);
}
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
@Override
- public StorableMessageMetaData getMetaData()
+ public T getMetaData()
{
- StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
+ T metaData = _messageDataRef.getMetaData();
if(metaData == null)
{
checkMessageStoreOpen();
try
{
- metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ _messageDataRef = new MessageDataSoftRef<>(metaData,null);
}
catch (SQLException e)
{
throw new StoreException(e);
}
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
return metaData;
@@ -1281,21 +1461,23 @@ public abstract class AbstractJDBCMessag
public void addContent(int offsetInMessage, ByteBuffer src)
{
src = src.slice();
+ byte[] data = _messageDataRef.getData();
- if(_data == null)
+ if(data == null)
{
- _data = new byte[src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
- src.duplicate().get(_data);
+ data = new byte[src.remaining()];
+ src.duplicate().get(data);
+ _messageDataRef.setData(data);
}
else
{
- byte[] oldData = _data;
- _data = new byte[oldData.length + src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
+ byte[] oldData = data;
+ data = new byte[oldData.length + src.remaining()];
+
+ System.arraycopy(oldData,0,data,0,oldData.length);
+ src.duplicate().get(data, oldData.length, src.remaining());
- System.arraycopy(oldData,0,_data,0,oldData.length);
- src.duplicate().get(_data, oldData.length, src.remaining());
+ _messageDataRef.setData(data);
}
}
@@ -1303,34 +1485,90 @@ public abstract class AbstractJDBCMessag
@Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- int length = Math.min(dst.remaining(), data.length - offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
- }
- else
+ byte[] data = _messageDataRef.getData();
+
+ if(data == null)
{
- checkMessageStoreOpen();
- return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ if(stored())
+ {
+ checkMessageStoreOpen();
+ getLogger().debug("GET CONTENT for message id " + _messageId);
+ data = AbstractJDBCMessageStore.this.getAllContent(_messageId);
+ T metaData = _messageDataRef.getMetaData();
+ if (metaData == null)
+ {
+ try
+ {
+ metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ _messageDataRef = new MessageDataSoftRef<>(metaData, data);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException(e);
+ }
+ }
+ else
+ {
+ _messageDataRef.setData(data);
+ }
+ }
+ else
+ {
+ data = new byte[0];
+ }
}
+
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+
}
@Override
public ByteBuffer getContent(int offsetInMessage, int size)
{
- ByteBuffer buf = ByteBuffer.allocate(size);
- int length = getContent(offsetInMessage, buf);
- buf.position(0);
- buf.limit(length);
- return buf;
+ byte[] data = _messageDataRef.getData();
+
+ if(data == null)
+ {
+
+ if(stored())
+ {
+ checkMessageStoreOpen();
+
+ data = AbstractJDBCMessageStore.this.getAllContent(_messageId);
+ T metaData = _messageDataRef.getMetaData();
+ if (metaData == null)
+ {
+ try
+ {
+ metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ _messageDataRef = new MessageDataSoftRef<>(metaData, data);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException(e);
+ }
+ }
+ else
+ {
+ _messageDataRef.setData(data);
+ }
+ }
+ else
+ {
+ data = new byte[0];
+ }
+ }
+ return ByteBuffer.wrap(data,offsetInMessage,size);
+
}
@Override
public void remove()
{
+ getLogger().debug("REMOVE called on message: " + _messageId);
checkMessageStoreOpen();
int delta = getMetaData().getContentSize();
@@ -1338,32 +1576,69 @@ public abstract class AbstractJDBCMessag
storedSizeChange(-delta);
}
- private synchronized void store(final Connection conn) throws SQLException
+ private synchronized Runnable store(final Connection conn) throws SQLException
{
if (!stored())
{
- try
+ getLogger().debug("STORING message id " + _messageId);
+ storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
+ AbstractJDBCMessageStore.this.addContent(conn, _messageId,
+ _messageDataRef.getData() == null
+ ? ByteBuffer.allocate(0)
+ : ByteBuffer.wrap(_messageDataRef.getData()));
+
+
+ if(getLogger().isDebugEnabled())
{
- storeMetaData(conn, _messageId, _metaData);
- AbstractJDBCMessageStore.this.addContent(conn, _messageId,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ getLogger().debug("Storing message " + _messageId + " to store");
}
- finally
+
+ MessageDataRef<T> hardRef = _messageDataRef;
+ MessageDataSoftRef<T> messageDataSoftRef;
+ MessageData<T> ref;
+ do
{
- _metaData = null;
- _data = null;
+ messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
+ ref = messageDataSoftRef.get();
}
+ while (ref == null);
- if(getLogger().isDebugEnabled())
+ _messageDataRef = messageDataSoftRef;
+
+ class Pointer implements Runnable
{
- getLogger().debug("Storing message " + _messageId + " to store");
+ private MessageData<T> _ref;
+
+ Pointer(final MessageData<T> ref)
+ {
+ getLogger().debug("POST COMMIT for message id " + _messageId);
+ _ref = ref;
+ }
+
+ @Override
+ public void run()
+ {
+ _ref = null;
+ }
}
+ return new Pointer(ref);
+ }
+ else
+ {
+ return new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ }
+ };
}
}
private boolean stored()
{
- return _metaData == null || _isRecovered;
+ return !_messageDataRef.isHardRef();
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java Mon Jul 28 10:41:14 2014
@@ -90,7 +90,7 @@ public abstract class AbstractStandardVi
if (host == null)
{
- boolean hasBlueprint = getContext().containsKey(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
+ boolean hasBlueprint = getContextKeys().contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)
&& Boolean.parseBoolean(String.valueOf(getContext().get(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)));
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Jul 28 10:41:14 2014
@@ -1249,8 +1249,6 @@ public class AMQChannel<T extends AMQPro
final BasicContentHeaderProperties properties =
incomingMessage.getContentHeader().getProperties();
- long expiration = properties.getExpiration();
- message.setExpiration(expiration);
return message;
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Mon Jul 28 10:41:14 2014
@@ -29,8 +29,6 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
-import java.nio.ByteBuffer;
-
/**
* A deliverable message.
*/
@@ -39,10 +37,6 @@ public class AMQMessage extends Abstract
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- /** Flag to indicate that this message requires 'immediate' delivery. */
-
- private long _expiration;
-
private final long _size;
public AMQMessage(StoredMessage<MessageMetaData> handle)
@@ -56,11 +50,6 @@ public class AMQMessage extends Abstract
_size = handle.getMetaData().getContentSize();
}
- public void setExpiration(final long expiration)
- {
- _expiration = expiration;
- }
-
public MessageMetaData getMessageMetaData()
{
return getStoredMessage().getMetaData();
@@ -110,16 +99,14 @@ public class AMQMessage extends Abstract
return getMessagePublishInfo().isImmediate();
}
-
public boolean isMandatory()
{
return getMessagePublishInfo().isMandatory();
}
-
public long getExpiration()
{
- return _expiration;
+ return getMessageHeader().getExpiration();
}
Modified: qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java Mon Jul 28 10:41:14 2014
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.derby;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.Collections;
import java.util.Map;
@@ -28,9 +31,6 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
import org.apache.qpid.server.virtualhost.derby.DerbyVirtualHost;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
{
private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 10;
@@ -54,6 +54,7 @@ public class DerbyMessageStoreQuotaEvent
{
final DerbyVirtualHost parent = mock(DerbyVirtualHost.class);
when(parent.getContext()).thenReturn(createContextSettings());
+ when(parent.getContextKeys()).thenReturn(Collections.emptySet());
when(parent.getStorePath()).thenReturn(storeLocation);
when(parent.getStoreOverfullSize()).thenReturn(OVERFULL_SIZE);
when(parent.getStoreUnderfullSize()).thenReturn(UNDERFULL_SIZE);
Modified: qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java Mon Jul 28 10:41:14 2014
@@ -27,6 +27,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
@@ -84,7 +85,7 @@ public class GenericJDBCConfigurationSto
JDBCSettings settings = (JDBCSettings)parent;
_connectionURL = settings.getConnectionUrl();
- JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent.getContext());
+ JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent);
if (!details.isKnownVendor() && getLogger().isInfoEnabled())
{
@@ -111,8 +112,13 @@ public class GenericJDBCConfigurationSto
try
{
- Map<String, String> providerAttributes = new HashMap(_parent.getContext());
- providerAttributes.keySet().retainAll(connectionProviderFactory.getProviderAttributeNames());
+ Map<String, String> providerAttributes = new HashMap<>();
+ Set<String> providerAttributeNames = connectionProviderFactory.getProviderAttributeNames();
+ providerAttributeNames.retainAll(parent.getContextKeys());
+ for(String attr : providerAttributeNames)
+ {
+ providerAttributes.put(attr, parent.getContextValue(String.class, attr));
+ }
_connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL,
settings.getUsername(),
Modified: qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java Mon Jul 28 10:41:14 2014
@@ -28,6 +28,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+
+import javax.security.auth.Subject;
import org.apache.log4j.Logger;
@@ -36,8 +39,6 @@ import org.apache.qpid.server.plugin.JDB
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.StoreException;
-import javax.security.auth.Subject;
-
/**
* Implementation of a MessageStore backed by a Generic JDBC Database.
*/
@@ -60,7 +61,7 @@ public class GenericJDBCMessageStore ext
JDBCSettings settings = (JDBCSettings)parent;
_connectionURL = settings.getConnectionUrl();
- JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent.getContext());
+ JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent);
if (!details.isKnownVendor() && getLogger().isInfoEnabled())
{
@@ -90,9 +91,13 @@ public class GenericJDBCMessageStore ext
try
{
- Map<String, String> providerAttributes = new HashMap(parent.getContext());
- providerAttributes.keySet().retainAll(connectionProviderFactory.getProviderAttributeNames());
-
+ Map<String, String> providerAttributes = new HashMap<>();
+ Set<String> providerAttributeNames = connectionProviderFactory.getProviderAttributeNames();
+ providerAttributeNames.retainAll(parent.getContextKeys());
+ for(String attr : providerAttributeNames)
+ {
+ providerAttributes.put(attr, parent.getContextValue(String.class, attr));
+ }
_connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL,
settings.getUsername(),
Modified: qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java?rev=1613950&r1=1613949&r2=1613950&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java Mon Jul 28 10:41:14 2014
@@ -19,9 +19,15 @@
package org.apache.qpid.server.store.jdbc;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
+
+import org.apache.qpid.server.model.ConfiguredObject;
public abstract class JDBCDetails
{
@@ -216,7 +222,75 @@ public abstract class JDBCDetails
return result;
}
-
+ public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final ConfiguredObject<?> object)
+ {
+ final Set<String> contextKeys = object.getContextKeys();
+ Map<String,String> mapConversion = new AbstractMap<String, String>()
+ {
+ @Override
+ public Set<Entry<String, String>> entrySet()
+ {
+ return new AbstractSet<Entry<String, String>>()
+ {
+ @Override
+ public Iterator<Entry<String, String>> iterator()
+ {
+ final Iterator<String> underlying = contextKeys.iterator();
+ return new Iterator<Entry<String, String>>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return underlying.hasNext();
+ }
+
+ @Override
+ public Entry<String, String> next()
+ {
+ final String key = underlying.next();
+ final String value = object.getContextValue(String.class, key);
+ return new Entry<String,String>()
+ {
+
+ @Override
+ public String getKey()
+ {
+ return key;
+ }
+
+ @Override
+ public String getValue()
+ {
+ return value;
+ }
+
+ @Override
+ public String setValue(final String value)
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public int size()
+ {
+ return contextKeys.size();
+ }
+ };
+ }
+ };
+ return getDetailsForJdbcUrl(jdbcUrl, mapConversion);
+ }
public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final Map<String, String> contextMap)
{
String[] components = jdbcUrl.split(":", 3);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org