You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/12/03 18:17:56 UTC

git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4916

Updated Branches:
  refs/heads/trunk 5fa462a08 -> 07ec89037


Fix for https://issues.apache.org/jira/browse/AMQ-4916


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/07ec8903
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/07ec8903
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/07ec8903

Branch: refs/heads/trunk
Commit: 07ec890372feb58ae69578fcc8f76483de178c70
Parents: 5fa462a
Author: Rob Davies <ra...@gmail.com>
Authored: Tue Dec 3 17:15:02 2013 +0000
Committer: Rob Davies <ra...@gmail.com>
Committed: Tue Dec 3 17:15:56 2013 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/broker/jmx/ProducerView.java  | 12 ++++++++++++
 .../apache/activemq/broker/jmx/ProducerViewMBean.java |  9 ++++++++-
 .../apache/activemq/broker/jmx/SubscriptionView.java  | 14 ++++++++++++++
 .../activemq/broker/jmx/SubscriptionViewMBean.java    |  7 +++++++
 .../apache/activemq/broker/region/AbstractRegion.java |  3 +++
 .../activemq/broker/region/AbstractSubscription.java  |  7 +++++++
 .../apache/activemq/broker/region/Subscription.java   |  3 +++
 .../org/apache/activemq/command/ProducerInfo.java     |  6 ++++++
 .../broker/region/QueueDuplicatesFromStoreTest.java   |  6 ++++++
 9 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
index 1596d5e..6905c72 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
@@ -184,4 +184,16 @@ public class ProducerView implements ProducerViewMBean {
             producerBrokerExchange.resetFlowControl();
         }
     }
+
+    @Override
+    public void resetStatistics() {
+       if (info != null){
+           info.getSentCount().reset();
+       }
+    }
+
+    @Override
+    public long getSentCount() {
+        return info != null ? info.getSentCount().getCount() :0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
index da357c1..14c2073 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
@@ -96,6 +96,13 @@ public interface ProducerViewMBean {
     @MBeanInfo("percentage of sends Producer Blocked for Flow Control")
     int getPercentageBlocked();
 
-    @MBeanInfo("reset flow control stata")
+    @MBeanInfo("reset flow control state")
     void resetFlowControlStats();
+
+    @MBeanInfo("Resets statistics.")
+    void resetStatistics();
+
+    @MBeanInfo("Messages consumed")
+    long getSentCount();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
index 8201737..443a266 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
@@ -88,6 +88,8 @@ public class SubscriptionView implements SubscriptionViewMBean {
         return result;
     }
 
+
+
     private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException {
         try {
             return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId);
@@ -415,4 +417,16 @@ public class SubscriptionView implements SubscriptionViewMBean {
     public String getUserName() {
         return userName;
     }
+
+    @Override
+    public void resetStatistics() {
+        if (subscription != null){
+            subscription.getConsumedCount().reset();
+        }
+    }
+
+    @Override
+    public long getConsumedCount() {
+        return subscription != null ? subscription.getConsumedCount().getCount() : 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
index 9bbedc1..3c3aab3 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
@@ -242,4 +242,11 @@ public interface SubscriptionViewMBean {
     @MBeanInfo("ObjectName of the Connection that created this Subscription")
     ObjectName getConnection();
 
+
+    @MBeanInfo("Resets statistics.")
+    void resetStatistics();
+
+    @MBeanInfo("Messages consumed")
+    long getConsumedCount();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 168bd96..16deed4 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -392,6 +392,9 @@ public abstract class AbstractRegion implements Region {
         }
 
         producerExchange.getRegionDestination().send(producerExchange, messageSend);
+        if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){
+            producerExchange.getProducerState().getInfo().getSentCount().increment();
+        }
     }
 
     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 06a44bf..b2ff01c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -36,6 +36,7 @@ import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.LogicExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NoLocalExpression;
+import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.selector.SelectorParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ public abstract class AbstractSubscription implements Subscription {
     private int cursorMemoryHighWaterMark = 70;
     private boolean slowConsumer;
     private long lastAckTime;
+    private CountStatisticImpl consumedCount = new CountStatisticImpl("consumed","The number of messages consumed");
 
     public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         this.broker = broker;
@@ -88,6 +90,7 @@ public abstract class AbstractSubscription implements Subscription {
     @Override
     public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
         this.lastAckTime = System.currentTimeMillis();
+        this.consumedCount.increment();
     }
 
     @Override
@@ -276,4 +279,8 @@ public abstract class AbstractSubscription implements Subscription {
     public void setTimeOfLastMessageAck(long value) {
         this.lastAckTime = value;
     }
+
+    public CountStatisticImpl getConsumedCount(){
+        return consumedCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
index dfd427d..b79b37e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
@@ -30,6 +30,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.management.CountStatisticImpl;
 
 /**
  *
@@ -234,4 +235,6 @@ public interface Subscription extends SubscriptionRecovery {
      */
     long getTimeOfLastMessageAck();
 
+    CountStatisticImpl getConsumedCount();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
index 9854c5e..05ef3a4 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.command;
 
+import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.state.CommandVisitor;
 
 /**
@@ -32,6 +33,7 @@ public class ProducerInfo extends BaseCommand {
     protected BrokerId[] brokerPath;
     protected boolean dispatchAsync;
     protected int windowSize;
+    protected CountStatisticImpl sentCount = new CountStatisticImpl("sentCount","number of messages sent to a broker");
 
     public ProducerInfo() {
     }
@@ -135,4 +137,8 @@ public class ProducerInfo extends BaseCommand {
         this.windowSize = windowSize;
     }
 
+    public CountStatisticImpl getSentCount(){
+        return sentCount;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index 6fe28fa..9da839d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -44,6 +44,7 @@ import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -339,6 +340,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
             public long getTimeOfLastMessageAck() {
                 return 0;
             }
+
+            @Override
+            public CountStatisticImpl getConsumedCount() {
+                return null;
+            }
         };
 
         queue.addSubscription(contextNotInTx, subscription);