You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/06 13:05:06 UTC

svn commit: r1616155 - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/message/internal/ br...

Author: rgodfrey
Date: Wed Aug  6 11:05:05 2014
New Revision: 1616155

URL: http://svn.apache.org/r1616155
Log:
QPID-5965 : [Java Broker] flow transient messages to disk in low memory situations

Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Wed Aug  6 11:05:05 2014
@@ -1424,12 +1424,25 @@ public abstract class AbstractBDBMessage
             storedSizeChangeOccurred(-delta);
         }
 
+        @Override
+        public boolean isInMemory()
+        {
+            return _messageDataRef.isHardRef();
+        }
+
         private boolean stored()
         {
             return !_messageDataRef.isHardRef();
         }
 
         @Override
+        public boolean flowToDisk()
+        {
+            flushToStore();
+            return true;
+        }
+
+        @Override
         public String toString()
         {
             return this.getClass() + "[messageId=" + _messageId + "]";

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java Wed Aug  6 11:05:05 2014
@@ -338,6 +338,18 @@ public class BDBHAReplicaVirtualHost ext
     }
 
     @Override
+    public void setTargetSize(final long targetSize)
+    {
+
+    }
+
+    @Override
+    public long getTotalQueueDepthBytes()
+    {
+        return 0l;
+    }
+
+    @Override
     public org.apache.qpid.server.security.SecurityManager getSecurityManager()
     {
         return null;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Wed Aug  6 11:05:05 2014
@@ -20,14 +20,6 @@
  */
 package org.apache.qpid.server.message.internal;
 
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.util.ByteBufferInputStream;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -39,6 +31,13 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.util.ByteBufferInputStream;
+
 public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData>
 {
     private final Object _messageBody;
@@ -239,6 +238,18 @@ public class InternalMessage extends Abs
                 {
                     throw new UnsupportedOperationException();
                 }
+
+                @Override
+                public boolean isInMemory()
+                {
+                    return true;
+                }
+
+                @Override
+                public boolean flowToDisk()
+                {
+                    return false;
+                }
             };
         }
         catch (IOException e)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Wed Aug  6 11:05:05 2014
@@ -56,6 +56,7 @@ public interface Broker<X extends Broker
     String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
     String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
 
+    String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold";
 
     String QPID_AMQP_PORT = "qpid.amqp_port";
     String QPID_HTTP_PORT = "qpid.http_port";
@@ -74,6 +75,9 @@ public interface Broker<X extends Broker
     @ManagedContextDefault(name = QPID_JMX_PORT)
     public static final String DEFAULT_JMX_PORT_NUMBER  = "9099";
 
+    @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
+    public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
+
     @DerivedAttribute
     String getBuildVersion();
 
@@ -190,4 +194,6 @@ public interface Broker<X extends Broker
 
     AuthenticationProvider<?> getManagementModeAuthenticationProvider();
 
+    void assignTargetSizes();
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Wed Aug  6 11:05:05 2014
@@ -49,6 +49,14 @@ public interface Queue<X extends Queue<X
     String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
     String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
 
+    String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
+    @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
+    long DEFAULT_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = 102400l;
+
+    String QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = "queue.estimatedMessageMemoryOverhead";
+    @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD)
+    long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l;
+
     @ManagedAttribute
     Exchange getAlternateExchange();
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Wed Aug  6 11:05:05 2014
@@ -172,4 +172,9 @@ public interface VirtualHost<X extends V
     MessageStore getMessageStore();
 
     String getType();
+
+    void setTargetSize(long targetSize);
+
+    long getTotalQueueDepthBytes();
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Wed Aug  6 11:05:05 2014
@@ -384,6 +384,39 @@ public class BrokerAdapter extends Abstr
         return children;
     }
 
+    @Override
+    public synchronized void assignTargetSizes()
+    {
+        long totalTarget  = getContextValue(Long.class,BROKER_FLOW_TO_DISK_THRESHOLD);
+        long totalSize = 0l;
+        Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
+        Map<VirtualHost<?,?,?>,Long> vhs = new HashMap<>();
+        for(VirtualHostNode<?> vhn : vhns)
+        {
+            VirtualHost<?, ?, ?> vh = vhn.getVirtualHost();
+            if(vh != null)
+            {
+                long totalQueueDepthBytes = vh.getTotalQueueDepthBytes();
+                vhs.put(vh,totalQueueDepthBytes);
+                totalSize += totalQueueDepthBytes;
+            }
+        }
+
+        for(Map.Entry<VirtualHost<?, ?, ?>,Long> entry : vhs.entrySet())
+        {
+
+            long size = (long) (entry.getValue().doubleValue() * ((double) totalTarget / (double) totalSize));
+            entry.getKey().setTargetSize(size);
+        }
+    }
+
+    @Override
+    protected void onOpen()
+    {
+        super.onOpen();
+        assignTargetSizes();
+    }
+
     public AuthenticationProvider<?> findAuthenticationProviderByName(String authenticationProviderName)
     {
         if (isManagementMode())

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Aug  6 11:05:05 2014
@@ -111,4 +111,8 @@ public interface AMQQueue<X extends AMQQ
     void completeRecovery();
 
     void recover(ServerMessage<?> message);
+
+    void setTargetSize(long targetSize);
+
+    long getPotentialMemoryFootprint();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Aug  6 11:05:05 2014
@@ -79,6 +79,7 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -113,6 +114,8 @@ public abstract class AbstractQueue<X ex
         }
     };
 
+    private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l;
+
     private final VirtualHostImpl _virtualHost;
     private final DeletedChildListener _deletedChildListener = new DeletedChildListener();
 
@@ -130,6 +133,8 @@ public abstract class AbstractQueue<X ex
 
     private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
 
+    private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
+
     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
 
     private final AtomicLong _totalMessagesReceived = new AtomicLong();
@@ -924,6 +929,11 @@ public abstract class AbstractQueue<X ex
         incrementQueueCount();
         incrementQueueSize(message);
 
+        if((_atomicQueueSize.get() + _atomicQueueCount.get()*1024l) > _targetQueueSize.get() && message.getStoredMessage().isInMemory())
+        {
+            message.getStoredMessage().flowToDisk();
+        }
+
         _totalMessagesReceived.incrementAndGet();
 
         if(_recovering.get())
@@ -1206,6 +1216,12 @@ public abstract class AbstractQueue<X ex
         }
     }
 
+    @Override
+    public void setTargetSize(final long targetSize)
+    {
+        _targetQueueSize.set(targetSize);
+    }
+
     public long getTotalDequeuedMessages()
     {
         return _dequeueCount.get();
@@ -2188,6 +2204,9 @@ public abstract class AbstractQueue<X ex
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();
 
+        long totalSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages();
+        long targetSize = _targetQueueSize.get();
+
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
@@ -2210,8 +2229,15 @@ public abstract class AbstractQueue<X ex
                     // the time the check actually occurs. So verify we
                     // can actually get the message to perform the check.
                     ServerMessage msg = node.getMessage();
+
                     if (msg != null)
                     {
+                        totalSize += msg.getSize();
+                        StoredMessage storedMessage = msg.getStoredMessage();
+                        if(totalSize > targetSize && storedMessage.isInMemory())
+                        {
+                            storedMessage.flowToDisk();
+                        }
                         checkForNotification(msg);
                     }
                 }
@@ -2220,6 +2246,13 @@ public abstract class AbstractQueue<X ex
 
     }
 
+    @Override
+    public long getPotentialMemoryFootprint()
+    {
+        return Math.max(getContextValue(Long.class,QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT),
+                        getQueueDepthBytes() + getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages());
+    }
+
     public long getAlertRepeatGap()
     {
         return _alertRepeatGap;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Wed Aug  6 11:05:05 2014
@@ -1576,6 +1576,31 @@ public abstract class AbstractJDBCMessag
             storedSizeChange(-delta);
         }
 
+        @Override
+        public boolean isInMemory()
+        {
+            return _messageDataRef.isHardRef();
+        }
+
+        @Override
+        public boolean flowToDisk()
+        {
+            try(Connection conn = newConnection())
+            {
+                store(conn);
+                conn.commit();
+            }
+            catch (SQLException e)
+            {
+                throw new StoreException(e);
+            }
+            finally
+            {
+
+            }
+            return true;
+        }
+
         private synchronized Runnable store(final Connection conn) throws SQLException
         {
             if (!stored())

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Wed Aug  6 11:05:05 2014
@@ -130,4 +130,16 @@ public class StoredMemoryMessage<T exten
     public void remove()
     {
     }
+
+    @Override
+    public boolean isInMemory()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean flowToDisk()
+    {
+        return false;
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java Wed Aug  6 11:05:05 2014
@@ -35,4 +35,8 @@ public interface StoredMessage<M extends
     ByteBuffer getContent(int offsetInMessage, int size);
 
     void remove();
+
+    boolean isInMemory();
+
+    boolean flowToDisk();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Aug  6 11:05:05 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledFut
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
@@ -130,6 +131,8 @@ public abstract class AbstractVirtualHos
     private final AtomicBoolean _deleted = new AtomicBoolean();
     private final VirtualHostNode<?> _virtualHostNode;
 
+    private final AtomicLong _targetSize = new AtomicLong(1024*1024);
+
     private MessageStoreLogSubject _messageStoreLogSubject;
 
     @ManagedAttributeField
@@ -847,6 +850,10 @@ public abstract class AbstractVirtualHos
 
         public void execute()
         {
+            VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class);
+            Broker<?> broker = virtualHostNode.getParent(Broker.class);
+            broker.assignTargetSizes();
+
             for (AMQQueue<?> q : getQueues())
             {
                 if (q.getState() == State.ACTIVE)
@@ -1311,6 +1318,46 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
+    public void setTargetSize(final long targetSize)
+    {
+        _targetSize.set(targetSize);
+        allocateTargetSizeToQueues();
+    }
+
+    private void allocateTargetSizeToQueues()
+    {
+        long targetSize = _targetSize.get();
+        Collection<AMQQueue<?>> queues = getQueues();
+        long totalSize = calculateTotalEnqueuedSize(queues);
+        if(targetSize > 0l)
+        {
+            for (AMQQueue<?> q : queues)
+            {
+                long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize))
+                                             * (double) targetSize);
+
+                q.setTargetSize(size);
+            }
+        }
+    }
+
+    @Override
+    public long getTotalQueueDepthBytes()
+    {
+        return calculateTotalEnqueuedSize(getQueues());
+    }
+
+    private long calculateTotalEnqueuedSize(final Collection<AMQQueue<?>> queues)
+    {
+        long total = 0;
+        for(AMQQueue<?> queue : queues)
+        {
+            total += queue.getPotentialMemoryFootprint();
+        }
+        return total;
+    }
+
+    @Override
     protected void onCreate()
     {
         super.onCreate();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Wed Aug  6 11:05:05 2014
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -147,6 +149,7 @@ public class AsynchronousMessageStoreRec
                 entry.getValue().release();
                 entry.setValue(null); // free up any memory associated with the reference object
             }
+            final List<StoredMessage<?>> messagesToDelete = new ArrayList<>();
             getStore().visitMessages(new MessageHandler()
             {
                 @Override
@@ -156,12 +159,19 @@ public class AsynchronousMessageStoreRec
                     long messageNumber = storedMessage.getMessageNumber();
                     if(!_recoveredMessages.containsKey(messageNumber))
                     {
-                        _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing....");
-                        storedMessage.remove();
+                        messagesToDelete.add(storedMessage);
                     }
                     return messageNumber <_maxMessageId-1;
                 }
             });
+            for(StoredMessage<?> storedMessage : messagesToDelete)
+            {
+
+                _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing....");
+                storedMessage.remove();
+            }
+
+            messagesToDelete.clear();
             _recoveredMessages.clear();
         }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Wed Aug  6 11:05:05 2014
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.DeliveryProperties;
@@ -106,7 +105,19 @@ public class MessageConverter_Internal_t
                     {
                         throw new UnsupportedOperationException();
                     }
-                };
+
+                    @Override
+                    public boolean isInMemory()
+                    {
+                        return true;
+                    }
+
+                    @Override
+                    public boolean flowToDisk()
+                    {
+                        return false;
+                    }
+        };
     }
 
     private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Wed Aug  6 11:05:05 2014
@@ -31,7 +31,6 @@ import java.util.Map;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -106,7 +105,19 @@ public class MessageConverter_v0_10 impl
                     {
                         throw new UnsupportedOperationException();
                     }
-                };
+
+                    @Override
+                    public boolean isInMemory()
+                    {
+                        return true;
+                    }
+
+                    @Override
+                    public boolean flowToDisk()
+                    {
+                        return false;
+                    }
+        };
     }
 
     private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Wed Aug  6 11:05:05 2014
@@ -37,7 +37,6 @@ import org.apache.qpid.framing.abstracti
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -118,6 +117,18 @@ public class MessageConverter_Internal_t
             {
                 throw new UnsupportedOperationException();
             }
+
+            @Override
+            public boolean isInMemory()
+            {
+                return true;
+            }
+
+            @Override
+            public boolean flowToDisk()
+            {
+                return false;
+            }
         };
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Wed Aug  6 11:05:05 2014
@@ -20,15 +20,14 @@
 */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 
-import java.nio.ByteBuffer;
-
 public class MockStoredMessage implements StoredMessage<MessageMetaData>
 {
     private long _messageId;
@@ -107,4 +106,16 @@ public class MockStoredMessage implement
     public void remove()
     {
     }
+
+    @Override
+    public boolean isInMemory()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean flowToDisk()
+    {
+        return false;
+    }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Wed Aug  6 11:05:05 2014
@@ -37,7 +37,6 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.messaging.Data;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -265,7 +264,19 @@ public abstract class MessageConverter_t
                         {
                             throw new UnsupportedOperationException();
                         }
-                    };
+
+                        @Override
+                        public boolean isInMemory()
+                        {
+                            return true;
+                        }
+
+                        @Override
+                        public boolean flowToDisk()
+                        {
+                            return false;
+                        }
+        };
     }
 
     protected Section getBodySection(final M serverMessage, final String mimeType)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Wed Aug  6 11:05:05 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
 import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0;
 import org.apache.qpid.server.protocol.v1_0.Message_1_0;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.DeliveryProperties;
@@ -115,6 +114,18 @@ public class MessageConverter_1_0_to_v0_
             {
                 throw new UnsupportedOperationException();
             }
+
+            @Override
+            public boolean isInMemory()
+            {
+                return true;
+            }
+
+            @Override
+            public boolean flowToDisk()
+            {
+                return false;
+            }
         };
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Wed Aug  6 11:05:05 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.qpid.AMQPInvalidClassException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
@@ -36,7 +37,6 @@ import org.apache.qpid.server.plugin.Plu
 import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.DeliveryProperties;
@@ -214,6 +214,18 @@ public class MessageConverter_0_10_to_0_
             {
                 throw new UnsupportedOperationException();
             }
+
+            @Override
+            public boolean isInMemory()
+            {
+                return true;
+            }
+
+            @Override
+            public boolean flowToDisk()
+            {
+                return false;
+            }
         };
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Wed Aug  6 11:05:05 2014
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
+
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
@@ -32,7 +33,6 @@ import org.apache.qpid.server.plugin.Plu
 import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
 import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.DeliveryProperties;
@@ -103,6 +103,18 @@ public class MessageConverter_0_8_to_0_1
             {
                 throw new UnsupportedOperationException();
             }
+
+            @Override
+            public boolean isInMemory()
+            {
+                return true;
+            }
+
+            @Override
+            public boolean flowToDisk()
+            {
+                return false;
+            }
         };
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1616155&r1=1616154&r2=1616155&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Wed Aug  6 11:05:05 2014
@@ -36,7 +36,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0;
 import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
 import org.apache.qpid.server.protocol.v1_0.Message_1_0;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
@@ -119,6 +118,18 @@ public class MessageConverter_1_0_to_v0_
             {
                 throw new UnsupportedOperationException();
             }
+
+            @Override
+            public boolean isInMemory()
+            {
+                return true;
+            }
+
+            @Override
+            public boolean flowToDisk()
+            {
+                return false;
+            }
         };
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org