You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/06/06 15:14:46 UTC

[3/4] qpid-broker-j git commit: QPID-8204: [Broker-J] Add statistics to report the maximum size of incoming messages

QPID-8204: [Broker-J] Add statistics to report the maximum size of incoming messages

(cherry picked from commit fb9ef05587cd0076ffd0947930ae642721eddaba)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bd09c16a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bd09c16a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bd09c16a

Branch: refs/heads/7.0.x
Commit: bd09c16a36c4f6316972b7174fa9a20cc08b6f90
Parents: 881323e
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Jun 6 13:27:43 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Jun 6 14:28:02 2018 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/server/model/Broker.java    | 15 ++++++++++++
 .../apache/qpid/server/model/BrokerImpl.java    | 19 +++++++++++++++
 .../org/apache/qpid/server/util/StatsUtils.java | 25 ++++++++++++++++++++
 .../server/virtualhost/AbstractVirtualHost.java | 17 +++++++++++++
 .../virtualhost/QueueManagingVirtualHost.java   |  9 +++++++
 5 files changed, 85 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 7978b1b..770709a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -126,6 +126,12 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
 
     String PROPERTY_DISABLED_FEATURES = "qpid.broker_disabled_features";
 
+    String MAX_MESSAGE_SIZE__STATISTICS_ENABLED = "qpid.broker.maxMessageSizeStatisticsEnabled";
+    @ManagedContextDefault(name = MAX_MESSAGE_SIZE__STATISTICS_ENABLED,
+                            description = "Flag to enable collection of maximum message size statistics.")
+    @SuppressWarnings("unused")
+    boolean DEFAULT_MAX_MESSAGE_SIZE_STATISTICS_ENABLED = false;
+
     @DerivedAttribute
     String getBuildVersion();
 
@@ -294,6 +300,15 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
             description = "Number of unused direct memory buffers currently in the pool.")
     long getNumberOfBuffersInPool();
 
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME,
+            units = StatisticUnit.BYTES,
+            label = "Maximum inbound message size",
+            description = "Maximum size of messages published into the Broker since start-up."
+                          + " The statistics is only evaluated when context variable"
+                          + " 'qpid.broker.maxMessageSizeStatisticsEnabled' is set to 'true'.")
+    long getMaximumMessageSize();
+
     @ManagedOperation(nonModifying = true,
             description = "Restart the broker within the same JVM",
             changesConfiguredObjectState = false,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 9eac92f..84554bf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -121,6 +121,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     private final AtomicLong _transactedMessagesOut = new AtomicLong();
     private final AtomicLong _bytesIn = new AtomicLong();
     private final AtomicLong _bytesOut = new AtomicLong();
+    private final AtomicLong _maximumMessageSizeIn = new AtomicLong();
 
     @ManagedAttributeField
     private int _statisticsReportingPeriod;
@@ -145,6 +146,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     private ScheduledFuture<?> _assignTargetSizeSchedulingFuture;
     private volatile ScheduledFuture<?> _statisticsReportingFuture;
     private long _housekeepingCheckPeriod;
+    private volatile boolean _maximumMessageSizeStatisticsEnabled;
 
     @ManagedObjectFactoryConstructor
     public BrokerImpl(Map<String, Object> attributes,
@@ -628,6 +630,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
         _compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
         _compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
         _housekeepingCheckPeriod = getContextValue(Long.class, Broker.QPID_BROKER_HOUSEKEEPING_CHECK_PERIOD);
+        _maximumMessageSizeStatisticsEnabled = getContextValue(Boolean.class, Broker.MAX_MESSAGE_SIZE__STATISTICS_ENABLED);
 
         if (SystemUtils.getProcessPid() != null)
         {
@@ -870,8 +873,14 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     {
         _messagesIn.incrementAndGet();
         _bytesIn.addAndGet(messageSize);
+        if (_maximumMessageSizeStatisticsEnabled)
+        {
+           _maximumMessageSizeIn.getAndUpdate(current -> messageSize > current ? messageSize : current);
+        }
     }
 
+
+
     @Override
     public long getFlowToDiskThreshold()
     {
@@ -891,6 +900,16 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
     }
 
     @Override
+    public long getMaximumMessageSize()
+    {
+        if (_maximumMessageSizeStatisticsEnabled)
+        {
+            return _maximumMessageSizeIn.get();
+        }
+        return 0;
+    }
+
+    @Override
     public long getMessagesIn()
     {
         return _messagesIn.get();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/util/StatsUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/util/StatsUtils.java b/broker-core/src/main/java/org/apache/qpid/server/util/StatsUtils.java
new file mode 100644
index 0000000..da4889c
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/util/StatsUtils.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public class StatsUtils
+{
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index efebe75..8a9d928 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -186,6 +186,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     private final AtomicLong _bytesIn = new AtomicLong();
     private final AtomicLong _bytesOut = new AtomicLong();
     private final AtomicLong _totalConnectionCount = new AtomicLong();
+    private final AtomicLong _maximumMessageSize = new AtomicLong();
 
     private volatile LinkRegistryModel _linkRegistry;
     private AtomicBoolean _blocked = new AtomicBoolean();
@@ -215,6 +216,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     private final AccessControl _accessControl;
 
     private volatile boolean _createDefaultExchanges;
+    private volatile boolean _maximumMessageSizeStatisticsEnabled;
 
     private final AccessControl _systemUserAllowed = new SubjectFixedResultAccessControl(new ResultCalculator()
     {
@@ -603,6 +605,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         _fileSystemMaxUsagePercent = getContextValue(Integer.class, Broker.STORE_FILESYSTEM_MAX_USAGE_PERCENT);
         _flowToDiskCheckPeriod = getContextValue(Long.class, FLOW_TO_DISK_CHECK_PERIOD);
         _isDiscardGlobalSharedSubscriptionLinksOnDetach = getContextValue(Boolean.class, DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH);
+        _maximumMessageSizeStatisticsEnabled = getContextValue(Boolean.class, Broker.MAX_MESSAGE_SIZE__STATISTICS_ENABLED);
 
         QpidServiceLoader serviceLoader = new QpidServiceLoader();
         for(ConnectionValidator validator : serviceLoader.instancesOf(ConnectionValidator.class))
@@ -1504,6 +1507,16 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
     }
 
     @Override
+    public long getMaximumMessageSizeIn()
+    {
+        if (_maximumMessageSizeStatisticsEnabled)
+        {
+            return _maximumMessageSize.get();
+        }
+        return 0;
+    }
+
+    @Override
     public MessageDestination getDefaultDestination()
     {
         return _defaultDestination;
@@ -1665,6 +1678,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
         _messagesIn.incrementAndGet();
         _bytesIn.addAndGet(messageSize);
         _broker.registerMessageReceived(messageSize);
+        if (_maximumMessageSizeStatisticsEnabled)
+        {
+            _maximumMessageSize.getAndUpdate(current ->  messageSize > current ? messageSize : current);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd09c16a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 6cee2a0..470e793 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -280,6 +280,15 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
                       description = "Total Number of Bytes Evacuated from Memory Due to Flow to Disk.")
     long getBytesEvacuatedFromMemory();
 
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME,
+            units = StatisticUnit.BYTES,
+            label = "Maximum inbound message size",
+            description = "Maximum size of message published into the Virtual Host since start-up."
+                          + " The statistics is only evaluated when context variable"
+                          + " 'qpid.broker.maxMessageSizeStatisticsEnabled' is set to 'true'.")
+    long getMaximumMessageSizeIn();
+
     @Override
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
     Collection<? extends Connection<?>> getConnections();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org