You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/16 13:37:48 UTC

svn commit: r1769977 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ client/example/src/main/java/org/apache/qpid/example/

Author: rgodfrey
Date: Wed Nov 16 13:37:48 2016
New Revision: 1769977

URL: http://svn.apache.org/viewvc?rev=1769977&view=rev
Log:
QPID-7517 : Add managed operation to retrieve selected statistics from a configured object

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
    qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Wed Nov 16 13:37:48 2016
@@ -3191,11 +3191,21 @@ public abstract class AbstractConfigured
     @Override
     public Map<String, Object> getStatistics()
     {
+        return getStatistics(Collections.<String>emptyList());
+    }
+
+    @Override
+    public Map<String, Object> getStatistics(List<String> statistics)
+    {
         Collection<ConfiguredObjectStatistic> stats = getTypeRegistry().getStatistics(getClass());
         Map<String,Object> map = new HashMap<>();
+        boolean allStats = statistics == null || statistics.isEmpty();
         for(ConfiguredObjectStatistic stat : stats)
         {
-            map.put(stat.getName(), stat.getValue(this));
+            if(allStats || statistics.contains(stat.getName()))
+            {
+                map.put(stat.getName(), stat.getValue(this));
+            }
         }
         return map;
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Wed Nov 16 13:37:48 2016
@@ -23,6 +23,7 @@ package org.apache.qpid.server.model;
 import java.security.AccessControlException;
 import java.util.Collection;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -171,6 +172,10 @@ public interface ConfiguredObject<X exte
     @ManagedAttribute( defaultValue = "PERMANENT" )
     LifetimePolicy getLifetimePolicy();
 
+    @ManagedOperation(description = "Return the (selected) statistic values", nonModifying = true, changesConfiguredObjectState = false)
+    Map<String, Object> getStatistics(@Param(name = "statistics", defaultValue = "{}",
+            description = "Option list of statistic values to retrieve") List<String> statistics);
+
     /**
      * Get the names of attributes that are set on this object
      *

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Wed Nov 16 13:37:48 2016
@@ -36,7 +36,6 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.NotificationCheck;
-import org.apache.qpid.server.queue.QueueConsumer;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.apache.qpid.server.store.MessageDurability;
@@ -290,15 +289,36 @@ public interface Queue<X extends Queue<X
     @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Enqueued")
     long getTotalEnqueuedMessages();
 
-    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Prefetched")
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetched")
     long getUnacknowledgedBytes();
 
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetched")
     long getUnacknowledgedMessages();
 
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Available")
+    long getAvailableBytes();
+
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Available")
+    int getAvailableMessages();
+
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Available HWM")
+    long getAvailableBytesHighWatermark();
+
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Available HWM")
+    int getAvailableMessagesHighWatermark();
+
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Queue Depth HWM")
+    long getQueueDepthBytesHighWatermark();
+
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Queue Depth HWM")
+    int getQueueDepthMessagesHighWatermark();
+
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Oldest Message")
     long getOldestMessageAge();
 
+    @ManagedOperation(description = "reset cumulative and high watermark statistics values", changesConfiguredObjectState = false)
+    void resetStatistics();
+
     @ManagedOperation(description = "move messages from this queue to another", changesConfiguredObjectState = false)
     List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved") Queue<?> destination,
                             @Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov 16 13:37:48 2016
@@ -1315,12 +1315,55 @@ public abstract class AbstractQueue<X ex
         return _queueStatistics.getQueueCount();
     }
 
+    @Override
     public long getQueueDepthBytes()
     {
         return _queueStatistics.getQueueSize();
     }
 
     @Override
+    public long getAvailableBytes()
+    {
+        return _queueStatistics.getAvailableSize();
+    }
+
+    @Override
+    public int getAvailableMessages()
+    {
+        return _queueStatistics.getAvailableCount();
+    }
+
+    @Override
+    public long getAvailableBytesHighWatermark()
+    {
+        return _queueStatistics.getAvailableSizeHwm();
+    }
+
+    @Override
+    public int getAvailableMessagesHighWatermark()
+    {
+        return _queueStatistics.getAvailableCountHwm();
+    }
+
+    @Override
+    public long getQueueDepthBytesHighWatermark()
+    {
+        return _queueStatistics.getQueueSizeHwm();
+    }
+
+    @Override
+    public int getQueueDepthMessagesHighWatermark()
+    {
+        return _queueStatistics.getQueueCountHwm();
+    }
+
+    @Override
+    public void resetStatistics()
+    {
+        _queueStatistics.reset();
+    }
+
+    @Override
     public long getOldestMessageArrivalTime()
     {
         long oldestMessageArrivalTime = -1L;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java Wed Nov 16 13:37:48 2016
@@ -46,6 +46,12 @@ final class QueueStatistics
     private final AtomicLong _persistentDequeueCount = new AtomicLong();
     private final AtomicLong _persistentDequeueSize = new AtomicLong();
 
+    private final AtomicInteger _queueCountHwm = new AtomicInteger();
+    private final AtomicLong _queueSizeHwm = new AtomicLong();
+
+    private final AtomicInteger _availableCountHwm = new AtomicInteger();
+    private final AtomicLong _availableSizeHwm = new AtomicLong();
+
     public final int getQueueCount()
     {
         return _queueCount.get();
@@ -116,12 +122,40 @@ final class QueueStatistics
         return _persistentDequeueSize.get();
     }
 
+    public final int getQueueCountHwm()
+    {
+        return _queueCountHwm.get();
+    }
 
+    public final long getQueueSizeHwm()
+    {
+        return _queueSizeHwm.get();
+    }
+
+    public final int getAvailableCountHwm()
+    {
+        return _availableCountHwm.get();
+    }
+
+    public final long getAvailableSizeHwm()
+    {
+        return _availableSizeHwm.get();
+    }
 
     void addToQueue(long size)
     {
-        _queueCount.incrementAndGet();
-        _queueSize.addAndGet(size);
+        int count = _queueCount.incrementAndGet();
+        long queueSize = _queueSize.addAndGet(size);
+        int hwm;
+        while((hwm = _queueCountHwm.get()) < count)
+        {
+            _queueCountHwm.compareAndSet(hwm, count);
+        }
+        long sizeHwm;
+        while((sizeHwm = _queueSizeHwm.get()) < queueSize)
+        {
+            _queueSizeHwm.compareAndSet(sizeHwm, queueSize);
+        }
     }
 
     void removeFromQueue(long size)
@@ -132,8 +166,18 @@ final class QueueStatistics
 
     void addToAvailable(long size)
     {
-        _availableCount.incrementAndGet();
-        _availableSize.addAndGet(size);
+        int count = _availableCount.incrementAndGet();
+        long availableSize = _availableSize.addAndGet(size);
+        int hwm;
+        while((hwm = _availableCountHwm.get()) < count)
+        {
+            _availableCountHwm.compareAndSet(hwm, count);
+        }
+        long sizeHwm;
+        while((sizeHwm = _availableSizeHwm.get()) < availableSize)
+        {
+            _availableSizeHwm.compareAndSet(sizeHwm, availableSize);
+        }
     }
 
     void removeFromAvailable(long size)
@@ -178,4 +222,20 @@ final class QueueStatistics
         _persistentDequeueSize.addAndGet(size);
     }
 
+    void reset()
+    {
+        _availableCountHwm.set(0);
+        _availableSizeHwm.set(0L);
+        _queueCountHwm.set(0);
+        _queueSizeHwm.set(0L);
+        _enqueueCount.set(0L);
+        _enqueueSize.set(0L);
+        _dequeueCount.set(0L);
+        _dequeueSize.set(0L);
+        _persistentEnqueueCount.set(0L);
+        _persistentEnqueueSize.set(0L);
+        _persistentDequeueCount.set(0L);
+        _persistentDequeueSize.set(0L);
+    }
+
 }

Modified: qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties (original)
+++ qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties Wed Nov 16 13:37:48 2016
@@ -21,8 +21,11 @@ java.naming.factory.initial = org.apache
 # register some connection factories
 # connectionfactory.[jndiname] = [ConnectionURL]
 connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'
+connectionfactory.qpidConnectionfactorySsl = amqp://guest:guest@clientid/?brokerlist='tcp://192.168.105.76:5671?trust_store='/Users/rob/qpid-java-queuerunner/test-profiles/test_resources/ssl/java_client_truststore.jks'&trust_store_password='password'&ssl_verify_hostname='false''&ssl='true'&maxprefetch='50000'
+
 
 # Register an AMQP destination in JNDI
 # destination.[jniName] = [Address Format]
 destination.topicExchange = amq.topic
 
+queue.tempQueue = testQueue



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org