You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/06 13:05:06 UTC
svn commit: r1616155 - in /qpid/trunk/qpid/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/message/internal/ br...
Author: rgodfrey
Date: Wed Aug 6 11:05:05 2014
New Revision: 1616155
URL: http://svn.apache.org/r1616155
Log:
QPID-5965 : [Java Broker] flow transient messages to disk in low memory situations
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Wed Aug 6 11:05:05 2014
@@ -1424,12 +1424,25 @@ public abstract class AbstractBDBMessage
storedSizeChangeOccurred(-delta);
}
+ @Override
+ public boolean isInMemory()
+ {
+ return _messageDataRef.isHardRef();
+ }
+
private boolean stored()
{
return !_messageDataRef.isHardRef();
}
@Override
+ public boolean flowToDisk()
+ {
+ flushToStore();
+ return true;
+ }
+
+ @Override
public String toString()
{
return this.getClass() + "[messageId=" + _messageId + "]";
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java Wed Aug 6 11:05:05 2014
@@ -338,6 +338,18 @@ public class BDBHAReplicaVirtualHost ext
}
@Override
+ public void setTargetSize(final long targetSize)
+ {
+
+ }
+
+ @Override
+ public long getTotalQueueDepthBytes()
+ {
+ return 0l;
+ }
+
+ @Override
public org.apache.qpid.server.security.SecurityManager getSecurityManager()
{
return null;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Wed Aug 6 11:05:05 2014
@@ -20,14 +20,6 @@
*/
package org.apache.qpid.server.message.internal;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.util.ByteBufferInputStream;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -39,6 +31,13 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.util.ByteBufferInputStream;
+
public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData>
{
private final Object _messageBody;
@@ -239,6 +238,18 @@ public class InternalMessage extends Abs
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
catch (IOException e)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Wed Aug 6 11:05:05 2014
@@ -56,6 +56,7 @@ public interface Broker<X extends Broker
String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
+ String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold";
String QPID_AMQP_PORT = "qpid.amqp_port";
String QPID_HTTP_PORT = "qpid.http_port";
@@ -74,6 +75,9 @@ public interface Broker<X extends Broker
@ManagedContextDefault(name = QPID_JMX_PORT)
public static final String DEFAULT_JMX_PORT_NUMBER = "9099";
+ @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
+ public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
+
@DerivedAttribute
String getBuildVersion();
@@ -190,4 +194,6 @@ public interface Broker<X extends Broker
AuthenticationProvider<?> getManagementModeAuthenticationProvider();
+ void assignTargetSizes();
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Wed Aug 6 11:05:05 2014
@@ -49,6 +49,14 @@ public interface Queue<X extends Queue<X
String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
+ String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
+ @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
+ long DEFAULT_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = 102400l;
+
+ String QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = "queue.estimatedMessageMemoryOverhead";
+ @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
+ long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
+
@ManagedAttribute
Exchange getAlternateExchange();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Wed Aug 6 11:05:05 2014
@@ -172,4 +172,9 @@ public interface VirtualHost<X extends V
MessageStore getMessageStore();
String getType();
+
+ void setTargetSize(long targetSize);
+
+ long getTotalQueueDepthBytes();
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Wed Aug 6 11:05:05 2014
@@ -384,6 +384,39 @@ public class BrokerAdapter extends Abstr
return children;
}
+ @Override
+ public synchronized void assignTargetSizes()
+ {
+ long totalTarget = getContextValue(Long.class,BROKER_FLOW_TO_DISK_THRESHOLD);
+ long totalSize = 0l;
+ Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
+ Map<VirtualHost<?,?,?>,Long> vhs = new HashMap<>();
+ for(VirtualHostNode<?> vhn : vhns)
+ {
+ VirtualHost<?, ?, ?> vh = vhn.getVirtualHost();
+ if(vh != null)
+ {
+ long totalQueueDepthBytes = vh.getTotalQueueDepthBytes();
+ vhs.put(vh,totalQueueDepthBytes);
+ totalSize += totalQueueDepthBytes;
+ }
+ }
+
+ for(Map.Entry<VirtualHost<?, ?, ?>,Long> entry : vhs.entrySet())
+ {
+
+ long size = (long) (entry.getValue().doubleValue() * ((double) totalTarget / (double) totalSize));
+ entry.getKey().setTargetSize(size);
+ }
+ }
+
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ assignTargetSizes();
+ }
+
public AuthenticationProvider<?> findAuthenticationProviderByName(String authenticationProviderName)
{
if (isManagementMode())
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Aug 6 11:05:05 2014
@@ -111,4 +111,8 @@ public interface AMQQueue<X extends AMQQ
void completeRecovery();
void recover(ServerMessage<?> message);
+
+ void setTargetSize(long targetSize);
+
+ long getPotentialMemoryFootprint();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Aug 6 11:05:05 2014
@@ -79,6 +79,7 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -113,6 +114,8 @@ public abstract class AbstractQueue<X ex
}
};
+ private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l;
+
private final VirtualHostImpl _virtualHost;
private final DeletedChildListener _deletedChildListener = new DeletedChildListener();
@@ -130,6 +133,8 @@ public abstract class AbstractQueue<X ex
private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
+ private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
+
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
private final AtomicLong _totalMessagesReceived = new AtomicLong();
@@ -924,6 +929,11 @@ public abstract class AbstractQueue<X ex
incrementQueueCount();
incrementQueueSize(message);
+ if((_atomicQueueSize.get() + _atomicQueueCount.get()*1024l) > _targetQueueSize.get() && message.getStoredMessage().isInMemory())
+ {
+ message.getStoredMessage().flowToDisk();
+ }
+
_totalMessagesReceived.incrementAndGet();
if(_recovering.get())
@@ -1206,6 +1216,12 @@ public abstract class AbstractQueue<X ex
}
}
+ @Override
+ public void setTargetSize(final long targetSize)
+ {
+ _targetQueueSize.set(targetSize);
+ }
+
public long getTotalDequeuedMessages()
{
return _dequeueCount.get();
@@ -2188,6 +2204,9 @@ public abstract class AbstractQueue<X ex
{
QueueEntryIterator queueListIterator = getEntries().iterator();
+ long totalSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages();
+ long targetSize = _targetQueueSize.get();
+
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
@@ -2210,8 +2229,15 @@ public abstract class AbstractQueue<X ex
// the time the check actually occurs. So verify we
// can actually get the message to perform the check.
ServerMessage msg = node.getMessage();
+
if (msg != null)
{
+ totalSize += msg.getSize();
+ StoredMessage storedMessage = msg.getStoredMessage();
+ if(totalSize > targetSize && storedMessage.isInMemory())
+ {
+ storedMessage.flowToDisk();
+ }
checkForNotification(msg);
}
}
@@ -2220,6 +2246,13 @@ public abstract class AbstractQueue<X ex
}
+ @Override
+ public long getPotentialMemoryFootprint()
+ {
+ return Math.max(getContextValue(Long.class,QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT),
+ getQueueDepthBytes() + getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages());
+ }
+
public long getAlertRepeatGap()
{
return _alertRepeatGap;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Wed Aug 6 11:05:05 2014
@@ -1576,6 +1576,31 @@ public abstract class AbstractJDBCMessag
storedSizeChange(-delta);
}
+ @Override
+ public boolean isInMemory()
+ {
+ return _messageDataRef.isHardRef();
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ try(Connection conn = newConnection())
+ {
+ store(conn);
+ conn.commit();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException(e);
+ }
+ finally
+ {
+
+ }
+ return true;
+ }
+
private synchronized Runnable store(final Connection conn) throws SQLException
{
if (!stored())
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Wed Aug 6 11:05:05 2014
@@ -130,4 +130,16 @@ public class StoredMemoryMessage<T exten
public void remove()
{
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java Wed Aug 6 11:05:05 2014
@@ -35,4 +35,8 @@ public interface StoredMessage<M extends
ByteBuffer getContent(int offsetInMessage, int size);
void remove();
+
+ boolean isInMemory();
+
+ boolean flowToDisk();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Aug 6 11:05:05 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledFut
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -130,6 +131,8 @@ public abstract class AbstractVirtualHos
private final AtomicBoolean _deleted = new AtomicBoolean();
private final VirtualHostNode<?> _virtualHostNode;
+ private final AtomicLong _targetSize = new AtomicLong(1024*1024);
+
private MessageStoreLogSubject _messageStoreLogSubject;
@ManagedAttributeField
@@ -847,6 +850,10 @@ public abstract class AbstractVirtualHos
public void execute()
{
+ VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class);
+ Broker<?> broker = virtualHostNode.getParent(Broker.class);
+ broker.assignTargetSizes();
+
for (AMQQueue<?> q : getQueues())
{
if (q.getState() == State.ACTIVE)
@@ -1311,6 +1318,46 @@ public abstract class AbstractVirtualHos
}
@Override
+ public void setTargetSize(final long targetSize)
+ {
+ _targetSize.set(targetSize);
+ allocateTargetSizeToQueues();
+ }
+
+ private void allocateTargetSizeToQueues()
+ {
+ long targetSize = _targetSize.get();
+ Collection<AMQQueue<?>> queues = getQueues();
+ long totalSize = calculateTotalEnqueuedSize(queues);
+ if(targetSize > 0l)
+ {
+ for (AMQQueue<?> q : queues)
+ {
+ long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize))
+ * (double) targetSize);
+
+ q.setTargetSize(size);
+ }
+ }
+ }
+
+ @Override
+ public long getTotalQueueDepthBytes()
+ {
+ return calculateTotalEnqueuedSize(getQueues());
+ }
+
+ private long calculateTotalEnqueuedSize(final Collection<AMQQueue<?>> queues)
+ {
+ long total = 0;
+ for(AMQQueue<?> queue : queues)
+ {
+ total += queue.getPotentialMemoryFootprint();
+ }
+ return total;
+ }
+
+ @Override
protected void onCreate()
{
super.onCreate();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Wed Aug 6 11:05:05 2014
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -147,6 +149,7 @@ public class AsynchronousMessageStoreRec
entry.getValue().release();
entry.setValue(null); // free up any memory associated with the reference object
}
+ final List<StoredMessage<?>> messagesToDelete = new ArrayList<>();
getStore().visitMessages(new MessageHandler()
{
@Override
@@ -156,12 +159,19 @@ public class AsynchronousMessageStoreRec
long messageNumber = storedMessage.getMessageNumber();
if(!_recoveredMessages.containsKey(messageNumber))
{
- _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing....");
- storedMessage.remove();
+ messagesToDelete.add(storedMessage);
}
return messageNumber <_maxMessageId-1;
}
});
+ for(StoredMessage<?> storedMessage : messagesToDelete)
+ {
+
+ _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing....");
+ storedMessage.remove();
+ }
+
+ messagesToDelete.clear();
_recoveredMessages.clear();
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Wed Aug 6 11:05:05 2014
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.DeliveryProperties;
@@ -106,7 +105,19 @@ public class MessageConverter_Internal_t
{
throw new UnsupportedOperationException();
}
- };
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
+ };
}
private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Wed Aug 6 11:05:05 2014
@@ -31,7 +31,6 @@ import java.util.Map;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -106,7 +105,19 @@ public class MessageConverter_v0_10 impl
{
throw new UnsupportedOperationException();
}
- };
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
+ };
}
private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Wed Aug 6 11:05:05 2014
@@ -37,7 +37,6 @@ import org.apache.qpid.framing.abstracti
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -118,6 +117,18 @@ public class MessageConverter_Internal_t
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Wed Aug 6 11:05:05 2014
@@ -20,15 +20,14 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import java.nio.ByteBuffer;
-
public class MockStoredMessage implements StoredMessage<MessageMetaData>
{
private long _messageId;
@@ -107,4 +106,16 @@ public class MockStoredMessage implement
public void remove()
{
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Wed Aug 6 11:05:05 2014
@@ -37,7 +37,6 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -265,7 +264,19 @@ public abstract class MessageConverter_t
{
throw new UnsupportedOperationException();
}
- };
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
+ };
}
protected Section getBodySection(final M serverMessage, final String mimeType)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Wed Aug 6 11:05:05 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0;
import org.apache.qpid.server.protocol.v1_0.Message_1_0;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.DeliveryProperties;
@@ -115,6 +114,18 @@ public class MessageConverter_1_0_to_v0_
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Wed Aug 6 11:05:05 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -36,7 +37,6 @@ import org.apache.qpid.server.plugin.Plu
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.DeliveryProperties;
@@ -214,6 +214,18 @@ public class MessageConverter_0_10_to_0_
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Wed Aug 6 11:05:05 2014
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
+
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -32,7 +33,6 @@ import org.apache.qpid.server.plugin.Plu
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.DeliveryProperties;
@@ -103,6 +103,18 @@ public class MessageConverter_0_8_to_0_1
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Wed Aug 6 11:05:05 2014
@@ -36,7 +36,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0;
import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
import org.apache.qpid.server.protocol.v1_0.Message_1_0;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -119,6 +118,18 @@ public class MessageConverter_1_0_to_v0_
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isInMemory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean flowToDisk()
+ {
+ return false;
+ }
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org