You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/06/06 15:14:46 UTC
[3/4] qpid-broker-j git commit: QPID-8204: [Broker-J] Add statistics
to report the maximum size of incoming messages
QPID-8204: [Broker-J] Add statistics to report the maximum size of incoming messages
(cherry picked from commit fb9ef05587cd0076ffd0947930ae642721eddaba)
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bd09c16a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bd09c16a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bd09c16a
Branch: refs/heads/7.0.x
Commit: bd09c16a36c4f6316972b7174fa9a20cc08b6f90
Parents: 881323e
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Jun 6 13:27:43 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Jun 6 14:28:02 2018 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/model/Broker.java | 15 ++++++++++++
.../apache/qpid/server/model/BrokerImpl.java | 19 +++++++++++++++
.../org/apache/qpid/server/util/StatsUtils.java | 25 ++++++++++++++++++++
.../server/virtualhost/AbstractVirtualHost.java | 17 +++++++++++++
.../virtualhost/QueueManagingVirtualHost.java | 9 +++++++
5 files changed, 85 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 7978b1b..770709a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -126,6 +126,12 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
String PROPERTY_DISABLED_FEATURES = "qpid.broker_disabled_features";
+ String MAX_MESSAGE_SIZE__STATISTICS_ENABLED = "qpid.broker.maxMessageSizeStatisticsEnabled";
+ @ManagedContextDefault(name = MAX_MESSAGE_SIZE__STATISTICS_ENABLED,
+ description = "Flag to enable collection of maximum message size statistics.")
+ @SuppressWarnings("unused")
+ boolean DEFAULT_MAX_MESSAGE_SIZE_STATISTICS_ENABLED = false;
+
@DerivedAttribute
String getBuildVersion();
@@ -294,6 +300,15 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
description = "Number of unused direct memory buffers currently in the pool.")
long getNumberOfBuffersInPool();
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME,
+ units = StatisticUnit.BYTES,
+ label = "Maximum inbound message size",
+ description = "Maximum size of messages published into the Broker since start-up."
+ + " The statistics is only evaluated when context variable"
+ + " 'qpid.broker.maxMessageSizeStatisticsEnabled' is set to 'true'.")
+ long getMaximumMessageSize();
+
@ManagedOperation(nonModifying = true,
description = "Restart the broker within the same JVM",
changesConfiguredObjectState = false,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 9eac92f..84554bf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -121,6 +121,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private final AtomicLong _transactedMessagesOut = new AtomicLong();
private final AtomicLong _bytesIn = new AtomicLong();
private final AtomicLong _bytesOut = new AtomicLong();
+ private final AtomicLong _maximumMessageSizeIn = new AtomicLong();
@ManagedAttributeField
private int _statisticsReportingPeriod;
@@ -145,6 +146,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private ScheduledFuture<?> _assignTargetSizeSchedulingFuture;
private volatile ScheduledFuture<?> _statisticsReportingFuture;
private long _housekeepingCheckPeriod;
+ private volatile boolean _maximumMessageSizeStatisticsEnabled;
@ManagedObjectFactoryConstructor
public BrokerImpl(Map<String, Object> attributes,
@@ -628,6 +630,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
_compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
_compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
_housekeepingCheckPeriod = getContextValue(Long.class, Broker.QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD);
+ _maximumMessageSizeStatisticsEnabled = getContextValue(Boolean.class, Broker.MAX_MESSAGE_SIZE__STATISTICS_ENABLED);
if (SystemUtils.getProcessPid() != null)
{
@@ -870,8 +873,14 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
{
_messagesIn.incrementAndGet();
_bytesIn.addAndGet(messageSize);
+ if (_maximumMessageSizeStatisticsEnabled)
+ {
+ _maximumMessageSizeIn.getAndUpdate(current -> messageSize > current ? messageSize : current);
+ }
}
+
+
@Override
public long getFlowToDiskThreshold()
{
@@ -891,6 +900,16 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
@Override
+ public long getMaximumMessageSize()
+ {
+ if (_maximumMessageSizeStatisticsEnabled)
+ {
+ return _maximumMessageSizeIn.get();
+ }
+ return 0;
+ }
+
+ @Override
public long getMessagesIn()
{
return _messagesIn.get();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/util/StatsUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/util/StatsUtils.java b/broker-core/src/main/java/org/apache/qpid/server/util/StatsUtils.java
new file mode 100644
index 0000000..da4889c
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/util/StatsUtils.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public class StatsUtils
+{
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index efebe75..8a9d928 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -186,6 +186,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final AtomicLong _bytesIn = new AtomicLong();
private final AtomicLong _bytesOut = new AtomicLong();
private final AtomicLong _totalConnectionCount = new AtomicLong();
+ private final AtomicLong _maximumMessageSize = new AtomicLong();
private volatile LinkRegistryModel _linkRegistry;
private AtomicBoolean _blocked = new AtomicBoolean();
@@ -215,6 +216,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final AccessControl _accessControl;
private volatile boolean _createDefaultExchanges;
+ private volatile boolean _maximumMessageSizeStatisticsEnabled;
private final AccessControl _systemUserAllowed = new SubjectFixedResultAccessControl(new ResultCalculator()
{
@@ -603,6 +605,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
_flowToDiskCheckPeriod = getContextValue(Long.class, FLOW_TO_DISK_CHECK_PERIOD);
_isDiscardGlobalSharedSubscriptionLinksOnDetach = getContextValue(Boolean.class, DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH);
+ _maximumMessageSizeStatisticsEnabled = getContextValue(Boolean.class, Broker.MAX_MESSAGE_SIZE__STATISTICS_ENABLED);
QpidServiceLoader serviceLoader = new QpidServiceLoader();
for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class))
@@ -1504,6 +1507,16 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
+ public long getMaximumMessageSizeIn()
+ {
+ if (_maximumMessageSizeStatisticsEnabled)
+ {
+ return _maximumMessageSize.get();
+ }
+ return 0;
+ }
+
+ @Override
public MessageDestination getDefaultDestination()
{
return _defaultDestination;
@@ -1665,6 +1678,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_messagesIn.incrementAndGet();
_bytesIn.addAndGet(messageSize);
_broker.registerMessageReceived(messageSize);
+ if (_maximumMessageSizeStatisticsEnabled)
+ {
+ _maximumMessageSize.getAndUpdate(current -> messageSize > current ? messageSize : current);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 6cee2a0..470e793 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -280,6 +280,15 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
description = "Total Number of Bytes Evacuated from Memory Due to Flow to Disk.")
long getBytesEvacuatedFromMemory();
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME,
+ units = StatisticUnit.BYTES,
+ label = "Maximum inbound message size",
+ description = "Maximum size of message published into the Virtual Host since start-up."
+ + " The statistics is only evaluated when context variable"
+ + " 'qpid.broker.maxMessageSizeStatisticsEnabled' is set to 'true'.")
+ long getMaximumMessageSizeIn();
+
@Override
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
Collection<? extends Connection<?>> getConnections();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org