You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/23 10:09:17 UTC

[1/6] qpid-broker-j git commit: QPID-7794: [Java Broker] Periodically report the number of bytes evacuated from memory by flow to disk

Repository: qpid-broker-j
Updated Branches:
  refs/heads/6.0.x 516854eae -> b77dcd8a3


QPID-7794: [Java Broker] Periodically report the number of bytes evacuated from memory by flow to disk

This commit contains the message store parts required for this change.

(Cherry-picked from 6.1.x: 5920ed7 and 7cf5eb3)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8cf6fdf1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8cf6fdf1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8cf6fdf1

Branch: refs/heads/6.0.x
Commit: 8cf6fdf1c4d79e1f67a74286e2f15f73fb3230b9
Parents: 516854e
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon May 22 14:10:18 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:14:51 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     | 25 +++++++++++++++++---
 .../store/berkeleydb/BDBConfigurationStore.java |  1 +
 .../store/berkeleydb/BDBMessageStore.java       |  1 +
 .../berkeleydb/BDBHAReplicaVirtualHostImpl.java |  6 +++++
 .../message/AbstractServerMessageImpl.java      |  9 +++++--
 .../qpid/server/message/MessageReference.java   |  7 +++---
 .../apache/qpid/server/model/VirtualHost.java   |  3 +++
 .../apache/qpid/server/queue/AbstractQueue.java |  2 +-
 .../server/store/AbstractJDBCMessageStore.java  | 18 +++++++++++---
 .../qpid/server/store/MemoryMessageStore.java   |  8 +++++--
 .../apache/qpid/server/store/MessageStore.java  |  2 ++
 .../qpid/server/store/NullMessageStore.java     |  6 +++++
 .../server/virtualhost/AbstractVirtualHost.java |  6 +++++
 .../RedirectingVirtualHostImpl.java             |  6 +++++
 .../server/store/TestMessageMetaDataType.java   |  6 +++++
 15 files changed, 92 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index bd4f2e2..b52e25c 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.sleepycat.bind.tuple.LongBinding;
@@ -100,6 +101,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     private boolean _limitBusted;
     private long _totalStoreSize;
     private final Random _lockConflictRandom = new Random();
+    private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
 
     @Override
     public void upgradeStoreStructure() throws StoreException
@@ -147,6 +149,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     }
 
     @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return _bytesEvacuatedFromMemory.get();
+    }
+
+    @Override
     public boolean isPersistent()
     {
         return true;
@@ -167,6 +175,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     }
 
     @Override
+    public void closeMessageStore()
+    {
+        _bytesEvacuatedFromMemory.set(0);
+    }
+
+    @Override
     public MessageStoreReader newMessageStoreReader()
     {
         return new BDBMessageStoreReader();
@@ -968,10 +982,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             _data = data;
         }
 
-        public void clear()
+        public long clear()
         {
+            long bytesCleared = 0;
             if(_metaData != null)
             {
+                bytesCleared += _metaData.getStorableSize();
                 _metaData.clearEncodedForm();
                 _metaData = null;
             }
@@ -979,10 +995,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             {
                 for(QpidByteBuffer buf : _data)
                 {
+                    bytesCleared += buf.remaining();
                     buf.dispose();
                 }
+                _data = null;
             }
-            _data = null;
+            return bytesCleared;
         }
 
         @Override
@@ -1208,7 +1226,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                ((MessageDataSoftRef)_messageDataRef).clear();
+                final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+                _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
             return true;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
index 389b538..5be0e45 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -541,6 +541,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
         @Override
         public void closeMessageStore()
         {
+            super.closeMessageStore();
             _messageStoreOpen.set(false);
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 0411327..7bb23a2 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -84,6 +84,7 @@ public class BDBMessageStore extends AbstractBDBMessageStore
     @Override
     public void closeMessageStore()
     {
+        super.closeMessageStore();
         if (_messageStoreOpen.compareAndSet(true, false))
         {
             if (_environmentFacade != null)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
index a9d8f53..6c1bdd0 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
@@ -288,6 +288,12 @@ public class BDBHAReplicaVirtualHostImpl extends AbstractConfiguredObject<BDBHAR
     }
 
     @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return 0;
+    }
+
+    @Override
     public Collection<VirtualHostAlias> getAliases()
     {
         return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 03b3198..7d28eed 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -223,11 +223,11 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
         private final UUID _resourceId;
         private volatile int _released;
 
-        private Reference(final AbstractServerMessageImpl<X, T> message)
+        private Reference(final AbstractServerMessageImpl<X, T> message) throws MessageDeletedException
         {
             this(message, null);
         }
-        private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource)
+        private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource) throws MessageDeletedException
         {
             _message = message;
             if(resource != null)
@@ -309,6 +309,11 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
             }
         }
 
+        @Override
+        public void close()
+        {
+            release();
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
index eda8550..dfe5d64 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.server.message;
 
-public interface MessageReference<M extends ServerMessage>
+public interface MessageReference<M extends ServerMessage> extends AutoCloseable
 {
-    public M getMessage();
-    public void release();
+    M getMessage();
+    void release();
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 2d6bdc3..9b2f18c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -170,6 +170,9 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
     @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Outbound")
     long getMessagesOut();
 
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Total Number of Bytes Evacuated from Memory Due to Flow to Disk")
+    long getBytesEvacuatedFromMemory();
+
     Broker<?> getBroker();
 
     //children

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index e4aefe5..169faa9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -3720,7 +3720,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
         void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
         {
-            final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
+            final long allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
             if (estimatedQueueSize > targetQueueSize
                 || allocatedDirectMemorySize > _flowToDiskThreshold)
             {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 5d2aa98..a2ad036 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -118,6 +118,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
     protected final EventManager _eventManager = new EventManager();
     private ConfiguredObject<?> _parent;
+    private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -277,6 +278,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        _bytesEvacuatedFromMemory.set(0);
         if(_executor != null)
         {
             _executor.shutdown();
@@ -1101,6 +1103,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         return true;
     }
 
+    @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return _bytesEvacuatedFromMemory.get();
+    }
 
     protected class JDBCTransaction implements Transaction
     {
@@ -1370,10 +1377,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             _data = data;
         }
 
-        public void clear()
+        public long clear()
         {
+            long bytesCleared = 0;
             if(_metaData != null)
             {
+                bytesCleared += _metaData.getStorableSize();
                 _metaData.clearEncodedForm();
                 _metaData = null;
             }
@@ -1381,10 +1390,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             {
                 for(QpidByteBuffer buf : _data)
                 {
+                    bytesCleared += buf.remaining();
                     buf.dispose();
                 }
+                _data = null;
             }
-            _data = null;
+            return bytesCleared;
         }
 
         @Override
@@ -1622,7 +1633,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                ((MessageDataSoftRef)_messageDataRef).clear();
+                final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+                _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
             return true;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 114d0d1..baedc02 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -286,14 +286,12 @@ public class MemoryMessageStore implements MessageStore
 
         StoredMemoryMessage<T> storedMemoryMessage = new StoredMemoryMessage<T>(id, metaData)
         {
-
             @Override
             public void remove()
             {
                 _messages.remove(getMessageNumber());
                 super.remove();
             }
-
         };
 
         return storedMemoryMessage;
@@ -313,6 +311,12 @@ public class MemoryMessageStore implements MessageStore
     }
 
     @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return 0L;
+    }
+
+    @Override
     public Transaction newTransaction()
     {
         return new MemoryMessageStoreTransaction();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index ab5764e..60b89e7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -61,6 +61,8 @@ public interface MessageStore
 
     <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData);
 
+    long getBytesEvacuatedFromMemory();
+
     /**
      * Is this store capable of persisting the data
      *

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index b8ca8b0..d5d4463 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -101,6 +101,12 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
     }
 
     @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return 0L;
+    }
+
+    @Override
     public Transaction newTransaction()
     {
         return null;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 56e6fb8..b9d3cbf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -847,6 +847,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
     @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return _messageStore == null ? -1 : _messageStore.getBytesEvacuatedFromMemory();
+    }
+
+    @Override
     public ExchangeImpl getAttainedExchange(String name)
     {
         Exchange child = awaitChildClassToAttainState(Exchange.class, name);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
index 574e601..727901c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
@@ -161,6 +161,12 @@ class RedirectingVirtualHostImpl
     }
 
     @Override
+    public long getBytesEvacuatedFromMemory()
+    {
+        return 0L;
+    }
+
+    @Override
     public ExchangeImpl<?> getAttainedExchange(final String name)
     {
         return null;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index fb57d3a..a7b1267 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -78,6 +78,12 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
         {
 
             @Override
+            public void close()
+            {
+                release();
+            }
+
+            @Override
             public ServerMessage getMessage()
             {
                 return TestServerMessage.this;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[5/6] qpid-broker-j git commit: QPID-7795: [Java Broker] Ensure that a newly enqueued message that is flowed to disk does not immediately have meta-data reloaded (6.1/6.0)

Posted by lq...@apache.org.
QPID-7795: [Java Broker] Ensure that a newly enqueued message that is flowed to disk does not immediately have meta-data reloaded (6.1/6.0)

Cherry-picked from QPID-7775 commit 8ae1d142b33edc91d4988c9f4b775026bb03acc4


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8f3a80bc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8f3a80bc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8f3a80bc

Branch: refs/heads/6.0.x
Commit: 8f3a80bcb4f0091367bf26c48cc4a2334f7767ae
Parents: 37999be
Author: Keith Wall <ke...@gmail.com>
Authored: Thu May 18 18:17:38 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:40:37 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/server/protocol/v0_10/ServerSession.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8f3a80bc/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 34d6c72..16b36ef 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -280,11 +280,13 @@ public class ServerSession extends Session
             _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
             invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
         }
+        // locally cache arrival time to ensure that we don't reload metadata
+        final long arrivalTime = message.getArrivalTime();
         int enqueues = exchange.send(message,
                                      message.getInitialRoutingAddress(),
                                      instanceProperties, _transaction, _checkCapacityAction
                                     );
-        getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
+        getAMQPConnection().registerMessageReceived(message.getSize(), arrivalTime);
         incrementOutstandingTxnsIfNecessary();
         incrementUncommittedMessageSize(message.getStoredMessage());
         return enqueues;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/6] qpid-broker-j git commit: QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost

Posted by lq...@apache.org.
QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost

Cherry-picked from b63815c


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a1d66f1d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a1d66f1d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a1d66f1d

Branch: refs/heads/6.0.x
Commit: a1d66f1d344087ad088b2f55894be957abac5ca3
Parents: df071cb
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon May 22 15:21:57 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:19:59 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     | 55 ++++++++++++++++++--
 .../server/store/AbstractJDBCMessageStore.java  | 54 +++++++++++++++++--
 .../qpid/server/store/MemoryMessageStore.java   |  4 ++
 .../qpid/server/store/StoredMemoryMessage.java  |  4 ++
 4 files changed, 110 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index b52e25c..573d287 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,7 +29,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -102,6 +104,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     private long _totalStoreSize;
     private final Random _lockConflictRandom = new Random();
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredBDBMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<StoredBDBMessage<?>, Boolean>());
 
     @Override
     public void upgradeStoreStructure() throws StoreException
@@ -125,7 +128,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
         long newMessageId = getNextMessageId();
 
-        return new StoredBDBMessage<T>(newMessageId, metaData);
+        return createStoredBDBMessage(newMessageId, metaData, false);
+    }
+
+    public <T extends StorableMessageMetaData> StoredBDBMessage<T> createStoredBDBMessage(final long newMessageId,
+                                                                                          final T metaData,
+                                                                                          final boolean recovered)
+    {
+        final StoredBDBMessage<T> message = new StoredBDBMessage<>(newMessageId, metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     public long getNextMessageId()
@@ -177,6 +189,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredBDBMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _bytesEvacuatedFromMemory.set(0);
     }
 
@@ -436,7 +453,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             {
                 long messageId = LongBinding.entryToLong(key);
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 if (!handler.handle(message))
                 {
                     break;
@@ -482,7 +499,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS)
             {
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 return message;
             }
             else
@@ -915,6 +932,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         Collection<QpidByteBuffer> getData();
         void setData(Collection<QpidByteBuffer> data);
         boolean isHardRef();
+        long clear();
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -950,6 +968,28 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         {
             return true;
         }
+
+        @Override
+        public long clear()
+        {
+            long bytesCleared = 0;
+            if(_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                _metaData.clearEncodedForm();
+            }
+            if(_data != null)
+            {
+                for(QpidByteBuffer buf : _data)
+                {
+                    bytesCleared += buf.remaining();
+                    buf.dispose();
+                }
+                _data = null;
+            }
+            return bytesCleared;
+        }
+
     }
 
     private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -982,6 +1022,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             _data = data;
         }
 
+        @Override
         public long clear()
         {
             long bytesCleared = 0;
@@ -1191,6 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
             final T metaData = getMetaData();
             int delta = metaData.getContentSize();
+            _messages.remove(this);
             if(stored())
             {
                 removeMessage(_messageId, false);
@@ -1238,6 +1280,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             return this.getClass() + "[messageId=" + _messageId + "]";
         }
 
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index a2ad036..e7e1b0f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -119,6 +120,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     protected final EventManager _eventManager = new EventManager();
     private ConfiguredObject<?> _parent;
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<StoredJDBCMessage<?>, Boolean>());
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -278,6 +280,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredJDBCMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _bytesEvacuatedFromMemory.set(0);
         if(_executor != null)
         {
@@ -443,8 +450,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     {
         checkMessageStoreOpen();
 
-        return new StoredJDBCMessage<T>(getNextMessageId(), metaData);
+        return createStoredJDBCMessage(getNextMessageId(), metaData, false);
+    }
 
+    public <T extends StorableMessageMetaData> StoredJDBCMessage<T> createStoredJDBCMessage(final long newMessageId,
+                                                                                          final T metaData,
+                                                                                          final boolean recovered)
+    {
+        final StoredJDBCMessage<T> message = new StoredJDBCMessage<>(newMessageId, metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     @Override
@@ -1310,6 +1325,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         Collection<QpidByteBuffer> getData();
         void setData(Collection<QpidByteBuffer> data);
         boolean isHardRef();
+        long clear();
     }
 
     private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -1341,6 +1357,27 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
 
         @Override
+        public long clear()
+        {
+            long bytesCleared = 0;
+            if(_metaData != null)
+            {
+                bytesCleared += _metaData.getStorableSize();
+                _metaData.clearEncodedForm();
+            }
+            if(_data != null)
+            {
+                for(QpidByteBuffer buf : _data)
+                {
+                    bytesCleared += buf.remaining();
+                    buf.dispose();
+                }
+                _data = null;
+            }
+            return bytesCleared;
+        }
+
+        @Override
         public boolean isHardRef()
         {
             return true;
@@ -1377,6 +1414,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             _data = data;
         }
 
+        @Override
         public long clear()
         {
             long bytesCleared = 0;
@@ -1546,7 +1584,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         {
             if (!stored())
             {
-
                 AbstractJDBCMessageStore.this.storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
                 AbstractJDBCMessageStore.this.addContent(conn, _messageId,
                                                          _messageDataRef.getData() == null
@@ -1598,6 +1635,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
             final T metaData = getMetaData();
             int delta = metaData.getContentSize();
+            _messages.remove(this);
             if(stored())
             {
                 AbstractJDBCMessageStore.this.removeMessage(_messageId);
@@ -1639,6 +1677,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             return true;
         }
 
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
+
         @Override
         public String toString()
         {
@@ -1685,7 +1731,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            message = new StoredJDBCMessage(messageId, metaData, true);
+                            message = createStoredJDBCMessage(messageId, metaData, true);
 
                         }
                         else
@@ -1738,7 +1784,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(((int)dataAsBytes[0]) &0xff);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+                            StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
                             if (!handler.handle(message))
                             {
                                 break;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index baedc02..8983619 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -325,6 +325,10 @@ public class MemoryMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredMemoryMessage storedMemoryMessage : _messages.values())
+        {
+            storedMemoryMessage.clear();
+        }
         _messages.clear();
         synchronized (_transactionLock)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 80e317f..d5dc0c6 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -141,4 +141,8 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
         return false;
     }
 
+    public void clear()
+    {
+        remove();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/6] qpid-broker-j git commit: QPID-7784: [Java Broker] Dispose QpidByteBuffers associated with pooled threads when shutting down executors.

Posted by lq...@apache.org.
QPID-7784: [Java Broker] Dispose QpidByteBuffers associated with pooled threads when shutting down executors.

Cherry picked from d9af2660089139e2f4fdad8c0aa0e0c8e6529ff5 and f9262e9


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/37999be9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/37999be9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/37999be9

Branch: refs/heads/6.0.x
Commit: 37999be9e23445201e1407d79e2cb632624ffddb
Parents: a1d66f1
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon May 22 15:29:19 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:37:45 2017 +0100

----------------------------------------------------------------------
 .../transport/NetworkConnectionScheduler.java   | 35 +++++++++++++++++++-
 .../server/virtualhost/AbstractVirtualHost.java | 25 ++++++++++++++
 .../apache/qpid/bytebuffer/QpidByteBuffer.java  |  9 +++++
 3 files changed, 68 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37999be9/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index 22e70bb..2745809 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.transport;
 
 import java.io.IOException;
 import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -32,8 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.TransportException;
 
+
 public class NetworkConnectionScheduler
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
@@ -99,7 +103,36 @@ public class NetworkConnectionScheduler
             _selectorThread = new SelectorThread(this, _numberOfSelectors);
             _executor = new ThreadPoolExecutor(_poolSize, _poolSize,
                                                _threadKeepAliveTimeout, TimeUnit.MINUTES,
-                                               new LinkedBlockingQueue<Runnable>(), _factory);
+                                               new LinkedBlockingQueue<Runnable>(), _factory)
+            {
+                private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+
+                @Override
+                protected void afterExecute(final Runnable r, final Throwable t)
+                {
+                    super.afterExecute(r, t);
+                    final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+                    if (cachedThreadLocalBuffer != null)
+                    {
+                        _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+                    }
+                    else
+                    {
+                        _cachedBufferMap.remove(Thread.currentThread());
+                    }
+                }
+
+                @Override
+                protected void terminated()
+                {
+                    super.terminated();
+                    for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+                    {
+                        qpidByteBuffer.dispose();
+                    }
+                    _cachedBufferMap.clear();
+                }
+            };
             _executor.prestartAllCoreThreads();
             _executor.allowCoreThreadTimeOut(true);
             for(int i = 0 ; i < _poolSize; i++)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37999be9/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index a23feab..41bf0b0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -56,6 +56,7 @@ import com.google.common.util.concurrent.UncheckedExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.pool.SuppressingInheritedAccessControlContextThreadFactory;
 import org.apache.qpid.server.configuration.updater.Task;
@@ -1936,10 +1937,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                 new SuppressingInheritedAccessControlContextThreadFactory("virtualhost-" + getName() + "-pool",
                                                                           SecurityManager.getSystemTaskSubject("Housekeeping", getPrincipal()));
         _houseKeepingTaskExecutor = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), housekeepingThreadFactory){
+            private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+
             @Override
             protected void afterExecute(Runnable r, Throwable t)
             {
                 super.afterExecute(r, t);
+
+                final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+                if (cachedThreadLocalBuffer != null)
+                {
+                    _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+                }
+                else
+                {
+                    _cachedBufferMap.remove(Thread.currentThread());
+                }
+
                 if (t == null && r instanceof Future<?>)
                 {
                     Future future = (Future<?>) r;
@@ -1983,6 +1997,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                     }
                 }
             }
+
+            @Override
+            protected void terminated()
+            {
+                super.terminated();
+                for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+                {
+                    qpidByteBuffer.dispose();
+                }
+                _cachedBufferMap.clear();
+            }
         };
 
         long threadPoolKeepAliveTimeout = getContextValue(Long.class, CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37999be9/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
index 7f8c740..85c4dae 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
@@ -706,6 +706,15 @@ public final class QpidByteBuffer
         _isPoolInitialized = true;
     }
 
+    /**
+     * Not for general use!
+     * Returns the calling thread's cached thread-local buffer so that it can be disposed when shutting down thread pools.
+     */
+    public static QpidByteBuffer getCachedThreadLocalBuffer()
+    {
+        return _cachedBuffer.get();
+    }
+
     public static int getPooledBufferSize()
     {
         return _pooledBufferSize;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[6/6] qpid-broker-j git commit: QPID-7796: [Java Broker] Guard against NPE in 0-10 when storing messages without header

Posted by lq...@apache.org.
QPID-7796: [Java Broker] Guard against NPE in 0-10 when storing messages without header


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b77dcd8a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b77dcd8a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b77dcd8a

Branch: refs/heads/6.0.x
Commit: b77dcd8a3d66e7c183444adad90fdb105b3815a2
Parents: 8f3a80b
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue May 23 09:52:24 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:42:04 2017 +0100

----------------------------------------------------------------------
 .../protocol/v0_10/MessageMetaData_0_10.java    | 49 +++++++++++---------
 1 file changed, 27 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b77dcd8a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 831ceb7..9dcdc4b 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -104,37 +104,42 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
         encoder.writeInt64(_arrivalTime);
         encoder.writeInt32(_bodySize);
         int headersLength = 0;
-        if(_header.getDeliveryProperties() != null)
+        if (_header != null)
         {
-            headersLength++;
-        }
-        if(_header.getMessageProperties() != null)
-        {
-            headersLength++;
-        }
-        if(_header.getNonStandardProperties() != null)
-        {
-            headersLength += _header.getNonStandardProperties().size();
+            if (_header.getDeliveryProperties() != null)
+            {
+                headersLength++;
+            }
+            if (_header.getMessageProperties() != null)
+            {
+                headersLength++;
+            }
+            if (_header.getNonStandardProperties() != null)
+            {
+                headersLength += _header.getNonStandardProperties().size();
+            }
         }
 
         encoder.writeInt32(headersLength);
 
-        if(_header.getDeliveryProperties() != null)
+        if (_header != null)
         {
-            encoder.writeStruct32(_header.getDeliveryProperties());
-        }
-        if(_header.getMessageProperties() != null)
-        {
-            encoder.writeStruct32(_header.getMessageProperties());
-        }
-        if(_header.getNonStandardProperties() != null)
-        {
-
-            for(Struct header : _header.getNonStandardProperties())
+            if (_header.getDeliveryProperties() != null)
+            {
+                encoder.writeStruct32(_header.getDeliveryProperties());
+            }
+            if (_header.getMessageProperties() != null)
             {
-                encoder.writeStruct32(header);
+                encoder.writeStruct32(_header.getMessageProperties());
             }
+            if (_header.getNonStandardProperties() != null)
+            {
 
+                for (Struct header : _header.getNonStandardProperties())
+                {
+                    encoder.writeStruct32(header);
+                }
+            }
         }
         QpidByteBuffer buf = encoder.getBuffer();
         encoder.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/6] qpid-broker-j git commit: QPID-7794: [Java Broker] periodically log flow to disk statistics on VirtualHost

Posted by lq...@apache.org.
QPID-7794: [Java Broker] periodically log flow to disk statistics on VirtualHost

(Cherry-picked from 6.1.x: 11a7522)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/df071cb3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/df071cb3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/df071cb3

Branch: refs/heads/6.0.x
Commit: df071cb30740c6a9ddb33451fa293d69271244e5
Parents: 8cf6fdf
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon May 22 14:18:28 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:18:27 2017 +0100

----------------------------------------------------------------------
 .../server/logging/messages/BrokerMessages.java | 120 -------------------
 .../messages/Broker_logmessages.properties      |   7 +-
 .../server/logging/messages/QueueMessages.java  | 120 -------------------
 .../messages/Queue_logmessages.properties       |   6 +-
 .../logging/messages/VirtualHostMessages.java   | 120 ++++++++++++++-----
 .../messages/VirtualHost_logmessages.properties |   2 +
 .../server/model/adapter/BrokerAdapter.java     |  16 ---
 .../apache/qpid/server/queue/AbstractQueue.java |  56 +--------
 .../server/virtualhost/AbstractVirtualHost.java |  10 ++
 9 files changed, 109 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
index 3f1d858..c66a674 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
@@ -47,11 +47,9 @@ public class BrokerMessages
     public static final String BROKER_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker";
     public static final String READY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.ready";
     public static final String FAILED_CHILDREN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.failed_children";
-    public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_active";
     public static final String LISTENING_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.listening";
     public static final String STARTUP_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.startup";
     public static final String MANAGEMENT_MODE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.management_mode";
-    public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_inactive";
     public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stats_msgs";
     public static final String PLATFORM_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.platform";
     public static final String CONFIG_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.config";
@@ -68,11 +66,9 @@ public class BrokerMessages
         LoggerFactory.getLogger(BROKER_LOG_HIERARCHY);
         LoggerFactory.getLogger(READY_LOG_HIERARCHY);
         LoggerFactory.getLogger(FAILED_CHILDREN_LOG_HIERARCHY);
-        LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
         LoggerFactory.getLogger(LISTENING_LOG_HIERARCHY);
         LoggerFactory.getLogger(STARTUP_LOG_HIERARCHY);
         LoggerFactory.getLogger(MANAGEMENT_MODE_LOG_HIERARCHY);
-        LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
         LoggerFactory.getLogger(STATS_MSGS_LOG_HIERARCHY);
         LoggerFactory.getLogger(PLATFORM_LOG_HIERARCHY);
         LoggerFactory.getLogger(CONFIG_LOG_HIERARCHY);
@@ -200,64 +196,6 @@ public class BrokerMessages
 
     /**
      * Log a Broker message of the Format:
-     * <pre>BRK-1014 : Message flow to disk active :  Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB</pre>
-     * Optional values are contained in [square brackets] and are numbered
-     * sequentially in the method call.
-     *
-     */
-    public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2)
-    {
-        String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE");
-
-        final Object[] messageArguments = {param1, param2};
-        // Create a new MessageFormat to ensure thread safety.
-        // Sharing a MessageFormat and using applyPattern is not thread safe
-        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
-        final String message = formatter.format(messageArguments);
-
-        return new LogMessage()
-        {
-            public String toString()
-            {
-                return message;
-            }
-
-            public String getLogHierarchy()
-            {
-                return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY;
-            }
-
-            @Override
-            public boolean equals(final Object o)
-            {
-                if (this == o)
-                {
-                    return true;
-                }
-                if (o == null || getClass() != o.getClass())
-                {
-                    return false;
-                }
-
-                final LogMessage that = (LogMessage) o;
-
-                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
-            }
-
-            @Override
-            public int hashCode()
-            {
-                int result = toString().hashCode();
-                result = 31 * result + getLogHierarchy().hashCode();
-                return result;
-            }
-        };
-    }
-
-    /**
-     * Log a Broker message of the Format:
      * <pre>BRK-1002 : Starting : Listening on {0} port {1,number,#}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
@@ -432,64 +370,6 @@ public class BrokerMessages
 
     /**
      * Log a Broker message of the Format:
-     * <pre>BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB</pre>
-     * Optional values are contained in [square brackets] and are numbered
-     * sequentially in the method call.
-     *
-     */
-    public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2)
-    {
-        String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
-
-        final Object[] messageArguments = {param1, param2};
-        // Create a new MessageFormat to ensure thread safety.
-        // Sharing a MessageFormat and using applyPattern is not thread safe
-        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
-        final String message = formatter.format(messageArguments);
-
-        return new LogMessage()
-        {
-            public String toString()
-            {
-                return message;
-            }
-
-            public String getLogHierarchy()
-            {
-                return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
-            }
-
-            @Override
-            public boolean equals(final Object o)
-            {
-                if (this == o)
-                {
-                    return true;
-                }
-                if (o == null || getClass() != o.getClass())
-                {
-                    return false;
-                }
-
-                final LogMessage that = (LogMessage) o;
-
-                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
-            }
-
-            @Override
-            public int hashCode()
-            {
-                int result = toString().hashCode();
-                result = 31 * result + getLogHierarchy().hashCode();
-                return result;
-            }
-        };
-    }
-
-    /**
-     * Log a Broker message of the Format:
      * <pre>BRK-1009 : {0,choice,0#delivered|1#received} : {1,number,#.###} msg/s peak : {2,number,#} msgs total</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
index b74ad36..0883aef 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
@@ -49,10 +49,9 @@ MAX_MEMORY = BRK-1011 : Maximum Memory : Heap : {0,number} bytes Direct : {1,num
 
 MANAGEMENT_MODE = BRK-1012 : Management Mode : User Details : {0} / {1}
 
-# 0 - Total message size
-# 1 - Target memory size
-FLOW_TO_DISK_ACTIVE = BRK-1014 : Message flow to disk active :  Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB
-FLOW_TO_DISK_INACTIVE = BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB
+# These are no longer in use
+#FLOW_TO_DISK_ACTIVE = BRK-1014 : Message flow to disk active :  Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB
+#FLOW_TO_DISK_INACTIVE = BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB
 
 FATAL_ERROR = BRK-1016 : Fatal error : {0} : See log file for more information
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index 0e4883d..6206283 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -49,8 +49,6 @@ public class QueueMessages
     public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
     public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
     public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
-    public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_active";
-    public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_inactive";
 
     static
     {
@@ -59,8 +57,6 @@ public class QueueMessages
         LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
         LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
         LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
-        LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
-        LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
 
         _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Queue_logmessages", _currentLocale);
     }
@@ -360,122 +356,6 @@ public class QueueMessages
         };
     }
 
-    /**
-     * Log a Queue message of the Format:
-     * <pre>QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB</pre>
-     * Optional values are contained in [square brackets] and are numbered
-     * sequentially in the method call.
-     *
-     */
-    public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2, Number param3, Number param4)
-    {
-        String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
-
-        final Object[] messageArguments = {param1, param2, param3, param4};
-        // Create a new MessageFormat to ensure thread safety.
-        // Sharing a MessageFormat and using applyPattern is not thread safe
-        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
-        final String message = formatter.format(messageArguments);
-
-        return new LogMessage()
-        {
-            public String toString()
-            {
-                return message;
-            }
-
-            public String getLogHierarchy()
-            {
-                return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
-            }
-
-            @Override
-            public boolean equals(final Object o)
-            {
-                if (this == o)
-                {
-                    return true;
-                }
-                if (o == null || getClass() != o.getClass())
-                {
-                    return false;
-                }
-
-                final LogMessage that = (LogMessage) o;
-
-                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
-            }
-
-            @Override
-            public int hashCode()
-            {
-                int result = toString().hashCode();
-                result = 31 * result + getLogHierarchy().hashCode();
-                return result;
-            }
-        };
-    }
-
-    /**
-     * Log a Queue message of the Format:
-     * <pre>QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB</pre>
-     * Optional values are contained in [square brackets] and are numbered
-     * sequentially in the method call.
-     *
-     */
-    public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2, Number param3, Number param4)
-    {
-        String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE");
-
-        final Object[] messageArguments = {param1, param2, param3, param4};
-        // Create a new MessageFormat to ensure thread safety.
-        // Sharing a MessageFormat and using applyPattern is not thread safe
-        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
-        final String message = formatter.format(messageArguments);
-
-        return new LogMessage()
-        {
-            public String toString()
-            {
-                return message;
-            }
-
-            public String getLogHierarchy()
-            {
-                return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY;
-            }
-
-            @Override
-            public boolean equals(final Object o)
-            {
-                if (this == o)
-                {
-                    return true;
-                }
-                if (o == null || getClass() != o.getClass())
-                {
-                    return false;
-                }
-
-                final LogMessage that = (LogMessage) o;
-
-                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
-            }
-
-            @Override
-            public int hashCode()
-            {
-                int result = toString().hashCode();
-                result = 31 * result + getLogHierarchy().hashCode();
-                return result;
-            }
-        };
-    }
-
 
     private QueueMessages()
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index 3ef8370..c5c7e84 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -26,6 +26,6 @@ DELETED = QUE-1002 : Deleted : ID: {0}
 OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}
 UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
 
-# use similar number to the broker for similar topic
-FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
-FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
+# These are no longer in use
+#FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
+#FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
index c00fb99..5741dd8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
@@ -45,38 +45,40 @@ public class VirtualHostMessages
     private static Locale _currentLocale = BrokerProperties.getLocale();
 
     public static final String VIRTUALHOST_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost";
-    public static final String CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.closed";
+    public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.created";
     public static final String STATS_DATA_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_data";
-    public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_msgs";
+    public static final String ERRORED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.errored";
+    public static final String CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.closed";
     public static final String FILESYSTEM_FULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.filesystem_full";
+    public static final String FLOW_TO_DISK_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.flow_to_disk";
     public static final String FILESYSTEM_NOTFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.filesystem_notfull";
-    public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.created";
-    public static final String ERRORED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.errored";
+    public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_msgs";
 
     static
     {
         LoggerFactory.getLogger(VIRTUALHOST_LOG_HIERARCHY);
-        LoggerFactory.getLogger(CLOSED_LOG_HIERARCHY);
+        LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
         LoggerFactory.getLogger(STATS_DATA_LOG_HIERARCHY);
-        LoggerFactory.getLogger(STATS_MSGS_LOG_HIERARCHY);
+        LoggerFactory.getLogger(ERRORED_LOG_HIERARCHY);
+        LoggerFactory.getLogger(CLOSED_LOG_HIERARCHY);
         LoggerFactory.getLogger(FILESYSTEM_FULL_LOG_HIERARCHY);
+        LoggerFactory.getLogger(FLOW_TO_DISK_LOG_HIERARCHY);
         LoggerFactory.getLogger(FILESYSTEM_NOTFULL_LOG_HIERARCHY);
-        LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
-        LoggerFactory.getLogger(ERRORED_LOG_HIERARCHY);
+        LoggerFactory.getLogger(STATS_MSGS_LOG_HIERARCHY);
 
         _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.VirtualHost_logmessages", _currentLocale);
     }
 
     /**
      * Log a VirtualHost message of the Format:
-     * <pre>VHT-1002 : Closed : {0}</pre>
+     * <pre>VHT-1001 : Created : {0}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage CLOSED(String param1)
+    public static LogMessage CREATED(String param1)
     {
-        String rawMessage = _messages.getString("CLOSED");
+        String rawMessage = _messages.getString("CREATED");
 
         final Object[] messageArguments = {param1};
         // Create a new MessageFormat to ensure thread safety.
@@ -94,7 +96,7 @@ public class VirtualHostMessages
 
             public String getLogHierarchy()
             {
-                return CLOSED_LOG_HIERARCHY;
+                return CREATED_LOG_HIERARCHY;
             }
 
             @Override
@@ -185,16 +187,16 @@ public class VirtualHostMessages
 
     /**
      * Log a VirtualHost message of the Format:
-     * <pre>VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total</pre>
+     * <pre>VHT-1005 : {0} Unexpected fatal error</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage STATS_MSGS(String param1, Number param2, Number param3, Number param4)
+    public static LogMessage ERRORED(String param1)
     {
-        String rawMessage = _messages.getString("STATS_MSGS");
+        String rawMessage = _messages.getString("ERRORED");
 
-        final Object[] messageArguments = {param1, param2, param3, param4};
+        final Object[] messageArguments = {param1};
         // Create a new MessageFormat to ensure thread safety.
         // Sharing a MessageFormat and using applyPattern is not thread safe
         MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -210,7 +212,65 @@ public class VirtualHostMessages
 
             public String getLogHierarchy()
             {
-                return STATS_MSGS_LOG_HIERARCHY;
+                return ERRORED_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Log a VirtualHost message of the Format:
+     * <pre>VHT-1002 : Closed : {0}</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage CLOSED(String param1)
+    {
+        String rawMessage = _messages.getString("CLOSED");
+
+        final Object[] messageArguments = {param1};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return CLOSED_LOG_HIERARCHY;
             }
 
             @Override
@@ -301,14 +361,14 @@ public class VirtualHostMessages
 
     /**
      * Log a VirtualHost message of the Format:
-     * <pre>VHT-1007 : Filesystem is no longer over {0,number} per cent full.</pre>
+     * <pre>VHT-1008 : Total number of bytes evacuated from memory due to flow to disk : {0,number} bytes</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage FILESYSTEM_NOTFULL(Number param1)
+    public static LogMessage FLOW_TO_DISK(Number param1)
     {
-        String rawMessage = _messages.getString("FILESYSTEM_NOTFULL");
+        String rawMessage = _messages.getString("FLOW_TO_DISK");
 
         final Object[] messageArguments = {param1};
         // Create a new MessageFormat to ensure thread safety.
@@ -326,7 +386,7 @@ public class VirtualHostMessages
 
             public String getLogHierarchy()
             {
-                return FILESYSTEM_NOTFULL_LOG_HIERARCHY;
+                return FLOW_TO_DISK_LOG_HIERARCHY;
             }
 
             @Override
@@ -359,14 +419,14 @@ public class VirtualHostMessages
 
     /**
      * Log a VirtualHost message of the Format:
-     * <pre>VHT-1001 : Created : {0}</pre>
+     * <pre>VHT-1007 : Filesystem is no longer over {0,number} per cent full.</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage CREATED(String param1)
+    public static LogMessage FILESYSTEM_NOTFULL(Number param1)
     {
-        String rawMessage = _messages.getString("CREATED");
+        String rawMessage = _messages.getString("FILESYSTEM_NOTFULL");
 
         final Object[] messageArguments = {param1};
         // Create a new MessageFormat to ensure thread safety.
@@ -384,7 +444,7 @@ public class VirtualHostMessages
 
             public String getLogHierarchy()
             {
-                return CREATED_LOG_HIERARCHY;
+                return FILESYSTEM_NOTFULL_LOG_HIERARCHY;
             }
 
             @Override
@@ -417,16 +477,16 @@ public class VirtualHostMessages
 
     /**
      * Log a VirtualHost message of the Format:
-     * <pre>VHT-1005 : {0} Unexpected fatal error</pre>
+     * <pre>VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage ERRORED(String param1)
+    public static LogMessage STATS_MSGS(String param1, Number param2, Number param3, Number param4)
     {
-        String rawMessage = _messages.getString("ERRORED");
+        String rawMessage = _messages.getString("STATS_MSGS");
 
-        final Object[] messageArguments = {param1};
+        final Object[] messageArguments = {param1, param2, param3, param4};
         // Create a new MessageFormat to ensure thread safety.
         // Sharing a MessageFormat and using applyPattern is not thread safe
         MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -442,7 +502,7 @@ public class VirtualHostMessages
 
             public String getLogHierarchy()
             {
-                return ERRORED_LOG_HIERARCHY;
+                return STATS_MSGS_LOG_HIERARCHY;
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
index 6bab8ec..1ea2fc2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
@@ -29,3 +29,5 @@ ERRORED = VHT-1005 : {0} Unexpected fatal error
 
 FILESYSTEM_FULL = VHT-1006 : Filesystem is over {0,number} per cent full, enforcing flow control.
 FILESYSTEM_NOTFULL = VHT-1007 : Filesystem is no longer over {0,number} per cent full.
+
+FLOW_TO_DISK = VHT-1008 : Total number of bytes evacuated from memory due to flow to disk : {0,number} bytes

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 17defb8..317d517 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -104,9 +104,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
     private Timer _reportingTimer;
     private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
-    /** Flags used to control the reporting of flow to disk. Protected by this */
-    private boolean _totalMessageSizeExceedThresholdReported = false,  _totalMessageSizeWithinThresholdReported = true;
-
     @ManagedAttributeField
     private int _connection_sessionCountLimit;
     @ManagedAttributeField
@@ -523,19 +520,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
             }
         }
 
-        if (totalSize > _flowToDiskThreshold && !_totalMessageSizeExceedThresholdReported)
-        {
-            _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024));
-            _totalMessageSizeExceedThresholdReported = true;
-            _totalMessageSizeWithinThresholdReported = false;
-        }
-        else if (totalSize <= _flowToDiskThreshold && !_totalMessageSizeWithinThresholdReported)
-        {
-            _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024));
-            _totalMessageSizeWithinThresholdReported = true;
-            _totalMessageSizeExceedThresholdReported = false;
-        }
-
         final long proportionalShare = (long) ((double) _flowToDiskThreshold / (double) vhs.size());
         for (Map.Entry<VirtualHost<?, ?, ?>, Long> entry : vhs.entrySet())
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 169faa9..7118060 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1144,8 +1144,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
 
         long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
-        _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
-                                                          _targetQueueSize.get());
+        _flowToDiskChecker.flowToDiskIfNecessary(message.getStoredMessage(), estimatedQueueSize, _targetQueueSize.get());
     }
 
     public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
@@ -2511,7 +2510,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         QueueEntryIterator queueListIterator = getEntries().iterator();
 
         final long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
-        _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
 
         final long currentTime = System.currentTimeMillis();
 
@@ -3698,8 +3696,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
     private class FlowToDiskChecker
     {
-        final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
-
         void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
         {
             if ((estimatedQueueSize > targetQueueSize
@@ -3709,55 +3705,5 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                 storedMessage.flowToDisk();
             }
         }
-
-        void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage,
-                                            final long estimatedQueueSize,
-                                            final long targetQueueSize)
-        {
-            flowToDiskIfNecessary(storedMessage, estimatedQueueSize, targetQueueSize);
-            reportFlowToDiskStatusIfNecessary(estimatedQueueSize, targetQueueSize);
-        }
-
-        void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
-        {
-            final long allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
-            if (estimatedQueueSize > targetQueueSize
-                || allocatedDirectMemorySize > _flowToDiskThreshold)
-            {
-                reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
-            }
-            else
-            {
-                reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
-            }
-        }
-
-        private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize,
-                                                       long targetQueueSize,
-                                                       long allocatedDirectMemorySize,
-                                                       long flowToDiskThreshold)
-        {
-            if (!_lastReportedFlowToDiskStatus.getAndSet(true))
-            {
-                getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024.0,
-                                                                                        targetQueueSize / 1024.0,
-                                                                                        allocatedDirectMemorySize / 1024.0 / 1024.0,
-                                                                                        flowToDiskThreshold / 1024.0 / 1024.0));
-            }
-        }
-
-        private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize,
-                                                         long targetQueueSize,
-                                                         long allocatedDirectMemorySize,
-                                                         long flowToDiskThreshold)
-        {
-            if (_lastReportedFlowToDiskStatus.getAndSet(false))
-            {
-                getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024.0,
-                                                                                          targetQueueSize / 1024.0,
-                                                                                          allocatedDirectMemorySize / 1024.0 / 1024.0,
-                                                                                          flowToDiskThreshold / 1024.0 / 1024.0));
-            }
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index b9d3cbf..a23feab 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1272,6 +1272,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
 
     private class VirtualHostHouseKeepingTask extends HouseKeepingTask
     {
+
+        private long _lastReportedBytesEvacuatedFromMemory = 0L;
+
         public VirtualHostHouseKeepingTask()
         {
             super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_housekeepingJobContext);
@@ -1303,6 +1306,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                                                    getStoreTransactionIdleTimeoutClose());
                 }
             }
+
+            final long currentBytesEvacuatedFromMemory = getBytesEvacuatedFromMemory();
+            if (currentBytesEvacuatedFromMemory != _lastReportedBytesEvacuatedFromMemory)
+            {
+                getEventLogger().message(VirtualHostMessages.FLOW_TO_DISK(currentBytesEvacuatedFromMemory));
+                _lastReportedBytesEvacuatedFromMemory = currentBytesEvacuatedFromMemory;
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org