You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 00:36:22 UTC
[07/17] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to
destinationStatistics - allow local consumption to be accounted with
dequeueCount - forwardCount so forwarded messages are not accounted for num
hops times
https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to destinationStatistics - allow local consumption to be accounted with dequeueCount - forwardCount so forwarded messages are not accounted for num hops times
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e4183ec4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e4183ec4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e4183ec4
Branch: refs/heads/activemq-5.10.x
Commit: e4183ec48d51efb086ca4d9ce1cbc458931f98c6
Parents: 9146785
Author: gtully <ga...@gmail.com>
Authored: Fri Jul 25 11:46:36 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:56:21 2014 -0500
----------------------------------------------------------------------
.../apache/activemq/broker/jmx/DestinationView.java | 5 +++++
.../activemq/broker/jmx/DestinationViewMBean.java | 10 ++++++++++
.../broker/region/DestinationStatistics.java | 10 ++++++++++
.../org/apache/activemq/broker/region/Queue.java | 3 +++
.../activemq/broker/region/TopicSubscription.java | 3 +++
.../activemq/console/command/DstatCommand.java | 9 ++++++---
.../org/apache/activemq/broker/jmx/MBeanTest.java | 1 +
.../activemq/network/DemandForwardingBridgeTest.java | 15 +++++++++++++++
.../usecases/ThreeBrokerTopicNetworkTest.java | 4 ++++
9 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 3f62943..ec6fe7c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -90,6 +90,11 @@ public class DestinationView implements DestinationViewMBean {
}
@Override
+ public long getForwardCount() {
+ return destination.getDestinationStatistics().getForwards().getCount();
+ }
+
+ @Override
public long getDispatchCount() {
return destination.getDestinationStatistics().getDispatched().getCount();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index f83d47e..a42bcfa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -70,6 +70,16 @@ public interface DestinationViewMBean {
long getDequeueCount();
/**
+ * Returns the number of messages that have been acknowledged by network subscriptions from the
+ * destination.
+ *
+ * @return The number of messages that have been acknowledged by network subscriptions from the
+ * destination.
+ */
+ @MBeanInfo("Number of messages that have been forwarded (to a networked broker) from the destination.")
+ long getForwardCount();
+
+ /**
* Returns the number of messages that have been dispatched but not
* acknowledged
*
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index ee2b478..0a9176e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -31,6 +31,7 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
+ protected CountStatisticImpl forwards;
protected CountStatisticImpl consumers;
protected CountStatisticImpl producers;
protected CountStatisticImpl messages;
@@ -49,6 +50,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
+ forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination");
inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
expired = new CountStatisticImpl("expired", "The number of messages that have expired");
@@ -86,6 +88,10 @@ public class DestinationStatistics extends StatsImpl {
return dequeues;
}
+ public CountStatisticImpl getForwards() {
+ return forwards;
+ }
+
public CountStatisticImpl getInflight() {
return inflight;
}
@@ -137,6 +143,7 @@ public class DestinationStatistics extends StatsImpl {
super.reset();
enqueues.reset();
dequeues.reset();
+ forwards.reset();
dispatched.reset();
inflight.reset();
expired.reset();
@@ -151,6 +158,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setEnabled(enabled);
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
+ forwards.setEnabled(enabled);
inflight.setEnabled(enabled);
expired.setEnabled(true);
consumers.setEnabled(enabled);
@@ -169,6 +177,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(parent.enqueues);
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
+ forwards.setParent(parent.forwards);
inflight.setParent(parent.inflight);
expired.setParent(parent.expired);
consumers.setParent(parent.consumers);
@@ -183,6 +192,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(null);
dispatched.setParent(null);
dequeues.setParent(null);
+ forwards.setParent(null);
inflight.setParent(null);
expired.setParent(null);
consumers.setParent(null);
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 06c74db..647ba68 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1810,6 +1810,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} finally {
messagesLock.writeLock().unlock();
}
+ if (sub != null && sub.getConsumerInfo().isNetworkSubscription()) {
+ getDestinationStatistics().getForwards().increment();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index d17fb2f..6b61379 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -285,6 +285,9 @@ public class TopicSubscription extends AbstractSubscription {
if (singleDestination && destination != null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+ if (info.isNetworkSubscription()) {
+ destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
+ }
}
dequeueCounter.addAndGet(ack.getMessageCount());
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
index a6d6356..8a41d6a 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
@@ -95,7 +95,7 @@ public class DstatCommand extends AbstractJmxCommand {
// sort list so the names are A..Z
Collections.sort(queueList, new ObjectInstanceComparator());
- context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+ context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
// Iterate through the queue result
for (Object view : queueList) {
@@ -115,6 +115,7 @@ public class DstatCommand extends AbstractJmxCommand {
queueView.getConsumerCount(),
queueView.getEnqueueCount(),
queueView.getDequeueCount(),
+ queueView.getForwardCount(),
queueView.getMemoryPercentUsage()));
}
}
@@ -128,7 +129,7 @@ public class DstatCommand extends AbstractJmxCommand {
final String header = "%-50s %10s %10s %10s %10s %10s %10s";
final String tableRow = "%-50s %10d %10d %10d %10d %10d %10d";
- context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+ context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
Collections.sort(queueList, new ObjectInstanceComparator());
@@ -150,6 +151,7 @@ public class DstatCommand extends AbstractJmxCommand {
queueView.getConsumerCount(),
queueView.getEnqueueCount(),
queueView.getDequeueCount(),
+ queueView.getForwardCount(),
queueView.getMemoryPercentUsage()));
}
}
@@ -166,7 +168,7 @@ public class DstatCommand extends AbstractJmxCommand {
// sort list so the names are A..Z
Collections.sort(topicsList, new ObjectInstanceComparator());
- context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+ context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
// Iterate through the topics result
for (Object view : topicsList) {
@@ -186,6 +188,7 @@ public class DstatCommand extends AbstractJmxCommand {
topicView.getConsumerCount(),
topicView.getEnqueueCount(),
topicView.getDequeueCount(),
+ topicView.getForwardCount(),
topicView.getMemoryPercentUsage()));
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index a853b3e..30487c8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -169,6 +169,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
assertTrue("use cache", queueNew.isUseCache());
assertTrue("cache enabled", queueNew.isCacheEnabled());
+ assertEquals("no forwards", 0, queueNew.getForwardCount());
}
public void testRemoveMessages() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
index 1491ba2..020a511 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
@@ -21,6 +21,7 @@ import javax.jms.DeliveryMode;
import junit.framework.Test;
import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
@@ -72,6 +73,11 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
// Close consumer to cause the message to rollback.
connection1.send(consumerInfo1.createRemoveCommand());
+ final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics();
+ assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
+ assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount());
+ assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount());
+
// Now create remote consumer that should cause message to move to this
// remote consumer.
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
@@ -84,6 +90,15 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
return receiveMessage(connection2) != null;
}
}));
+
+ assertTrue("broker dest stat forwards", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 1 == destinationStatistics.getForwards().getCount();
+ }
+ }));
+ assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
+ assertEquals("remote broker dest stat dequeues", 1, remoteBroker.getDestination(destination).getDestinationStatistics().getDequeues().getCount());
}
public void initCombosForTestAddConsumerThenSend() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
index 33963b7..99deb28 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
/**
@@ -77,6 +78,9 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
+
+ assertEquals("Correct forwards from A", MESSAGE_COUNT,
+ brokers.get("BrokerA").broker.getDestination(ActiveMQDestination.transform(dest)).getDestinationStatistics().getForwards().getCount());
}
public void initCombosForTestABandBCbrokerNetworkWithSelectors() {