You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/16 13:37:48 UTC
svn commit: r1769977 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/
client/example/src/main/java/org/apache/qpid/example/
Author: rgodfrey
Date: Wed Nov 16 13:37:48 2016
New Revision: 1769977
URL: http://svn.apache.org/viewvc?rev=1769977&view=rev
Log:
QPID-7517 : Add managed operation to retrieve selected statistics from a configured object
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Wed Nov 16 13:37:48 2016
@@ -3191,11 +3191,21 @@ public abstract class AbstractConfigured
@Override
public Map<String, Object> getStatistics()
{
+ return getStatistics(Collections.<String>emptyList());
+ }
+
+ @Override
+ public Map<String, Object> getStatistics(List<String> statistics)
+ {
Collection<ConfiguredObjectStatistic> stats = getTypeRegistry().getStatistics(getClass());
Map<String,Object> map = new HashMap<>();
+ boolean allStats = statistics == null || statistics.isEmpty();
for(ConfiguredObjectStatistic stat : stats)
{
- map.put(stat.getName(), stat.getValue(this));
+ if(allStats || statistics.contains(stat.getName()))
+ {
+ map.put(stat.getName(), stat.getValue(this));
+ }
}
return map;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Wed Nov 16 13:37:48 2016
@@ -23,6 +23,7 @@ package org.apache.qpid.server.model;
import java.security.AccessControlException;
import java.util.Collection;
import java.util.Date;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -171,6 +172,10 @@ public interface ConfiguredObject<X exte
@ManagedAttribute( defaultValue = "PERMANENT" )
LifetimePolicy getLifetimePolicy();
+ @ManagedOperation(description = "Return the (selected) statistic values", nonModifying = true, changesConfiguredObjectState = false)
+ Map<String, Object> getStatistics(@Param(name = "statistics", defaultValue = "{}",
+ description = "Option list of statistic values to retrieve") List<String> statistics);
+
/**
* Get the names of attributes that are set on this object
*
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Wed Nov 16 13:37:48 2016
@@ -36,7 +36,6 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.NotificationCheck;
-import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.store.MessageDurability;
@@ -290,15 +289,36 @@ public interface Queue<X extends Queue<X
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Enqueued")
long getTotalEnqueuedMessages();
- @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Prefetched")
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetched")
long getUnacknowledgedBytes();
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetched")
long getUnacknowledgedMessages();
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Available")
+ long getAvailableBytes();
+
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Available")
+ int getAvailableMessages();
+
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Available HWM")
+ long getAvailableBytesHighWatermark();
+
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Available HWM")
+ int getAvailableMessagesHighWatermark();
+
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Queue Depth HWM")
+ long getQueueDepthBytesHighWatermark();
+
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Queue Depth HWM")
+ int getQueueDepthMessagesHighWatermark();
+
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Oldest Message")
long getOldestMessageAge();
+ @ManagedOperation(description = "reset cumulative and high watermark statistics values", changesConfiguredObjectState = false)
+ void resetStatistics();
+
@ManagedOperation(description = "move messages from this queue to another", changesConfiguredObjectState = false)
List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved") Queue<?> destination,
@Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov 16 13:37:48 2016
@@ -1315,12 +1315,55 @@ public abstract class AbstractQueue<X ex
return _queueStatistics.getQueueCount();
}
+ @Override
public long getQueueDepthBytes()
{
return _queueStatistics.getQueueSize();
}
@Override
+ public long getAvailableBytes()
+ {
+ return _queueStatistics.getAvailableSize();
+ }
+
+ @Override
+ public int getAvailableMessages()
+ {
+ return _queueStatistics.getAvailableCount();
+ }
+
+ @Override
+ public long getAvailableBytesHighWatermark()
+ {
+ return _queueStatistics.getAvailableSizeHwm();
+ }
+
+ @Override
+ public int getAvailableMessagesHighWatermark()
+ {
+ return _queueStatistics.getAvailableCountHwm();
+ }
+
+ @Override
+ public long getQueueDepthBytesHighWatermark()
+ {
+ return _queueStatistics.getQueueSizeHwm();
+ }
+
+ @Override
+ public int getQueueDepthMessagesHighWatermark()
+ {
+ return _queueStatistics.getQueueCountHwm();
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+ _queueStatistics.reset();
+ }
+
+ @Override
public long getOldestMessageArrivalTime()
{
long oldestMessageArrivalTime = -1L;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java Wed Nov 16 13:37:48 2016
@@ -46,6 +46,12 @@ final class QueueStatistics
private final AtomicLong _persistentDequeueCount = new AtomicLong();
private final AtomicLong _persistentDequeueSize = new AtomicLong();
+ private final AtomicInteger _queueCountHwm = new AtomicInteger();
+ private final AtomicLong _queueSizeHwm = new AtomicLong();
+
+ private final AtomicInteger _availableCountHwm = new AtomicInteger();
+ private final AtomicLong _availableSizeHwm = new AtomicLong();
+
public final int getQueueCount()
{
return _queueCount.get();
@@ -116,12 +122,40 @@ final class QueueStatistics
return _persistentDequeueSize.get();
}
+ public final int getQueueCountHwm()
+ {
+ return _queueCountHwm.get();
+ }
+ public final long getQueueSizeHwm()
+ {
+ return _queueSizeHwm.get();
+ }
+
+ public final int getAvailableCountHwm()
+ {
+ return _availableCountHwm.get();
+ }
+
+ public final long getAvailableSizeHwm()
+ {
+ return _availableSizeHwm.get();
+ }
void addToQueue(long size)
{
- _queueCount.incrementAndGet();
- _queueSize.addAndGet(size);
+ int count = _queueCount.incrementAndGet();
+ long queueSize = _queueSize.addAndGet(size);
+ int hwm;
+ while((hwm = _queueCountHwm.get()) < count)
+ {
+ _queueCountHwm.compareAndSet(hwm, count);
+ }
+ long sizeHwm;
+ while((sizeHwm = _queueSizeHwm.get()) < queueSize)
+ {
+ _queueSizeHwm.compareAndSet(sizeHwm, queueSize);
+ }
}
void removeFromQueue(long size)
@@ -132,8 +166,18 @@ final class QueueStatistics
void addToAvailable(long size)
{
- _availableCount.incrementAndGet();
- _availableSize.addAndGet(size);
+ int count = _availableCount.incrementAndGet();
+ long availableSize = _availableSize.addAndGet(size);
+ int hwm;
+ while((hwm = _availableCountHwm.get()) < count)
+ {
+ _availableCountHwm.compareAndSet(hwm, count);
+ }
+ long sizeHwm;
+ while((sizeHwm = _availableSizeHwm.get()) < availableSize)
+ {
+ _availableSizeHwm.compareAndSet(sizeHwm, availableSize);
+ }
}
void removeFromAvailable(long size)
@@ -178,4 +222,20 @@ final class QueueStatistics
_persistentDequeueSize.addAndGet(size);
}
+ void reset()
+ {
+ _availableCountHwm.set(0);
+ _availableSizeHwm.set(0L);
+ _queueCountHwm.set(0);
+ _queueSizeHwm.set(0L);
+ _enqueueCount.set(0L);
+ _enqueueSize.set(0L);
+ _dequeueCount.set(0L);
+ _dequeueSize.set(0L);
+ _persistentEnqueueCount.set(0L);
+ _persistentEnqueueSize.set(0L);
+ _persistentDequeueCount.set(0L);
+ _persistentDequeueSize.set(0L);
+ }
+
}
Modified: qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties?rev=1769977&r1=1769976&r2=1769977&view=diff
==============================================================================
--- qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties (original)
+++ qpid/java/trunk/client/example/src/main/java/org/apache/qpid/example/hello.properties Wed Nov 16 13:37:48 2016
@@ -21,8 +21,11 @@ java.naming.factory.initial = org.apache
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'
+connectionfactory.qpidConnectionfactorySsl = amqp://guest:guest@clientid/?brokerlist='tcp://192.168.105.76:5671?trust_store='/Users/rob/qpid-java-queuerunner/test-profiles/test_resources/ssl/java_client_truststore.jks'&trust_store_password='password'&ssl_verify_hostname='false''&ssl='true'&maxprefetch='50000'
+
# Register an AMQP destination in JNDI
# destination.[jniName] = [Address Format]
destination.topicExchange = amq.topic
+queue.tempQueue = testQueue
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org