You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/18 14:53:08 UTC
svn commit: r1504452 - in
/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker:
./ jmx/ region/
Author: rajdavies
Date: Thu Jul 18 12:53:08 2013
New Revision: 1504452
URL: http://svn.apache.org/r1504452
Log:
Add support for exposing information about blocked sends through JMX - see https://issues.apache.org/jira/browse/AMQ-4635
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Jul 18 12:53:08 2013
@@ -50,27 +50,12 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.jmx.AnnotatedMBean;
-import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.ConnectorView;
-import org.apache.activemq.broker.jmx.ConnectorViewMBean;
-import org.apache.activemq.broker.jmx.HealthView;
-import org.apache.activemq.broker.jmx.HealthViewMBean;
-import org.apache.activemq.broker.jmx.JmsConnectorView;
-import org.apache.activemq.broker.jmx.JobSchedulerView;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.broker.jmx.ManagedRegionBroker;
-import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.jmx.NetworkConnectorView;
-import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
-import org.apache.activemq.broker.jmx.ProxyConnectorView;
+import org.apache.activemq.broker.jmx.*;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
@@ -87,6 +72,7 @@ import org.apache.activemq.broker.schedu
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
@@ -107,16 +93,7 @@ import org.apache.activemq.transport.Tra
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.BrokerSupport;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
-import org.apache.activemq.util.IOExceptionHandler;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.InetAddressUtil;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ThreadPoolUtils;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -1269,6 +1246,20 @@ public class BrokerService implements Se
return answer;
}
+    /**
+     * Looks up the broker-side exchange for a producer by asking each connection
+     * of each transport connector in turn.
+     *
+     * @param producerInfo the producer whose exchange is wanted
+     * @return the first exchange reported by a connection, or null if no connection has one
+     */
+    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo) {
+        for (TransportConnector transportConnector : transportConnectors) {
+            for (TransportConnection connection : transportConnector.getConnections()) {
+                ProducerBrokerExchange exchange = connection.getProducerBrokerExchangeIfExists(producerInfo);
+                if (exchange != null) {
+                    return exchange;
+                }
+            }
+        }
+        // No connection knows this producer.
+        return null;
+    }
+ }
+
public String[] getTransportConnectorURIs() {
return transportConnectorURIs;
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Thu Jul 18 12:53:08 2013
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.broker;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.command.Message;
@@ -24,13 +28,8 @@ import org.apache.activemq.state.Produce
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* Holds internal state in the broker for a MessageProducer
- *
- *
*/
public class ProducerBrokerExchange {
@@ -44,6 +43,7 @@ public class ProducerBrokerExchange {
private boolean auditProducerSequenceIds;
private boolean isNetworkProducer;
private BrokerService brokerService;
+ private final FlowControlInfo flowControlInfo = new FlowControlInfo();
public ProducerBrokerExchange() {
}
@@ -58,7 +58,7 @@ public class ProducerBrokerExchange {
return rc;
}
-
+
/**
* @return the connectionContext
*/
@@ -131,7 +131,7 @@ public class ProducerBrokerExchange {
/**
* Enforce duplicate suppression using info from persistence adapter
- * @param messageSend
+ *
* @return false if message should be ignored as a duplicate
*/
public boolean canDispatch(Message messageSend) {
@@ -145,14 +145,14 @@ public class ProducerBrokerExchange {
canDispatch = false;
if (LOG.isDebugEnabled()) {
LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] from network producer with producerSequenceId ["
- + producerSequenceId + "] less than last stored: " + lastStoredForMessageProducer);
+ + producerSequenceId + "] less than last stored: " + lastStoredForMessageProducer);
}
}
} else if (producerSequenceId <= lastSendSequenceNumber.get()) {
canDispatch = false;
if (LOG.isDebugEnabled()) {
LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] with producerSequenceId ["
- + producerSequenceId + "] less than last stored: " + lastSendSequenceNumber);
+ + producerSequenceId + "] less than last stored: " + lastSendSequenceNumber);
}
} else {
// track current so we can suppress duplicates later in the stream
@@ -165,8 +165,8 @@ public class ProducerBrokerExchange {
private long getStoredSequenceIdForMessage(MessageId messageId) {
try {
return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
- } catch (IOException ignored) {
- LOG.debug("Failed to determine last producer sequence id for: " +messageId, ignored);
+ } catch (IOException ignored) {
+ LOG.debug("Failed to determine last producer sequence id for: " + messageId, ignored);
}
return -1;
}
@@ -180,4 +180,87 @@ public class ProducerBrokerExchange {
lastSendSequenceNumber.set(l);
LOG.debug("last stored sequence id set: " + l);
}
+
+ /**
+ * Counts one more send made through this exchange.
+ */
+ public void incrementSend() {
+ flowControlInfo.incrementSend();
+ }
+
+ /**
+ * Records whether this producer is currently waiting on flow control.
+ * Passing true also counts one more blocked send, because
+ * FlowControlInfo.setBlockingOnFlowControl increments that counter.
+ *
+ * @param blockingOnFlowControl true when the producer starts waiting, false when it stops
+ */
+ public void blockingOnFlowControl(boolean blockingOnFlowControl) {
+ flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
+ }
+
+ /**
+ * Adds to the accumulated time this producer has spent blocked.
+ *
+ * @param destination the destination the producer was blocked on; not used by this implementation
+ * @param timeBlocked the additional blocked time to add (callers pass a System.currentTimeMillis() difference)
+ */
+ public void incrementTimeBlocked(Destination destination, long timeBlocked) {
+ flowControlInfo.incrementTimeBlocked(timeBlocked);
+ }
+
+
+ /**
+ * @return true while the blocking flag set through blockingOnFlowControl(boolean) is on
+ */
+ public boolean isBlockedForFlowControl() {
+ return flowControlInfo.isBlockingOnFlowControl();
+ }
+
+ /**
+ * Clears the blocking flag and zeroes all flow-control counters of this exchange.
+ */
+ public void resetFlowControl() {
+ flowControlInfo.reset();
+ }
+
+ /**
+ * @return the accumulated blocked time added through incrementTimeBlocked since the last reset
+ */
+ public long getTotalTimeBlocked() {
+ return flowControlInfo.getTotalTimeBlocked();
+ }
+
+    /**
+     * Reports the share of sends that had to block for flow control.
+     *
+     * @return blocked sends as a whole-number percentage of total sends,
+     *         or 0 when no sends have been counted yet
+     */
+    public int getPercentageBlocked() {
+        // Read the denominator once: the counters change concurrently, and a
+        // long division by zero would throw ArithmeticException (e.g. right
+        // after resetFlowControl()).
+        long totalSends = flowControlInfo.getTotalSends();
+        if (totalSends <= 0) {
+            return 0;
+        }
+        // Scale before dividing so integer division does not truncate the
+        // ratio to 0 before it is turned into a percentage.
+        return (int) ((flowControlInfo.getSendsBlocked() * 100L) / totalSends);
+    }
+
+
+    /**
+     * Flow-control bookkeeping for a single producer: whether it is currently
+     * blocked, how many sends it made, how many of those blocked, and the
+     * accumulated blocked time. Every value is held in an atomic so it can be
+     * read (e.g. from JMX) while broker threads update it.
+     */
+    public static class FlowControlInfo {
+        // final: the holders are created once and only their contents change
+        private final AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
+        private final AtomicLong totalSends = new AtomicLong();
+        private final AtomicLong sendsBlocked = new AtomicLong();
+        private final AtomicLong totalTimeBlocked = new AtomicLong();
+
+        /**
+         * @return true while the blocking flag is set
+         */
+        public boolean isBlockingOnFlowControl() {
+            return blockingOnFlowControl.get();
+        }
+
+        /**
+         * Sets the blocking flag; each call with true also counts one blocked send.
+         *
+         * @param blockingOnFlowControl the new state of the flag
+         */
+        public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
+            this.blockingOnFlowControl.set(blockingOnFlowControl);
+            if (blockingOnFlowControl) {
+                incrementSendBlocked();
+            }
+        }
+
+        /**
+         * @return the number of sends counted since the last reset
+         */
+        public long getTotalSends() {
+            return totalSends.get();
+        }
+
+        /**
+         * Counts one more send.
+         */
+        public void incrementSend() {
+            this.totalSends.incrementAndGet();
+        }
+
+        /**
+         * @return the number of blocked sends counted since the last reset
+         */
+        public long getSendsBlocked() {
+            return sendsBlocked.get();
+        }
+
+        /**
+         * Counts one more blocked send.
+         */
+        public void incrementSendBlocked() {
+            this.sendsBlocked.incrementAndGet();
+        }
+
+        /**
+         * @return the blocked time accumulated since the last reset
+         */
+        public long getTotalTimeBlocked() {
+            return totalTimeBlocked.get();
+        }
+
+        /**
+         * Adds to the accumulated blocked time.
+         *
+         * @param time the amount to add
+         */
+        public void incrementTimeBlocked(long time) {
+            this.totalTimeBlocked.addAndGet(time);
+        }
+
+        /**
+         * Clears the blocking flag and zeroes every counter. The fields are
+         * reset one after another, not as a single atomic step.
+         */
+        public void reset() {
+            blockingOnFlowControl.set(false);
+            totalSends.set(0);
+            sendsBlocked.set(0);
+            totalTimeBlocked.set(0);
+        }
+    }
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Jul 18 12:53:08 2013
@@ -36,44 +36,10 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource;
-
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.ControlCommand;
-import org.apache.activemq.command.DataArrayResponse;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.FlushCommand;
-import org.apache.activemq.command.IntegerResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.*;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.MBeanNetworkListener;
import org.apache.activemq.network.NetworkBridgeConfiguration;
@@ -94,10 +60,8 @@ import org.apache.activemq.transport.Res
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
-import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
-import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -1407,6 +1371,16 @@ public class TransportConnection impleme
}
}
+    /**
+     * Returns the exchange registered on this connection for the given producer, if any.
+     * The map lookup is done while holding the monitor of producerExchanges.
+     *
+     * @param producerInfo the producer to look up; may be null
+     * @return the registered exchange, or null when producerInfo or its producer id
+     *         is null or nothing is registered under that id
+     */
+    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo) {
+        if (producerInfo == null || producerInfo.getProducerId() == null) {
+            return null;
+        }
+        synchronized (producerExchanges) {
+            return producerExchanges.get(producerInfo.getProducerId());
+        }
+    }
+
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
ProducerBrokerExchange result = producerExchanges.get(id);
if (result == null) {
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Thu Jul 18 12:53:08 2013
@@ -486,4 +486,19 @@ public class DestinationView implements
return destination.isDLQ();
}
+ /**
+ * @return the count held by this destination's blockedSends statistic
+ * (sends that had to wait for flow control)
+ */
+ @Override
+ public long getBlockedSends() {
+ return destination.getDestinationStatistics().getBlockedSends().getCount();
+ }
+
+ /**
+ * @return the average time reported by this destination's blockedTime statistic
+ */
+ @Override
+ public double getAverageBlockedTime() {
+ return destination.getDestinationStatistics().getBlockedTime().getAverageTime();
+ }
+
+ /**
+ * @return the total time reported by this destination's blockedTime statistic
+ */
+ @Override
+ public long getTotalBlockedTime() {
+ return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
+ }
+
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Thu Jul 18 12:53:08 2013
@@ -371,4 +371,13 @@ public interface DestinationViewMBean {
@MBeanInfo("Dead Letter Queue")
boolean isDLQ();
+ /**
+ * @return how many sends to this destination had to wait for flow control
+ */
+ @MBeanInfo("Get number of messages blocked for Flow Control")
+ long getBlockedSends();
+
+ /**
+ * @return the average time, in milliseconds, a send was blocked for flow control
+ */
+ @MBeanInfo("get the average time (ms) a message is blocked for Flow Control")
+ double getAverageBlockedTime();
+
+ /**
+ * @return the total time, in milliseconds, sends were blocked for flow control
+ */
+ @MBeanInfo("Get the total time (ms) messages are blocked for Flow Control")
+ long getTotalBlockedTime();
+
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java Thu Jul 18 12:53:08 2013
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.jmx;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerInfo;
@@ -148,4 +149,39 @@ public class ProducerView implements Pro
public String getUserName() {
return userName;
}
+
+    /**
+     * @return true if the broker has an exchange for this producer and that
+     *         exchange reports it as blocked for flow control; false otherwise
+     */
+    @Override
+    public boolean isProducerBlocked() {
+        ProducerBrokerExchange exchange = broker.getBrokerService().getProducerBrokerExchange(info);
+        return exchange != null && exchange.isBlockedForFlowControl();
+    }
+
+    /**
+     * @return the total blocked time recorded on this producer's exchange,
+     *         or 0 if the broker has no exchange for the producer
+     */
+    @Override
+    public long getTotalTimeBlocked() {
+        ProducerBrokerExchange exchange = broker.getBrokerService().getProducerBrokerExchange(info);
+        return exchange == null ? 0 : exchange.getTotalTimeBlocked();
+    }
+
+    /**
+     * @return the percentage reported by this producer's exchange,
+     *         or 0 if the broker has no exchange for the producer
+     */
+    @Override
+    public int getPercentageBlocked() {
+        ProducerBrokerExchange exchange = broker.getBrokerService().getProducerBrokerExchange(info);
+        return exchange == null ? 0 : exchange.getPercentageBlocked();
+    }
+
+    /**
+     * Resets the flow-control statistics on this producer's exchange.
+     * Does nothing if the broker has no exchange for the producer.
+     */
+    @Override
+    public void resetFlowControlStats() {
+        ProducerBrokerExchange exchange = broker.getBrokerService().getProducerBrokerExchange(info);
+        if (exchange == null) {
+            return;
+        }
+        exchange.resetFlowControl();
+    }
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java Thu Jul 18 12:53:08 2013
@@ -86,4 +86,16 @@ public interface ProducerViewMBean {
*/
@MBeanInfo("User Name used to authorize creation of this Producer")
String getUserName();
+
+ /**
+ * @return true if the producer is currently blocked for flow control
+ */
+ @MBeanInfo("is the producer blocked for Flow Control")
+ boolean isProducerBlocked();
+
+ /**
+ * @return the total time, in milliseconds, the producer has been blocked for flow control
+ */
+ @MBeanInfo("total time (ms) Producer Blocked For Flow Control")
+ long getTotalTimeBlocked();
+
+ /**
+ * @return the percentage of this producer's sends that were blocked for flow control
+ */
+ @MBeanInfo("percentage of sends Producer Blocked for Flow Control")
+ int getPercentageBlocked();
+
+    /**
+     * Resets the flow-control statistics kept for this producer.
+     */
+    @MBeanInfo("reset flow control stats")
+    void resetFlowControlStats();
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Jul 18 12:53:08 2013
@@ -606,11 +606,11 @@ public abstract class BaseDestination im
this.storeUsageHighWaterMark = storeUsageHighWaterMark;
}
- protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
- waitForSpace(context, usage, 100, warning);
+ protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
+ waitForSpace(context, producerBrokerExchange, usage, 100, warning);
}
- protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
+ protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);
@@ -623,6 +623,8 @@ public abstract class BaseDestination im
} else {
long start = System.currentTimeMillis();
long nextWarn = start;
+ producerBrokerExchange.blockingOnFlowControl(true);
+ destinationStatistics.getBlockedSends().increment();
while (!usage.waitForSpace(1000, highWaterMark)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
@@ -634,6 +636,11 @@ public abstract class BaseDestination im
nextWarn = now + blockedProducerWarningInterval;
}
}
+ long finish = System.currentTimeMillis();
+ long totalTimeBlocked = finish - start;
+ destinationStatistics.getBlockedTime().addTime(totalTimeBlocked);
+ producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked);
+ producerBrokerExchange.blockingOnFlowControl(false);
}
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java Thu Jul 18 12:53:08 2013
@@ -39,6 +39,9 @@ public class DestinationStatistics exten
protected CountStatisticImpl inflight;
protected CountStatisticImpl expired;
protected TimeStatisticImpl processTime;
+ protected CountStatisticImpl blockedSends;
+ protected TimeStatisticImpl blockedTime;
+
public DestinationStatistics() {
@@ -56,6 +59,8 @@ public class DestinationStatistics exten
messages.setDoReset(false);
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
processTime = new TimeStatisticImpl("processTime", "information around length of time messages are held by a destination");
+ blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control");
+ blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
@@ -66,6 +71,8 @@ public class DestinationStatistics exten
addStatistic("messages", messages);
addStatistic("messagesCached", messagesCached);
addStatistic("processTime", processTime);
+ addStatistic("blockedSends",blockedSends);
+ addStatistic("blockedTime",blockedTime);
}
public CountStatisticImpl getEnqueues() {
@@ -112,6 +119,13 @@ public class DestinationStatistics exten
return this.processTime;
}
+ /**
+ * @return the "blockedSends" counter (sends that had to wait for flow control)
+ */
+ public CountStatisticImpl getBlockedSends(){
+ return this.blockedSends;
+ }
+ /**
+ * @return the "blockedTime" statistic (time sends were blocked for flow control)
+ */
+ public TimeStatisticImpl getBlockedTime(){
+ return this.blockedTime;
+ }
+
public void reset() {
if (this.isDoReset()) {
super.reset();
@@ -120,6 +134,8 @@ public class DestinationStatistics exten
dispatched.reset();
inflight.reset();
expired.reset();
+ blockedSends.reset();
+ blockedTime.reset();
}
}
@@ -135,6 +151,8 @@ public class DestinationStatistics exten
messages.setEnabled(enabled);
messagesCached.setEnabled(enabled);
processTime.setEnabled(enabled);
+ blockedSends.setEnabled(enabled);
+ blockedTime.setEnabled(enabled);
}
@@ -150,6 +168,8 @@ public class DestinationStatistics exten
messagesCached.setParent(parent.messagesCached);
messages.setParent(parent.messages);
processTime.setParent(parent.processTime);
+ blockedSends.setParent(parent.blockedSends);
+ blockedTime.setParent(parent.blockedTime);
} else {
enqueues.setParent(null);
dispatched.setParent(null);
@@ -161,6 +181,8 @@ public class DestinationStatistics exten
messagesCached.setParent(null);
messages.setParent(null);
processTime.setParent(null);
+ blockedSends.setParent(null);
+ blockedTime.setParent(null);
}
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jul 18 12:53:08 2013
@@ -716,7 +716,7 @@ public class Queue extends BaseDestinati
} else {
if (memoryUsage.isFull()) {
- waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
+ waitForSpace(context, producerExchange, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
+ message.getProducerId() + ") stopped to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
@@ -869,7 +869,8 @@ public class Queue extends BaseDestinati
final ConnectionContext context = producerExchange.getConnectionContext();
Future<Object> result = null;
- checkUsage(context, message);
+ producerExchange.incrementSend();
+ checkUsage(context, producerExchange, message);
sendLock.lockInterruptibly();
try {
if (store != null && message.isPersistent()) {
@@ -911,7 +912,7 @@ public class Queue extends BaseDestinati
}
}
- private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
+ private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException {
if (message.isPersistent()) {
if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
@@ -920,7 +921,7 @@ public class Queue extends BaseDestinati
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
- waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
+ waitForSpace(context, producerBrokerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
} else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
final String logMessage = "Temp Store is Full ("
@@ -929,7 +930,7 @@ public class Queue extends BaseDestinati
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
- waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
+ waitForSpace(context, producerBrokerExchange, messages.getSystemUsage().getTempUsage(), logMessage);
}
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Jul 18 12:53:08 2013
@@ -316,6 +316,7 @@ public class Topic extends BaseDestinati
final ConnectionContext context = producerExchange.getConnectionContext();
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
+ producerExchange.incrementSend();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
@@ -418,6 +419,7 @@ public class Topic extends BaseDestinati
} else {
waitForSpace(
context,
+ producerExchange,
memoryUsage,
"Usage Manager Memory Usage limit reached. Stopping producer ("
+ message.getProducerId()
@@ -475,7 +477,7 @@ public class Topic extends BaseDestinati
throw new javax.jms.ResourceAllocationException(logMessage);
}
- waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
+ waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
}