You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/12/03 18:17:56 UTC
git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4916
Updated Branches:
refs/heads/trunk 5fa462a08 -> 07ec89037
Fix for https://issues.apache.org/jira/browse/AMQ-4916
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/07ec8903
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/07ec8903
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/07ec8903
Branch: refs/heads/trunk
Commit: 07ec890372feb58ae69578fcc8f76483de178c70
Parents: 5fa462a
Author: Rob Davies <ra...@gmail.com>
Authored: Tue Dec 3 17:15:02 2013 +0000
Committer: Rob Davies <ra...@gmail.com>
Committed: Tue Dec 3 17:15:56 2013 +0000
----------------------------------------------------------------------
.../org/apache/activemq/broker/jmx/ProducerView.java | 12 ++++++++++++
.../apache/activemq/broker/jmx/ProducerViewMBean.java | 9 ++++++++-
.../apache/activemq/broker/jmx/SubscriptionView.java | 14 ++++++++++++++
.../activemq/broker/jmx/SubscriptionViewMBean.java | 7 +++++++
.../apache/activemq/broker/region/AbstractRegion.java | 3 +++
.../activemq/broker/region/AbstractSubscription.java | 7 +++++++
.../apache/activemq/broker/region/Subscription.java | 3 +++
.../org/apache/activemq/command/ProducerInfo.java | 6 ++++++
.../broker/region/QueueDuplicatesFromStoreTest.java | 6 ++++++
9 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
index 1596d5e..6905c72 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
@@ -184,4 +184,16 @@ public class ProducerView implements ProducerViewMBean {
producerBrokerExchange.resetFlowControl();
}
}
+
+ @Override
+ public void resetStatistics() {
+ if (info != null){
+ info.getSentCount().reset();
+ }
+ }
+
+ @Override
+ public long getSentCount() {
+ return info != null ? info.getSentCount().getCount() :0;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
index da357c1..14c2073 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
@@ -96,6 +96,13 @@ public interface ProducerViewMBean {
@MBeanInfo("percentage of sends Producer Blocked for Flow Control")
int getPercentageBlocked();
- @MBeanInfo("reset flow control stata")
+ @MBeanInfo("reset flow control state")
void resetFlowControlStats();
+
+ @MBeanInfo("Resets statistics.")
+ void resetStatistics();
+
+ @MBeanInfo("Messages sent")
+ long getSentCount();
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
index 8201737..443a266 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
@@ -88,6 +88,8 @@ public class SubscriptionView implements SubscriptionViewMBean {
return result;
}
+
+
private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException {
try {
return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId);
@@ -415,4 +417,16 @@ public class SubscriptionView implements SubscriptionViewMBean {
public String getUserName() {
return userName;
}
+
+ @Override
+ public void resetStatistics() {
+ if (subscription != null){
+ subscription.getConsumedCount().reset();
+ }
+ }
+
+ @Override
+ public long getConsumedCount() {
+ return subscription != null ? subscription.getConsumedCount().getCount() : 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
index 9bbedc1..3c3aab3 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
@@ -242,4 +242,11 @@ public interface SubscriptionViewMBean {
@MBeanInfo("ObjectName of the Connection that created this Subscription")
ObjectName getConnection();
+
+ @MBeanInfo("Resets statistics.")
+ void resetStatistics();
+
+ @MBeanInfo("Messages consumed")
+ long getConsumedCount();
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 168bd96..16deed4 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -392,6 +392,9 @@ public abstract class AbstractRegion implements Region {
}
producerExchange.getRegionDestination().send(producerExchange, messageSend);
+ if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){
+ producerExchange.getProducerState().getInfo().getSentCount().increment();
+ }
}
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 06a44bf..b2ff01c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -36,6 +36,7 @@ import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.LogicExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NoLocalExpression;
+import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.selector.SelectorParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ public abstract class AbstractSubscription implements Subscription {
private int cursorMemoryHighWaterMark = 70;
private boolean slowConsumer;
private long lastAckTime;
+ private CountStatisticImpl consumedCount = new CountStatisticImpl("consumed","The number of messages consumed");
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker;
@@ -88,6 +90,7 @@ public abstract class AbstractSubscription implements Subscription {
@Override
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
this.lastAckTime = System.currentTimeMillis();
+ this.consumedCount.increment();
}
@Override
@@ -276,4 +279,8 @@ public abstract class AbstractSubscription implements Subscription {
public void setTimeOfLastMessageAck(long value) {
this.lastAckTime = value;
}
+
+ public CountStatisticImpl getConsumedCount(){
+ return consumedCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
index dfd427d..b79b37e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
@@ -30,6 +30,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.management.CountStatisticImpl;
/**
*
@@ -234,4 +235,6 @@ public interface Subscription extends SubscriptionRecovery {
*/
long getTimeOfLastMessageAck();
+ CountStatisticImpl getConsumedCount();
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
index 9854c5e..05ef3a4 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.state.CommandVisitor;
/**
@@ -32,6 +33,7 @@ public class ProducerInfo extends BaseCommand {
protected BrokerId[] brokerPath;
protected boolean dispatchAsync;
protected int windowSize;
+ protected CountStatisticImpl sentCount = new CountStatisticImpl("sentCount","number of messages sent to a broker");
public ProducerInfo() {
}
@@ -135,4 +137,8 @@ public class ProducerInfo extends BaseCommand {
this.windowSize = windowSize;
}
+ public CountStatisticImpl getSentCount(){
+ return sentCount;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/07ec8903/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index 6fe28fa..9da839d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -44,6 +44,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -339,6 +340,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
public long getTimeOfLastMessageAck() {
return 0;
}
+
+ @Override
+ public CountStatisticImpl getConsumedCount() {
+ return null;
+ }
};
queue.addSubscription(contextNotInTx, subscription);