You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 00:36:22 UTC

[07/17] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to destinationStatistics - allow local consumption to be accounted with dequeueCount - forwardCount so forwarded messages are not accounted for num hops times

https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to destinationStatistics - allow local consumption to be accounted with dequeueCount - forwardCount so forwarded messages are not accounted for num hops times


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e4183ec4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e4183ec4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e4183ec4

Branch: refs/heads/activemq-5.10.x
Commit: e4183ec48d51efb086ca4d9ce1cbc458931f98c6
Parents: 9146785
Author: gtully <ga...@gmail.com>
Authored: Fri Jul 25 11:46:36 2014 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 14:56:21 2014 -0500

----------------------------------------------------------------------
 .../apache/activemq/broker/jmx/DestinationView.java  |  5 +++++
 .../activemq/broker/jmx/DestinationViewMBean.java    | 10 ++++++++++
 .../broker/region/DestinationStatistics.java         | 10 ++++++++++
 .../org/apache/activemq/broker/region/Queue.java     |  3 +++
 .../activemq/broker/region/TopicSubscription.java    |  3 +++
 .../activemq/console/command/DstatCommand.java       |  9 ++++++---
 .../org/apache/activemq/broker/jmx/MBeanTest.java    |  1 +
 .../activemq/network/DemandForwardingBridgeTest.java | 15 +++++++++++++++
 .../usecases/ThreeBrokerTopicNetworkTest.java        |  4 ++++
 9 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 3f62943..ec6fe7c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -90,6 +90,11 @@ public class DestinationView implements DestinationViewMBean {
     }
 
     @Override
+    public long getForwardCount() {
+        return destination.getDestinationStatistics().getForwards().getCount();
+    }
+
+    @Override
     public long getDispatchCount() {
         return destination.getDestinationStatistics().getDispatched().getCount();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index f83d47e..a42bcfa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -70,6 +70,16 @@ public interface DestinationViewMBean {
     long getDequeueCount();
 
     /**
+     * Returns the number of messages that have been acknowledged by network subscriptions from the
+     * destination.
+     *
+     * @return The number of messages that have been acknowledged by network subscriptions from the
+     *         destination.
+     */
+    @MBeanInfo("Number of messages that have been forwarded (to a networked broker) from the destination.")
+    long getForwardCount();
+
+    /**
      * Returns the number of messages that have been dispatched but not
      * acknowledged
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index ee2b478..0a9176e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -31,6 +31,7 @@ public class DestinationStatistics extends StatsImpl {
 
     protected CountStatisticImpl enqueues;
     protected CountStatisticImpl dequeues;
+    protected CountStatisticImpl forwards;
     protected CountStatisticImpl consumers;
     protected CountStatisticImpl producers;
     protected CountStatisticImpl messages;
@@ -49,6 +50,7 @@ public class DestinationStatistics extends StatsImpl {
         enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
         dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
         dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
+        forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination");
         inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
         expired = new CountStatisticImpl("expired", "The number of messages that have expired");
 
@@ -86,6 +88,10 @@ public class DestinationStatistics extends StatsImpl {
         return dequeues;
     }
 
+    public CountStatisticImpl getForwards() {
+        return forwards;
+    }
+
     public CountStatisticImpl getInflight() {
         return inflight;
     }
@@ -137,6 +143,7 @@ public class DestinationStatistics extends StatsImpl {
             super.reset();
             enqueues.reset();
             dequeues.reset();
+            forwards.reset();
             dispatched.reset();
             inflight.reset();
             expired.reset();
@@ -151,6 +158,7 @@ public class DestinationStatistics extends StatsImpl {
         enqueues.setEnabled(enabled);
         dispatched.setEnabled(enabled);
         dequeues.setEnabled(enabled);
+        forwards.setEnabled(enabled);
         inflight.setEnabled(enabled);
         expired.setEnabled(true);
         consumers.setEnabled(enabled);
@@ -169,6 +177,7 @@ public class DestinationStatistics extends StatsImpl {
             enqueues.setParent(parent.enqueues);
             dispatched.setParent(parent.dispatched);
             dequeues.setParent(parent.dequeues);
+            forwards.setParent(parent.forwards);
             inflight.setParent(parent.inflight);
             expired.setParent(parent.expired);
             consumers.setParent(parent.consumers);
@@ -183,6 +192,7 @@ public class DestinationStatistics extends StatsImpl {
             enqueues.setParent(null);
             dispatched.setParent(null);
             dequeues.setParent(null);
+            forwards.setParent(null);
             inflight.setParent(null);
             expired.setParent(null);
             consumers.setParent(null);

http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 06c74db..647ba68 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1810,6 +1810,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
             } finally {
                 messagesLock.writeLock().unlock();
             }
+            if (sub != null && sub.getConsumerInfo().isNetworkSubscription()) {
+                getDestinationStatistics().getForwards().increment();
+            }
         }
 
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index d17fb2f..6b61379 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -285,6 +285,9 @@ public class TopicSubscription extends AbstractSubscription {
                 if (singleDestination && destination != null) {
                     destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
                     destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+                    if (info.isNetworkSubscription()) {
+                        destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
+                    }
                 }
                 dequeueCounter.addAndGet(ack.getMessageCount());
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
index a6d6356..8a41d6a 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java
@@ -95,7 +95,7 @@ public class DstatCommand extends AbstractJmxCommand {
         // sort list so the names is A..Z
         Collections.sort(queueList, new ObjectInstanceComparator());
 
-        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
 
         // Iterate through the queue result
         for (Object view : queueList) {
@@ -115,6 +115,7 @@ public class DstatCommand extends AbstractJmxCommand {
                     queueView.getConsumerCount(),
                     queueView.getEnqueueCount(),
                     queueView.getDequeueCount(),
+                    queueView.getForwardCount(),
                     queueView.getMemoryPercentUsage()));
         }
     }
@@ -128,7 +129,7 @@ public class DstatCommand extends AbstractJmxCommand {
         final String header = "%-50s  %10s  %10s  %10s  %10s  %10s  %10s";
         final String tableRow = "%-50s  %10d  %10d  %10d  %10d  %10d  %10d";
 
-        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
 
         Collections.sort(queueList, new ObjectInstanceComparator());
 
@@ -150,6 +151,7 @@ public class DstatCommand extends AbstractJmxCommand {
                     queueView.getConsumerCount(),
                     queueView.getEnqueueCount(),
                     queueView.getDequeueCount(),
+                    queueView.getForwardCount(),
                     queueView.getMemoryPercentUsage()));
         }
     }
@@ -166,7 +168,7 @@ public class DstatCommand extends AbstractJmxCommand {
         // sort list so the names is A..Z
         Collections.sort(topicsList, new ObjectInstanceComparator());
 
-        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
+        context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
 
         // Iterate through the topics result
         for (Object view : topicsList) {
@@ -186,6 +188,7 @@ public class DstatCommand extends AbstractJmxCommand {
                     topicView.getConsumerCount(),
                     topicView.getEnqueueCount(),
                     topicView.getDequeueCount(),
+                    topicView.getForwardCount(),
                     topicView.getMemoryPercentUsage()));
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index a853b3e..30487c8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -169,6 +169,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
         assertTrue("use cache", queueNew.isUseCache());
         assertTrue("cache enabled", queueNew.isCacheEnabled());
+        assertEquals("no forwards", 0, queueNew.getForwardCount());
     }
 
     public void testRemoveMessages() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
index 1491ba2..020a511 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
@@ -21,6 +21,7 @@ import javax.jms.DeliveryMode;
 import junit.framework.Test;
 
 import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
@@ -72,6 +73,11 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
         // Close consumer to cause the message to rollback.
         connection1.send(consumerInfo1.createRemoveCommand());
 
+        final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics();
+        assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
+        assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount());
+        assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount());
+
         // Now create remote consumer that should cause message to move to this
         // remote consumer.
         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
@@ -84,6 +90,15 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
                 return receiveMessage(connection2) != null;
             }
         }));
+
+        assertTrue("broker dest stat forwards", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 1 == destinationStatistics.getForwards().getCount();
+            }
+        }));
+        assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
+        assertEquals("remote broker dest stat dequeues", 1, remoteBroker.getDestination(destination).getDestinationStatistics().getDequeues().getCount());
     }
 
     public void initCombosForTestAddConsumerThenSend() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
index 33963b7..99deb28 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.MessageIdList;
 
 /**
@@ -77,6 +78,9 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
         assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
         assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
         assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
+
+        assertEquals("Correct forwards from A", MESSAGE_COUNT,
+                brokers.get("BrokerA").broker.getDestination(ActiveMQDestination.transform(dest)).getDestinationStatistics().getForwards().getCount());
     }
 
     public void initCombosForTestABandBCbrokerNetworkWithSelectors() {