You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/23 10:09:17 UTC
[1/6] qpid-broker-j git commit: QPID-7794: [Java Broker] Periodically report the number of bytes evacuated from memory by flow to disk
Repository: qpid-broker-j
Updated Branches:
refs/heads/6.0.x 516854eae -> b77dcd8a3
QPID-7794: [Java Broker] Periodically report the number of bytes evacuated from memory by flow to disk
These are the Store parts required.
(Cherry-picked from 6.1.x: 5920ed7 and 7cf5eb3)
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8cf6fdf1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8cf6fdf1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8cf6fdf1
Branch: refs/heads/6.0.x
Commit: 8cf6fdf1c4d79e1f67a74286e2f15f73fb3230b9
Parents: 516854e
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon May 22 14:10:18 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:14:51 2017 +0100
----------------------------------------------------------------------
.../berkeleydb/AbstractBDBMessageStore.java | 25 +++++++++++++++++---
.../store/berkeleydb/BDBConfigurationStore.java | 1 +
.../store/berkeleydb/BDBMessageStore.java | 1 +
.../berkeleydb/BDBHAReplicaVirtualHostImpl.java | 6 +++++
.../message/AbstractServerMessageImpl.java | 9 +++++--
.../qpid/server/message/MessageReference.java | 7 +++---
.../apache/qpid/server/model/VirtualHost.java | 3 +++
.../apache/qpid/server/queue/AbstractQueue.java | 2 +-
.../server/store/AbstractJDBCMessageStore.java | 18 +++++++++++---
.../qpid/server/store/MemoryMessageStore.java | 8 +++++--
.../apache/qpid/server/store/MessageStore.java | 2 ++
.../qpid/server/store/NullMessageStore.java | 6 +++++
.../server/virtualhost/AbstractVirtualHost.java | 6 +++++
.../RedirectingVirtualHostImpl.java | 6 +++++
.../server/store/TestMessageMetaDataType.java | 6 +++++
15 files changed, 92 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index bd4f2e2..b52e25c 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.bind.tuple.LongBinding;
@@ -100,6 +101,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private boolean _limitBusted;
private long _totalStoreSize;
private final Random _lockConflictRandom = new Random();
+ private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
@Override
public void upgradeStoreStructure() throws StoreException
@@ -147,6 +149,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return _bytesEvacuatedFromMemory.get();
+ }
+
+ @Override
public boolean isPersistent()
{
return true;
@@ -167,6 +175,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
+ public void closeMessageStore()
+ {
+ _bytesEvacuatedFromMemory.set(0);
+ }
+
+ @Override
public MessageStoreReader newMessageStoreReader()
{
return new BDBMessageStoreReader();
@@ -968,10 +982,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_data = data;
}
- public void clear()
+ public long clear()
{
+ long bytesCleared = 0;
if(_metaData != null)
{
+ bytesCleared += _metaData.getStorableSize();
_metaData.clearEncodedForm();
_metaData = null;
}
@@ -979,10 +995,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
for(QpidByteBuffer buf : _data)
{
+ bytesCleared += buf.remaining();
buf.dispose();
}
+ _data = null;
}
- _data = null;
+ return bytesCleared;
}
@Override
@@ -1208,7 +1226,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- ((MessageDataSoftRef)_messageDataRef).clear();
+ final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+ _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
return true;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
index 389b538..5be0e45 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
@@ -541,6 +541,7 @@ public class BDBConfigurationStore implements MessageStoreProvider, DurableConfi
@Override
public void closeMessageStore()
{
+ super.closeMessageStore();
_messageStoreOpen.set(false);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 0411327..7bb23a2 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -84,6 +84,7 @@ public class BDBMessageStore extends AbstractBDBMessageStore
@Override
public void closeMessageStore()
{
+ super.closeMessageStore();
if (_messageStoreOpen.compareAndSet(true, false))
{
if (_environmentFacade != null)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
index a9d8f53..6c1bdd0 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
@@ -288,6 +288,12 @@ public class BDBHAReplicaVirtualHostImpl extends AbstractConfiguredObject<BDBHAR
}
@Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return 0;
+ }
+
+ @Override
public Collection<VirtualHostAlias> getAliases()
{
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 03b3198..7d28eed 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -223,11 +223,11 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
private final UUID _resourceId;
private volatile int _released;
- private Reference(final AbstractServerMessageImpl<X, T> message)
+ private Reference(final AbstractServerMessageImpl<X, T> message) throws MessageDeletedException
{
this(message, null);
}
- private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource)
+ private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource) throws MessageDeletedException
{
_message = message;
if(resource != null)
@@ -309,6 +309,11 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
}
}
+ @Override
+ public void close()
+ {
+ release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
index eda8550..dfe5d64 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.server.message;
-public interface MessageReference<M extends ServerMessage>
+public interface MessageReference<M extends ServerMessage> extends AutoCloseable
{
- public M getMessage();
- public void release();
+ M getMessage();
+ void release();
+ void close();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 2d6bdc3..9b2f18c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -170,6 +170,9 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Outbound")
long getMessagesOut();
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Total Number of Bytes Evacuated from Memory Due to Flow to Disk")
+ long getBytesEvacuatedFromMemory();
+
Broker<?> getBroker();
//children
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index e4aefe5..169faa9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -3720,7 +3720,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
{
- final int allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
+ final long allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
if (estimatedQueueSize > targetQueueSize
|| allocatedDirectMemorySize > _flowToDiskThreshold)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 5d2aa98..a2ad036 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -118,6 +118,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
protected final EventManager _eventManager = new EventManager();
private ConfiguredObject<?> _parent;
+ private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
protected abstract boolean isMessageStoreOpen();
@@ -277,6 +278,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public void closeMessageStore()
{
+ _bytesEvacuatedFromMemory.set(0);
if(_executor != null)
{
_executor.shutdown();
@@ -1101,6 +1103,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
return true;
}
+ @Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return _bytesEvacuatedFromMemory.get();
+ }
protected class JDBCTransaction implements Transaction
{
@@ -1370,10 +1377,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
_data = data;
}
- public void clear()
+ public long clear()
{
+ long bytesCleared = 0;
if(_metaData != null)
{
+ bytesCleared += _metaData.getStorableSize();
_metaData.clearEncodedForm();
_metaData = null;
}
@@ -1381,10 +1390,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
for(QpidByteBuffer buf : _data)
{
+ bytesCleared += buf.remaining();
buf.dispose();
}
+ _data = null;
}
- _data = null;
+ return bytesCleared;
}
@Override
@@ -1622,7 +1633,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- ((MessageDataSoftRef)_messageDataRef).clear();
+ final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+ _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
return true;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 114d0d1..baedc02 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -286,14 +286,12 @@ public class MemoryMessageStore implements MessageStore
StoredMemoryMessage<T> storedMemoryMessage = new StoredMemoryMessage<T>(id, metaData)
{
-
@Override
public void remove()
{
_messages.remove(getMessageNumber());
super.remove();
}
-
};
return storedMemoryMessage;
@@ -313,6 +311,12 @@ public class MemoryMessageStore implements MessageStore
}
@Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return 0L;
+ }
+
+ @Override
public Transaction newTransaction()
{
return new MemoryMessageStoreTransaction();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index ab5764e..60b89e7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -61,6 +61,8 @@ public interface MessageStore
<T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData);
+ long getBytesEvacuatedFromMemory();
+
/**
* Is this store capable of persisting the data
*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index b8ca8b0..d5d4463 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -101,6 +101,12 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return 0L;
+ }
+
+ @Override
public Transaction newTransaction()
{
return null;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 56e6fb8..b9d3cbf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -847,6 +847,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return _messageStore == null ? -1 : _messageStore.getBytesEvacuatedFromMemory();
+ }
+
+ @Override
public ExchangeImpl getAttainedExchange(String name)
{
Exchange child = awaitChildClassToAttainState(Exchange.class, name);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
index 574e601..727901c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
@@ -161,6 +161,12 @@ class RedirectingVirtualHostImpl
}
@Override
+ public long getBytesEvacuatedFromMemory()
+ {
+ return 0L;
+ }
+
+ @Override
public ExchangeImpl<?> getAttainedExchange(final String name)
{
return null;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8cf6fdf1/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index fb57d3a..a7b1267 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -78,6 +78,12 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
{
@Override
+ public void close()
+ {
+ release();
+ }
+
+ @Override
public ServerMessage getMessage()
{
return TestServerMessage.this;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[5/6] qpid-broker-j git commit: QPID-7795: [Java Broker] Ensure that a newly enqueued message that is flowed to disk does not immediately have meta-data reloaded (6.1/6.0)
Posted by lq...@apache.org.
QPID-7795: [Java Broker] Ensure that a newly enqueued message that is flowed to disk does not immediately have meta-data reloaded (6.1/6.0)
Cherry picked from QPID-7775 commit 8ae1d142b33edc91d4988c9f4b775026bb03acc4
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8f3a80bc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8f3a80bc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8f3a80bc
Branch: refs/heads/6.0.x
Commit: 8f3a80bcb4f0091367bf26c48cc4a2334f7767ae
Parents: 37999be
Author: Keith Wall <ke...@gmail.com>
Authored: Thu May 18 18:17:38 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:40:37 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/protocol/v0_10/ServerSession.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8f3a80bc/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 34d6c72..16b36ef 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -280,11 +280,13 @@ public class ServerSession extends Session
_outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
+ // locally cache arrival time to ensure that we don't reload metadata
+ final long arrivalTime = message.getArrivalTime();
int enqueues = exchange.send(message,
message.getInitialRoutingAddress(),
instanceProperties, _transaction, _checkCapacityAction
);
- getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
+ getAMQPConnection().registerMessageReceived(message.getSize(), arrivalTime);
incrementOutstandingTxnsIfNecessary();
incrementUncommittedMessageSize(message.getStoredMessage());
return enqueues;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/6] qpid-broker-j git commit: QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost
Posted by lq...@apache.org.
QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost
Cherry-picked from b63815c
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a1d66f1d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a1d66f1d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a1d66f1d
Branch: refs/heads/6.0.x
Commit: a1d66f1d344087ad088b2f55894be957abac5ca3
Parents: df071cb
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon May 22 15:21:57 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:19:59 2017 +0100
----------------------------------------------------------------------
.../berkeleydb/AbstractBDBMessageStore.java | 55 ++++++++++++++++++--
.../server/store/AbstractJDBCMessageStore.java | 54 +++++++++++++++++--
.../qpid/server/store/MemoryMessageStore.java | 4 ++
.../qpid/server/store/StoredMemoryMessage.java | 4 ++
4 files changed, 110 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index b52e25c..573d287 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,7 +29,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.ListenableFuture;
@@ -102,6 +104,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private long _totalStoreSize;
private final Random _lockConflictRandom = new Random();
private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+ private final Set<StoredBDBMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<StoredBDBMessage<?>, Boolean>());
@Override
public void upgradeStoreStructure() throws StoreException
@@ -125,7 +128,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long newMessageId = getNextMessageId();
- return new StoredBDBMessage<T>(newMessageId, metaData);
+ return createStoredBDBMessage(newMessageId, metaData, false);
+ }
+
+ public <T extends StorableMessageMetaData> StoredBDBMessage<T> createStoredBDBMessage(final long newMessageId,
+ final T metaData,
+ final boolean recovered)
+ {
+ final StoredBDBMessage<T> message = new StoredBDBMessage<>(newMessageId, metaData, recovered);
+ _messages.add(message);
+ return message;
}
public long getNextMessageId()
@@ -177,6 +189,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
@Override
public void closeMessageStore()
{
+ for (StoredBDBMessage<?> message : _messages)
+ {
+ message.clear();
+ }
+ _messages.clear();
_bytesEvacuatedFromMemory.set(0);
}
@@ -436,7 +453,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
long messageId = LongBinding.entryToLong(key);
StorableMessageMetaData metaData = valueBinding.entryToObject(value);
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+ StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
if (!handler.handle(message))
{
break;
@@ -482,7 +499,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS)
{
StorableMessageMetaData metaData = valueBinding.entryToObject(value);
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+ StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
return message;
}
else
@@ -915,6 +932,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
Collection<QpidByteBuffer> getData();
void setData(Collection<QpidByteBuffer> data);
boolean isHardRef();
+ long clear();
}
private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -950,6 +968,28 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
return true;
}
+
+ @Override
+ public long clear()
+ {
+ long bytesCleared = 0;
+ if(_metaData != null)
+ {
+ bytesCleared += _metaData.getStorableSize();
+ _metaData.clearEncodedForm();
+ }
+ if(_data != null)
+ {
+ for(QpidByteBuffer buf : _data)
+ {
+ bytesCleared += buf.remaining();
+ buf.dispose();
+ }
+ _data = null;
+ }
+ return bytesCleared;
+ }
+
}
private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -982,6 +1022,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_data = data;
}
+ @Override
public long clear()
{
long bytesCleared = 0;
@@ -1191,6 +1232,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
final T metaData = getMetaData();
int delta = metaData.getContentSize();
+ _messages.remove(this);
if(stored())
{
removeMessage(_messageId, false);
@@ -1238,6 +1280,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
return this.getClass() + "[messageId=" + _messageId + "]";
}
+ public synchronized void clear()
+ {
+ if (_messageDataRef != null)
+ {
+ _messageDataRef.clear();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index a2ad036..e7e1b0f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -119,6 +120,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
protected final EventManager _eventManager = new EventManager();
private ConfiguredObject<?> _parent;
private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+ private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<StoredJDBCMessage<?>, Boolean>());
protected abstract boolean isMessageStoreOpen();
@@ -278,6 +280,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public void closeMessageStore()
{
+ for (StoredJDBCMessage<?> message : _messages)
+ {
+ message.clear();
+ }
+ _messages.clear();
_bytesEvacuatedFromMemory.set(0);
if(_executor != null)
{
@@ -443,8 +450,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
checkMessageStoreOpen();
- return new StoredJDBCMessage<T>(getNextMessageId(), metaData);
+ return createStoredJDBCMessage(getNextMessageId(), metaData, false);
+ }
+ public <T extends StorableMessageMetaData> StoredJDBCMessage<T> createStoredJDBCMessage(final long newMessageId,
+ final T metaData,
+ final boolean recovered)
+ {
+ final StoredJDBCMessage<T> message = new StoredJDBCMessage<>(newMessageId, metaData, recovered);
+ _messages.add(message);
+ return message;
}
@Override
@@ -1310,6 +1325,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
Collection<QpidByteBuffer> getData();
void setData(Collection<QpidByteBuffer> data);
boolean isHardRef();
+ long clear();
}
private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -1341,6 +1357,27 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
+ public long clear()
+ {
+ long bytesCleared = 0;
+ if(_metaData != null)
+ {
+ bytesCleared += _metaData.getStorableSize();
+ _metaData.clearEncodedForm();
+ }
+ if(_data != null)
+ {
+ for(QpidByteBuffer buf : _data)
+ {
+ bytesCleared += buf.remaining();
+ buf.dispose();
+ }
+ _data = null;
+ }
+ return bytesCleared;
+ }
+
+ @Override
public boolean isHardRef()
{
return true;
@@ -1377,6 +1414,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
_data = data;
}
+ @Override
public long clear()
{
long bytesCleared = 0;
@@ -1546,7 +1584,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
if (!stored())
{
-
AbstractJDBCMessageStore.this.storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
_messageDataRef.getData() == null
@@ -1598,6 +1635,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
final T metaData = getMetaData();
int delta = metaData.getContentSize();
+ _messages.remove(this);
if(stored())
{
AbstractJDBCMessageStore.this.removeMessage(_messageId);
@@ -1639,6 +1677,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
return true;
}
+ public synchronized void clear()
+ {
+ if (_messageDataRef != null)
+ {
+ _messageDataRef.clear();
+ }
+ }
+
@Override
public String toString()
{
@@ -1685,7 +1731,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
StorableMessageMetaData metaData = type.createMetaData(buf);
buf.dispose();
- message = new StoredJDBCMessage(messageId, metaData, true);
+ message = createStoredJDBCMessage(messageId, metaData, true);
}
else
@@ -1738,7 +1784,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(((int)dataAsBytes[0]) &0xff);
StorableMessageMetaData metaData = type.createMetaData(buf);
buf.dispose();
- StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+ StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
if (!handler.handle(message))
{
break;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index baedc02..8983619 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -325,6 +325,10 @@ public class MemoryMessageStore implements MessageStore
@Override
public void closeMessageStore()
{
+ for (StoredMemoryMessage storedMemoryMessage : _messages.values())
+ {
+ storedMemoryMessage.clear();
+ }
_messages.clear();
synchronized (_transactionLock)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a1d66f1d/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 80e317f..d5dc0c6 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -141,4 +141,8 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
return false;
}
+ public void clear()
+ {
+ remove();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/6] qpid-broker-j git commit: QPID-7784: [Java Broker] Dispose QpidByteBuffers associated with pooled threads when shutting down executors.
Posted by lq...@apache.org.
QPID-7784: [Java Broker] Dispose QpidByteBuffers associated with pooled threads when shutting down executors.
Cherry picked from d9af2660089139e2f4fdad8c0aa0e0c8e6529ff5 and f9262e9
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/37999be9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/37999be9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/37999be9
Branch: refs/heads/6.0.x
Commit: 37999be9e23445201e1407d79e2cb632624ffddb
Parents: a1d66f1
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon May 22 15:29:19 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:37:45 2017 +0100
----------------------------------------------------------------------
.../transport/NetworkConnectionScheduler.java | 35 +++++++++++++++++++-
.../server/virtualhost/AbstractVirtualHost.java | 25 ++++++++++++++
.../apache/qpid/bytebuffer/QpidByteBuffer.java | 9 +++++
3 files changed, 68 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37999be9/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index 22e70bb..2745809 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.transport;
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -32,8 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.TransportException;
+
public class NetworkConnectionScheduler
{
private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
@@ -99,7 +103,36 @@ public class NetworkConnectionScheduler
_selectorThread = new SelectorThread(this, _numberOfSelectors);
_executor = new ThreadPoolExecutor(_poolSize, _poolSize,
_threadKeepAliveTimeout, TimeUnit.MINUTES,
- new LinkedBlockingQueue<Runnable>(), _factory);
+ new LinkedBlockingQueue<Runnable>(), _factory)
+ {
+ private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+
+ @Override
+ protected void afterExecute(final Runnable r, final Throwable t)
+ {
+ super.afterExecute(r, t);
+ final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+ if (cachedThreadLocalBuffer != null)
+ {
+ _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+ }
+ else
+ {
+ _cachedBufferMap.remove(Thread.currentThread());
+ }
+ }
+
+ @Override
+ protected void terminated()
+ {
+ super.terminated();
+ for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+ {
+ qpidByteBuffer.dispose();
+ }
+ _cachedBufferMap.clear();
+ }
+ };
_executor.prestartAllCoreThreads();
_executor.allowCoreThreadTimeOut(true);
for(int i = 0 ; i < _poolSize; i++)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37999be9/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index a23feab..41bf0b0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -56,6 +56,7 @@ import com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.pool.SuppressingInheritedAccessControlContextThreadFactory;
import org.apache.qpid.server.configuration.updater.Task;
@@ -1936,10 +1937,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
new SuppressingInheritedAccessControlContextThreadFactory("virtualhost-" + getName() + "-pool",
SecurityManager.getSystemTaskSubject("Housekeeping", getPrincipal()));
_houseKeepingTaskExecutor = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), housekeepingThreadFactory){
+ private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+
@Override
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
+
+ final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+ if (cachedThreadLocalBuffer != null)
+ {
+ _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+ }
+ else
+ {
+ _cachedBufferMap.remove(Thread.currentThread());
+ }
+
if (t == null && r instanceof Future<?>)
{
Future future = (Future<?>) r;
@@ -1983,6 +1997,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
}
+
+ @Override
+ protected void terminated()
+ {
+ super.terminated();
+ for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+ {
+ qpidByteBuffer.dispose();
+ }
+ _cachedBufferMap.clear();
+ }
};
long threadPoolKeepAliveTimeout = getContextValue(Long.class, CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37999be9/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
index 7f8c740..85c4dae 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
@@ -706,6 +706,15 @@ public final class QpidByteBuffer
_isPoolInitialized = true;
}
+ /**
+ * Not for general use!
+ * Used to clear threadlocal buffer when shutting down thread pools.
+ */
+ public static QpidByteBuffer getCachedThreadLocalBuffer()
+ {
+ return _cachedBuffer.get();
+ }
+
public static int getPooledBufferSize()
{
return _pooledBufferSize;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[6/6] qpid-broker-j git commit: QPID-7796: [Java Broker] Guard against NPE in 0-10 when storing messages without header
Posted by lq...@apache.org.
QPID-7796: [Java Broker] Guard against NPE in 0-10 when storing messages without header
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b77dcd8a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b77dcd8a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b77dcd8a
Branch: refs/heads/6.0.x
Commit: b77dcd8a3d66e7c183444adad90fdb105b3815a2
Parents: 8f3a80b
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue May 23 09:52:24 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:42:04 2017 +0100
----------------------------------------------------------------------
.../protocol/v0_10/MessageMetaData_0_10.java | 49 +++++++++++---------
1 file changed, 27 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b77dcd8a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 831ceb7..9dcdc4b 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -104,37 +104,42 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
encoder.writeInt64(_arrivalTime);
encoder.writeInt32(_bodySize);
int headersLength = 0;
- if(_header.getDeliveryProperties() != null)
+ if (_header != null)
{
- headersLength++;
- }
- if(_header.getMessageProperties() != null)
- {
- headersLength++;
- }
- if(_header.getNonStandardProperties() != null)
- {
- headersLength += _header.getNonStandardProperties().size();
+ if (_header.getDeliveryProperties() != null)
+ {
+ headersLength++;
+ }
+ if (_header.getMessageProperties() != null)
+ {
+ headersLength++;
+ }
+ if (_header.getNonStandardProperties() != null)
+ {
+ headersLength += _header.getNonStandardProperties().size();
+ }
}
encoder.writeInt32(headersLength);
- if(_header.getDeliveryProperties() != null)
+ if (_header != null)
{
- encoder.writeStruct32(_header.getDeliveryProperties());
- }
- if(_header.getMessageProperties() != null)
- {
- encoder.writeStruct32(_header.getMessageProperties());
- }
- if(_header.getNonStandardProperties() != null)
- {
-
- for(Struct header : _header.getNonStandardProperties())
+ if (_header.getDeliveryProperties() != null)
+ {
+ encoder.writeStruct32(_header.getDeliveryProperties());
+ }
+ if (_header.getMessageProperties() != null)
{
- encoder.writeStruct32(header);
+ encoder.writeStruct32(_header.getMessageProperties());
}
+ if (_header.getNonStandardProperties() != null)
+ {
+ for (Struct header : _header.getNonStandardProperties())
+ {
+ encoder.writeStruct32(header);
+ }
+ }
}
QpidByteBuffer buf = encoder.getBuffer();
encoder.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/6] qpid-broker-j git commit: QPID-7794: [Java Broker] periodically log flow to disk statistics on VirtualHost
Posted by lq...@apache.org.
QPID-7794: [Java Broker] periodically log flow to disk statistics on VirtualHost
(Cherry-picked from 6.1.x: 11a7522)
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/df071cb3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/df071cb3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/df071cb3
Branch: refs/heads/6.0.x
Commit: df071cb30740c6a9ddb33451fa293d69271244e5
Parents: 8cf6fdf
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon May 22 14:18:28 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Tue May 23 10:18:27 2017 +0100
----------------------------------------------------------------------
.../server/logging/messages/BrokerMessages.java | 120 -------------------
.../messages/Broker_logmessages.properties | 7 +-
.../server/logging/messages/QueueMessages.java | 120 -------------------
.../messages/Queue_logmessages.properties | 6 +-
.../logging/messages/VirtualHostMessages.java | 120 ++++++++++++++-----
.../messages/VirtualHost_logmessages.properties | 2 +
.../server/model/adapter/BrokerAdapter.java | 16 ---
.../apache/qpid/server/queue/AbstractQueue.java | 56 +--------
.../server/virtualhost/AbstractVirtualHost.java | 10 ++
9 files changed, 109 insertions(+), 348 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
index 3f1d858..c66a674 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
@@ -47,11 +47,9 @@ public class BrokerMessages
public static final String BROKER_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker";
public static final String READY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.ready";
public static final String FAILED_CHILDREN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.failed_children";
- public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_active";
public static final String LISTENING_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.listening";
public static final String STARTUP_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.startup";
public static final String MANAGEMENT_MODE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.management_mode";
- public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_inactive";
public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stats_msgs";
public static final String PLATFORM_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.platform";
public static final String CONFIG_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.config";
@@ -68,11 +66,9 @@ public class BrokerMessages
LoggerFactory.getLogger(BROKER_LOG_HIERARCHY);
LoggerFactory.getLogger(READY_LOG_HIERARCHY);
LoggerFactory.getLogger(FAILED_CHILDREN_LOG_HIERARCHY);
- LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
LoggerFactory.getLogger(LISTENING_LOG_HIERARCHY);
LoggerFactory.getLogger(STARTUP_LOG_HIERARCHY);
LoggerFactory.getLogger(MANAGEMENT_MODE_LOG_HIERARCHY);
- LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
LoggerFactory.getLogger(STATS_MSGS_LOG_HIERARCHY);
LoggerFactory.getLogger(PLATFORM_LOG_HIERARCHY);
LoggerFactory.getLogger(CONFIG_LOG_HIERARCHY);
@@ -200,64 +196,6 @@ public class BrokerMessages
/**
* Log a Broker message of the Format:
- * <pre>BRK-1014 : Message flow to disk active : Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2)
- {
- String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE");
-
- final Object[] messageArguments = {param1, param2};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
* <pre>BRK-1002 : Starting : Listening on {0} port {1,number,#}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
@@ -432,64 +370,6 @@ public class BrokerMessages
/**
* Log a Broker message of the Format:
- * <pre>BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2)
- {
- String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
-
- final Object[] messageArguments = {param1, param2};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
* <pre>BRK-1009 : {0,choice,0#delivered|1#received} : {1,number,#.###} msg/s peak : {2,number,#} msgs total</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
index b74ad36..0883aef 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
@@ -49,10 +49,9 @@ MAX_MEMORY = BRK-1011 : Maximum Memory : Heap : {0,number} bytes Direct : {1,num
MANAGEMENT_MODE = BRK-1012 : Management Mode : User Details : {0} / {1}
-# 0 - Total message size
-# 1 - Target memory size
-FLOW_TO_DISK_ACTIVE = BRK-1014 : Message flow to disk active : Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB
-FLOW_TO_DISK_INACTIVE = BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB
+# These are no longer in use
+#FLOW_TO_DISK_ACTIVE = BRK-1014 : Message flow to disk active : Message memory use {0,number,#}KB exceeds threshold {1,number,#.##}KB
+#FLOW_TO_DISK_INACTIVE = BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB
FATAL_ERROR = BRK-1016 : Fatal error : {0} : See log file for more information
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index 0e4883d..6206283 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -49,8 +49,6 @@ public class QueueMessages
public static final String UNDERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
public static final String DELETED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
- public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_active";
- public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_inactive";
static
{
@@ -59,8 +57,6 @@ public class QueueMessages
LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
- LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
- LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
_messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Queue_logmessages", _currentLocale);
}
@@ -360,122 +356,6 @@ public class QueueMessages
};
}
- /**
- * Log a Queue message of the Format:
- * <pre>QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2, Number param3, Number param4)
- {
- String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
-
- final Object[] messageArguments = {param1, param2, param3, param4};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Queue message of the Format:
- * <pre>QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage FLOW_TO_DISK_ACTIVE(Number param1, Number param2, Number param3, Number param4)
- {
- String rawMessage = _messages.getString("FLOW_TO_DISK_ACTIVE");
-
- final Object[] messageArguments = {param1, param2, param3, param4};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
private QueueMessages()
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index 3ef8370..c5c7e84 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -26,6 +26,6 @@ DELETED = QUE-1002 : Deleted : ID: {0}
OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}
UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
-# use similar number to the broker for similar topic
-FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
-FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
+# These are no longer in use
+#FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
+#FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#.##} MB, threshold {3,number,#.##} MB
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
index c00fb99..5741dd8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHostMessages.java
@@ -45,38 +45,40 @@ public class VirtualHostMessages
private static Locale _currentLocale = BrokerProperties.getLocale();
public static final String VIRTUALHOST_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost";
- public static final String CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.closed";
+ public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.created";
public static final String STATS_DATA_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_data";
- public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_msgs";
+ public static final String ERRORED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.errored";
+ public static final String CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.closed";
public static final String FILESYSTEM_FULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.filesystem_full";
+ public static final String FLOW_TO_DISK_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.flow_to_disk";
public static final String FILESYSTEM_NOTFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.filesystem_notfull";
- public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.created";
- public static final String ERRORED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.errored";
+ public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "virtualhost.stats_msgs";
static
{
LoggerFactory.getLogger(VIRTUALHOST_LOG_HIERARCHY);
- LoggerFactory.getLogger(CLOSED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
LoggerFactory.getLogger(STATS_DATA_LOG_HIERARCHY);
- LoggerFactory.getLogger(STATS_MSGS_LOG_HIERARCHY);
+ LoggerFactory.getLogger(ERRORED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(CLOSED_LOG_HIERARCHY);
LoggerFactory.getLogger(FILESYSTEM_FULL_LOG_HIERARCHY);
+ LoggerFactory.getLogger(FLOW_TO_DISK_LOG_HIERARCHY);
LoggerFactory.getLogger(FILESYSTEM_NOTFULL_LOG_HIERARCHY);
- LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
- LoggerFactory.getLogger(ERRORED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(STATS_MSGS_LOG_HIERARCHY);
_messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.VirtualHost_logmessages", _currentLocale);
}
/**
* Log a VirtualHost message of the Format:
- * <pre>VHT-1002 : Closed : {0}</pre>
+ * <pre>VHT-1001 : Created : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage CLOSED(String param1)
+ public static LogMessage CREATED(String param1)
{
- String rawMessage = _messages.getString("CLOSED");
+ String rawMessage = _messages.getString("CREATED");
final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
@@ -94,7 +96,7 @@ public class VirtualHostMessages
public String getLogHierarchy()
{
- return CLOSED_LOG_HIERARCHY;
+ return CREATED_LOG_HIERARCHY;
}
@Override
@@ -185,16 +187,16 @@ public class VirtualHostMessages
/**
* Log a VirtualHost message of the Format:
- * <pre>VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total</pre>
+ * <pre>VHT-1005 : {0} Unexpected fatal error</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage STATS_MSGS(String param1, Number param2, Number param3, Number param4)
+ public static LogMessage ERRORED(String param1)
{
- String rawMessage = _messages.getString("STATS_MSGS");
+ String rawMessage = _messages.getString("ERRORED");
- final Object[] messageArguments = {param1, param2, param3, param4};
+ final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -210,7 +212,65 @@ public class VirtualHostMessages
public String getLogHierarchy()
{
- return STATS_MSGS_LOG_HIERARCHY;
+ return ERRORED_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a VirtualHost message of the Format:
+ * <pre>VHT-1002 : Closed : {0}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage CLOSED(String param1)
+ {
+ String rawMessage = _messages.getString("CLOSED");
+
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return CLOSED_LOG_HIERARCHY;
}
@Override
@@ -301,14 +361,14 @@ public class VirtualHostMessages
/**
* Log a VirtualHost message of the Format:
- * <pre>VHT-1007 : Filesystem is no longer over {0,number} per cent full.</pre>
+ * <pre>VHT-1008 : Total number of bytes evacuated from memory due to flow to disk : {0,number} bytes</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage FILESYSTEM_NOTFULL(Number param1)
+ public static LogMessage FLOW_TO_DISK(Number param1)
{
- String rawMessage = _messages.getString("FILESYSTEM_NOTFULL");
+ String rawMessage = _messages.getString("FLOW_TO_DISK");
final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
@@ -326,7 +386,7 @@ public class VirtualHostMessages
public String getLogHierarchy()
{
- return FILESYSTEM_NOTFULL_LOG_HIERARCHY;
+ return FLOW_TO_DISK_LOG_HIERARCHY;
}
@Override
@@ -359,14 +419,14 @@ public class VirtualHostMessages
/**
* Log a VirtualHost message of the Format:
- * <pre>VHT-1001 : Created : {0}</pre>
+ * <pre>VHT-1007 : Filesystem is no longer over {0,number} per cent full.</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage CREATED(String param1)
+ public static LogMessage FILESYSTEM_NOTFULL(Number param1)
{
- String rawMessage = _messages.getString("CREATED");
+ String rawMessage = _messages.getString("FILESYSTEM_NOTFULL");
final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
@@ -384,7 +444,7 @@ public class VirtualHostMessages
public String getLogHierarchy()
{
- return CREATED_LOG_HIERARCHY;
+ return FILESYSTEM_NOTFULL_LOG_HIERARCHY;
}
@Override
@@ -417,16 +477,16 @@ public class VirtualHostMessages
/**
* Log a VirtualHost message of the Format:
- * <pre>VHT-1005 : {0} Unexpected fatal error</pre>
+ * <pre>VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage ERRORED(String param1)
+ public static LogMessage STATS_MSGS(String param1, Number param2, Number param3, Number param4)
{
- String rawMessage = _messages.getString("ERRORED");
+ String rawMessage = _messages.getString("STATS_MSGS");
- final Object[] messageArguments = {param1};
+ final Object[] messageArguments = {param1, param2, param3, param4};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -442,7 +502,7 @@ public class VirtualHostMessages
public String getLogHierarchy()
{
- return ERRORED_LOG_HIERARCHY;
+ return STATS_MSGS_LOG_HIERARCHY;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
index 6bab8ec..1ea2fc2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
@@ -29,3 +29,5 @@ ERRORED = VHT-1005 : {0} Unexpected fatal error
FILESYSTEM_FULL = VHT-1006 : Filesystem is over {0,number} per cent full, enforcing flow control.
FILESYSTEM_NOTFULL = VHT-1007 : Filesystem is no longer over {0,number} per cent full.
+
+FLOW_TO_DISK = VHT-1008 : Total number of bytes evacuated from memory due to flow to disk : {0,number} bytes
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 17defb8..317d517 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -104,9 +104,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
private Timer _reportingTimer;
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- /** Flags used to control the reporting of flow to disk. Protected by this */
- private boolean _totalMessageSizeExceedThresholdReported = false, _totalMessageSizeWithinThresholdReported = true;
-
@ManagedAttributeField
private int _connection_sessionCountLimit;
@ManagedAttributeField
@@ -523,19 +520,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
}
- if (totalSize > _flowToDiskThreshold && !_totalMessageSizeExceedThresholdReported)
- {
- _eventLogger.message(BrokerMessages.FLOW_TO_DISK_ACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024));
- _totalMessageSizeExceedThresholdReported = true;
- _totalMessageSizeWithinThresholdReported = false;
- }
- else if (totalSize <= _flowToDiskThreshold && !_totalMessageSizeWithinThresholdReported)
- {
- _eventLogger.message(BrokerMessages.FLOW_TO_DISK_INACTIVE(totalSize / 1024, _flowToDiskThreshold / 1024));
- _totalMessageSizeWithinThresholdReported = true;
- _totalMessageSizeExceedThresholdReported = false;
- }
-
final long proportionalShare = (long) ((double) _flowToDiskThreshold / (double) vhs.size());
for (Map.Entry<VirtualHost<?, ?, ?>, Long> entry : vhs.entrySet())
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 169faa9..7118060 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1144,8 +1144,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
- _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
- _targetQueueSize.get());
+ _flowToDiskChecker.flowToDiskIfNecessary(message.getStoredMessage(), estimatedQueueSize, _targetQueueSize.get());
}
public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
@@ -2511,7 +2510,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
QueueEntryIterator queueListIterator = getEntries().iterator();
final long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
- _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
final long currentTime = System.currentTimeMillis();
@@ -3698,8 +3696,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private class FlowToDiskChecker
{
- final AtomicBoolean _lastReportedFlowToDiskStatus = new AtomicBoolean(false);
-
void flowToDiskIfNecessary(StoredMessage<?> storedMessage, long estimatedQueueSize, final long targetQueueSize)
{
if ((estimatedQueueSize > targetQueueSize
@@ -3709,55 +3705,5 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
storedMessage.flowToDisk();
}
}
-
- void flowToDiskAndReportIfNecessary(StoredMessage<?> storedMessage,
- final long estimatedQueueSize,
- final long targetQueueSize)
- {
- flowToDiskIfNecessary(storedMessage, estimatedQueueSize, targetQueueSize);
- reportFlowToDiskStatusIfNecessary(estimatedQueueSize, targetQueueSize);
- }
-
- void reportFlowToDiskStatusIfNecessary(final long estimatedQueueSize, final long targetQueueSize)
- {
- final long allocatedDirectMemorySize = QpidByteBuffer.getAllocatedDirectMemorySize();
- if (estimatedQueueSize > targetQueueSize
- || allocatedDirectMemorySize > _flowToDiskThreshold)
- {
- reportFlowToDiskActiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
- }
- else
- {
- reportFlowToDiskInactiveIfNecessary(estimatedQueueSize, targetQueueSize, allocatedDirectMemorySize, _flowToDiskThreshold);
- }
- }
-
- private void reportFlowToDiskActiveIfNecessary(long estimatedQueueSize,
- long targetQueueSize,
- long allocatedDirectMemorySize,
- long flowToDiskThreshold)
- {
- if (!_lastReportedFlowToDiskStatus.getAndSet(true))
- {
- getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024.0,
- targetQueueSize / 1024.0,
- allocatedDirectMemorySize / 1024.0 / 1024.0,
- flowToDiskThreshold / 1024.0 / 1024.0));
- }
- }
-
- private void reportFlowToDiskInactiveIfNecessary(long estimatedQueueSize,
- long targetQueueSize,
- long allocatedDirectMemorySize,
- long flowToDiskThreshold)
- {
- if (_lastReportedFlowToDiskStatus.getAndSet(false))
- {
- getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024.0,
- targetQueueSize / 1024.0,
- allocatedDirectMemorySize / 1024.0 / 1024.0,
- flowToDiskThreshold / 1024.0 / 1024.0));
- }
- }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/df071cb3/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index b9d3cbf..a23feab 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1272,6 +1272,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
+
+ private long _lastReportedBytesEvacuatedFromMemory = 0L;
+
public VirtualHostHouseKeepingTask()
{
super("Housekeeping["+AbstractVirtualHost.this.getName()+"]",AbstractVirtualHost.this,_housekeepingJobContext);
@@ -1303,6 +1306,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
getStoreTransactionIdleTimeoutClose());
}
}
+
+ final long currentBytesEvacuatedFromMemory = getBytesEvacuatedFromMemory();
+ if (currentBytesEvacuatedFromMemory != _lastReportedBytesEvacuatedFromMemory)
+ {
+ getEventLogger().message(VirtualHostMessages.FLOW_TO_DISK(currentBytesEvacuatedFromMemory));
+ _lastReportedBytesEvacuatedFromMemory = currentBytesEvacuatedFromMemory;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org