You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/18 14:53:08 UTC

svn commit: r1504452 - in /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker: ./ jmx/ region/

Author: rajdavies
Date: Thu Jul 18 12:53:08 2013
New Revision: 1504452

URL: http://svn.apache.org/r1504452
Log:
Add support for exposing information about blocked sends through JMX - see https://issues.apache.org/jira/browse/AMQ-4635

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Jul 18 12:53:08 2013
@@ -50,27 +50,12 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.ConfigurationException;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.jmx.AnnotatedMBean;
-import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.ConnectorView;
-import org.apache.activemq.broker.jmx.ConnectorViewMBean;
-import org.apache.activemq.broker.jmx.HealthView;
-import org.apache.activemq.broker.jmx.HealthViewMBean;
-import org.apache.activemq.broker.jmx.JmsConnectorView;
-import org.apache.activemq.broker.jmx.JobSchedulerView;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.broker.jmx.ManagedRegionBroker;
-import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.jmx.NetworkConnectorView;
-import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
-import org.apache.activemq.broker.jmx.ProxyConnectorView;
+import org.apache.activemq.broker.jmx.*;
 import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFactory;
@@ -87,6 +72,7 @@ import org.apache.activemq.broker.schedu
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.network.ConnectionFilter;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
@@ -107,16 +93,7 @@ import org.apache.activemq.transport.Tra
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.BrokerSupport;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
-import org.apache.activemq.util.IOExceptionHandler;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.InetAddressUtil;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ThreadPoolUtils;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -1269,6 +1246,20 @@ public class BrokerService implements Se
         return answer;
     }
 
+    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
+        ProducerBrokerExchange result = null;
+
+        for (TransportConnector connector : transportConnectors) {
+            for (TransportConnection tc: connector.getConnections()){
+                result = tc.getProducerBrokerExchangeIfExists(producerInfo);
+                if (result !=null){
+                    return result;
+                }
+            }
+        }
+        return result;
+    }
+
     public String[] getTransportConnectorURIs() {
         return transportConnectorURIs;
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Thu Jul 18 12:53:08 2013
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.command.Message;
@@ -24,13 +28,8 @@ import org.apache.activemq.state.Produce
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * Holds internal state in the broker for a MessageProducer
- * 
- * 
  */
 public class ProducerBrokerExchange {
 
@@ -44,6 +43,7 @@ public class ProducerBrokerExchange {
     private boolean auditProducerSequenceIds;
     private boolean isNetworkProducer;
     private BrokerService brokerService;
+    private final FlowControlInfo flowControlInfo = new FlowControlInfo();
 
     public ProducerBrokerExchange() {
     }
@@ -58,7 +58,7 @@ public class ProducerBrokerExchange {
         return rc;
     }
 
-    
+
     /**
      * @return the connectionContext
      */
@@ -131,7 +131,7 @@ public class ProducerBrokerExchange {
 
     /**
      * Enforce duplicate suppression using info from persistence adapter
-     * @param messageSend
+     *
      * @return false if message should be ignored as a duplicate
      */
     public boolean canDispatch(Message messageSend) {
@@ -145,14 +145,14 @@ public class ProducerBrokerExchange {
                     canDispatch = false;
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("suppressing duplicate message send  [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] from network producer with producerSequenceId ["
-                                + producerSequenceId + "] less than last stored: "  + lastStoredForMessageProducer);
+                                + producerSequenceId + "] less than last stored: " + lastStoredForMessageProducer);
                     }
                 }
             } else if (producerSequenceId <= lastSendSequenceNumber.get()) {
                 canDispatch = false;
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] with producerSequenceId ["
-                            + producerSequenceId + "] less than last stored: "  + lastSendSequenceNumber);
+                            + producerSequenceId + "] less than last stored: " + lastSendSequenceNumber);
                 }
             } else {
                 // track current so we can suppress duplicates later in the stream
@@ -165,8 +165,8 @@ public class ProducerBrokerExchange {
     private long getStoredSequenceIdForMessage(MessageId messageId) {
         try {
             return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
-       } catch (IOException ignored) {
-            LOG.debug("Failed to determine last producer sequence id for: " +messageId, ignored);
+        } catch (IOException ignored) {
+            LOG.debug("Failed to determine last producer sequence id for: " + messageId, ignored);
         }
         return -1;
     }
@@ -180,4 +180,87 @@ public class ProducerBrokerExchange {
         lastSendSequenceNumber.set(l);
         LOG.debug("last stored sequence id set: " + l);
     }
+
+    public void incrementSend() {
+        flowControlInfo.incrementSend();
+    }
+
+    public void blockingOnFlowControl(boolean blockingOnFlowControl) {
+        flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
+    }
+
+    public void incrementTimeBlocked(Destination destination, long timeBlocked) {
+        flowControlInfo.incrementTimeBlocked(timeBlocked);
+    }
+
+
+    public boolean isBlockedForFlowControl() {
+        return flowControlInfo.isBlockingOnFlowControl();
+    }
+
+    public void resetFlowControl() {
+        flowControlInfo.reset();
+    }
+
+    public long getTotalTimeBlocked() {
+        return flowControlInfo.getTotalTimeBlocked();
+    }
+
+    public int getPercentageBlocked() {
+        double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
+        return (int) value * 100;
+    }
+
+
+    public static class FlowControlInfo {
+        private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
+        private AtomicLong totalSends = new AtomicLong();
+        private AtomicLong sendsBlocked = new AtomicLong();
+        private AtomicLong totalTimeBlocked = new AtomicLong();
+
+
+        public boolean isBlockingOnFlowControl() {
+            return blockingOnFlowControl.get();
+        }
+
+        public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
+            this.blockingOnFlowControl.set(blockingOnFlowControl);
+            if (blockingOnFlowControl) {
+                incrementSendBlocked();
+            }
+        }
+
+
+        public long getTotalSends() {
+            return totalSends.get();
+        }
+
+        public void incrementSend() {
+            this.totalSends.incrementAndGet();
+        }
+
+        public long getSendsBlocked() {
+            return sendsBlocked.get();
+        }
+
+        public void incrementSendBlocked() {
+            this.sendsBlocked.incrementAndGet();
+        }
+
+        public long getTotalTimeBlocked() {
+            return totalTimeBlocked.get();
+        }
+
+        public void incrementTimeBlocked(long time) {
+            this.totalTimeBlocked.addAndGet(time);
+        }
+
+        public void reset() {
+            blockingOnFlowControl.set(false);
+            totalSends.set(0);
+            sendsBlocked.set(0);
+            totalTimeBlocked.set(0);
+
+        }
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Jul 18 12:53:08 2013
@@ -36,44 +36,10 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.transaction.xa.XAResource;
-
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.ControlCommand;
-import org.apache.activemq.command.DataArrayResponse;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.FlushCommand;
-import org.apache.activemq.command.IntegerResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.network.DemandForwardingBridge;
 import org.apache.activemq.network.MBeanNetworkListener;
 import org.apache.activemq.network.NetworkBridgeConfiguration;
@@ -94,10 +60,8 @@ import org.apache.activemq.transport.Res
 import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
-import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
-import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -1407,6 +1371,16 @@ public class TransportConnection impleme
         }
     }
 
+    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){
+        ProducerBrokerExchange result = null;
+        if (producerInfo != null && producerInfo.getProducerId() != null){
+            synchronized (producerExchanges){
+                result = producerExchanges.get(producerInfo.getProducerId());
+            }
+        }
+        return result;
+    }
+
     private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
         ProducerBrokerExchange result = producerExchanges.get(id);
         if (result == null) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Thu Jul 18 12:53:08 2013
@@ -486,4 +486,19 @@ public class DestinationView implements 
         return destination.isDLQ();
     }
 
+    @Override
+    public long getBlockedSends() {
+        return destination.getDestinationStatistics().getBlockedSends().getCount();
+    }
+
+    @Override
+    public double getAverageBlockedTime() {
+        return destination.getDestinationStatistics().getBlockedTime().getAverageTime();
+    }
+
+    @Override
+    public long getTotalBlockedTime() {
+        return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
+    }
+
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Thu Jul 18 12:53:08 2013
@@ -371,4 +371,13 @@ public interface DestinationViewMBean {
     @MBeanInfo("Dead Letter Queue")
     boolean isDLQ();
 
+    @MBeanInfo("Get number of messages blocked for Flow Control")
+    long getBlockedSends();
+
+    @MBeanInfo("get the average time (ms) a message is blocked for Flow Control")
+    double getAverageBlockedTime();
+
+    @MBeanInfo("Get the total time (ms) messages are blocked for Flow Control")
+    long getTotalBlockedTime();
+
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java Thu Jul 18 12:53:08 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ProducerInfo;
 
@@ -148,4 +149,39 @@ public class ProducerView implements Pro
     public String getUserName() {
         return userName;
     }
+
+    @Override
+    public boolean isProducerBlocked() {
+        ProducerBrokerExchange producerBrokerExchange = broker.getBrokerService().getProducerBrokerExchange(info);
+        if (producerBrokerExchange != null){
+            return producerBrokerExchange.isBlockedForFlowControl();
+        }
+        return false;
+    }
+
+    @Override
+    public long getTotalTimeBlocked() {
+        ProducerBrokerExchange producerBrokerExchange = broker.getBrokerService().getProducerBrokerExchange(info);
+        if (producerBrokerExchange != null){
+            return producerBrokerExchange.getTotalTimeBlocked();
+        }
+        return 0;
+    }
+
+    @Override
+    public int getPercentageBlocked() {
+        ProducerBrokerExchange producerBrokerExchange = broker.getBrokerService().getProducerBrokerExchange(info);
+        if (producerBrokerExchange != null){
+            return producerBrokerExchange.getPercentageBlocked();
+        }
+        return 0;
+    }
+
+    @Override
+    public void resetFlowControlStats() {
+        ProducerBrokerExchange producerBrokerExchange = broker.getBrokerService().getProducerBrokerExchange(info);
+        if (producerBrokerExchange != null){
+            producerBrokerExchange.resetFlowControl();
+        }
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java Thu Jul 18 12:53:08 2013
@@ -86,4 +86,16 @@ public interface ProducerViewMBean {
      */
     @MBeanInfo("User Name used to authorize creation of this Producer")
     String getUserName();
+
+    @MBeanInfo("is the producer blocked for Flow Control")
+    boolean isProducerBlocked();
+
+    @MBeanInfo("total time (ms) Producer Blocked For Flow Control")
+    long getTotalTimeBlocked();
+
+    @MBeanInfo("percentage of sends Producer Blocked for Flow Control")
+    int getPercentageBlocked();
+
+    @MBeanInfo("reset flow control stata")
+    void resetFlowControlStats();
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Jul 18 12:53:08 2013
@@ -606,11 +606,11 @@ public abstract class BaseDestination im
         this.storeUsageHighWaterMark = storeUsageHighWaterMark;
     }
 
-    protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
-        waitForSpace(context, usage, 100, warning);
+    protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
+        waitForSpace(context, producerBrokerExchange, usage, 100, warning);
     }
 
-    protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
+    protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
         if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
             getLog().debug("sendFailIfNoSpace, forcing exception on send, usage:  " + usage + ": " + warning);
             throw new ResourceAllocationException(warning);
@@ -623,6 +623,8 @@ public abstract class BaseDestination im
         } else {
             long start = System.currentTimeMillis();
             long nextWarn = start;
+            producerBrokerExchange.blockingOnFlowControl(true);
+            destinationStatistics.getBlockedSends().increment();
             while (!usage.waitForSpace(1000, highWaterMark)) {
                 if (context.getStopping().get()) {
                     throw new IOException("Connection closed, send aborted.");
@@ -634,6 +636,11 @@ public abstract class BaseDestination im
                     nextWarn = now + blockedProducerWarningInterval;
                 }
             }
+            long finish = System.currentTimeMillis();
+            long totalTimeBlocked = finish - start;
+            destinationStatistics.getBlockedTime().addTime(totalTimeBlocked);
+            producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked);
+            producerBrokerExchange.blockingOnFlowControl(false);
         }
     }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java Thu Jul 18 12:53:08 2013
@@ -39,6 +39,9 @@ public class DestinationStatistics exten
     protected CountStatisticImpl inflight;
     protected CountStatisticImpl expired;
     protected TimeStatisticImpl processTime;
+    protected CountStatisticImpl blockedSends;
+    protected TimeStatisticImpl blockedTime;
+
 
     public DestinationStatistics() {
 
@@ -56,6 +59,8 @@ public class DestinationStatistics exten
         messages.setDoReset(false);
         messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
         processTime = new TimeStatisticImpl("processTime", "information around length of time messages are held by a destination");
+        blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control");
+        blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
         addStatistic("enqueues", enqueues);
         addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
@@ -66,6 +71,8 @@ public class DestinationStatistics exten
         addStatistic("messages", messages);
         addStatistic("messagesCached", messagesCached);
         addStatistic("processTime", processTime);
+        addStatistic("blockedSends",blockedSends);
+        addStatistic("blockedTime",blockedTime);
     }
 
     public CountStatisticImpl getEnqueues() {
@@ -112,6 +119,13 @@ public class DestinationStatistics exten
         return this.processTime;
     }
 
+    public CountStatisticImpl getBlockedSends(){
+        return this.blockedSends;
+    }
+    public TimeStatisticImpl getBlockedTime(){
+        return this.blockedTime;
+    }
+
     public void reset() {
         if (this.isDoReset()) {
             super.reset();
@@ -120,6 +134,8 @@ public class DestinationStatistics exten
             dispatched.reset();
             inflight.reset();
             expired.reset();
+            blockedSends.reset();
+            blockedTime.reset();
         }
     }
 
@@ -135,6 +151,8 @@ public class DestinationStatistics exten
         messages.setEnabled(enabled);
         messagesCached.setEnabled(enabled);
         processTime.setEnabled(enabled);
+        blockedSends.setEnabled(enabled);
+        blockedTime.setEnabled(enabled);
 
     }
 
@@ -150,6 +168,8 @@ public class DestinationStatistics exten
             messagesCached.setParent(parent.messagesCached);
             messages.setParent(parent.messages);
             processTime.setParent(parent.processTime);
+            blockedSends.setParent(parent.blockedSends);
+            blockedTime.setParent(parent.blockedTime);
         } else {
             enqueues.setParent(null);
             dispatched.setParent(null);
@@ -161,6 +181,8 @@ public class DestinationStatistics exten
             messagesCached.setParent(null);
             messages.setParent(null);
             processTime.setParent(null);
+            blockedSends.setParent(null);
+            blockedTime.setParent(null);
         }
     }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jul 18 12:53:08 2013
@@ -716,7 +716,7 @@ public class Queue extends BaseDestinati
                 } else {
 
                     if (memoryUsage.isFull()) {
-                        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
+                        waitForSpace(context, producerExchange, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
                                 + message.getProducerId() + ") stopped to prevent flooding "
                                 + getActiveMQDestination().getQualifiedName() + "."
                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
@@ -869,7 +869,8 @@ public class Queue extends BaseDestinati
         final ConnectionContext context = producerExchange.getConnectionContext();
         Future<Object> result = null;
 
-        checkUsage(context, message);
+        producerExchange.incrementSend();
+        checkUsage(context, producerExchange, message);
         sendLock.lockInterruptibly();
         try {
             if (store != null && message.isPersistent()) {
@@ -911,7 +912,7 @@ public class Queue extends BaseDestinati
         }
     }
 
-    private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
+    private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException {
         if (message.isPersistent()) {
             if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
                 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
@@ -920,7 +921,7 @@ public class Queue extends BaseDestinati
                     + getActiveMQDestination().getQualifiedName() + "."
                     + " See http://activemq.apache.org/producer-flow-control.html for more info";
 
-                waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
+                waitForSpace(context, producerBrokerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
             }
         } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
             final String logMessage = "Temp Store is Full ("
@@ -929,7 +930,7 @@ public class Queue extends BaseDestinati
                 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                 + " See http://activemq.apache.org/producer-flow-control.html for more info";
 
-            waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
+            waitForSpace(context, producerBrokerExchange, messages.getSystemUsage().getTempUsage(), logMessage);
         }
     }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1504452&r1=1504451&r2=1504452&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Jul 18 12:53:08 2013
@@ -316,6 +316,7 @@ public class Topic extends BaseDestinati
         final ConnectionContext context = producerExchange.getConnectionContext();
 
         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
+        producerExchange.incrementSend();
         final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                 && !context.isInRecoveryMode();
 
@@ -418,6 +419,7 @@ public class Topic extends BaseDestinati
                         } else {
                             waitForSpace(
                                     context,
+                                    producerExchange,
                                     memoryUsage,
                                     "Usage Manager Memory Usage limit reached. Stopping producer ("
                                             + message.getProducerId()
@@ -475,7 +477,7 @@ public class Topic extends BaseDestinati
                     throw new javax.jms.ResourceAllocationException(logMessage);
                 }
 
-                waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
+                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
             }
             result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
         }