You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/03/02 17:00:30 UTC

activemq git commit: [AMQ-6614] fix up jmx blockedSendsCount and producer view blocking flag for async send case. fix and test

Repository: activemq
Updated Branches:
  refs/heads/master eab9a0d05 -> e67d48680


[AMQ-6614] fix up jmx blockedSendsCount and producer view blocking flag for async send case. fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e67d4868
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e67d4868
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e67d4868

Branch: refs/heads/master
Commit: e67d48680f16061d0a5fbd09129ce8f0a7759255
Parents: eab9a0d
Author: gtully <ga...@gmail.com>
Authored: Thu Mar 2 17:00:16 2017 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Mar 2 17:00:16 2017 +0000

----------------------------------------------------------------------
 .../activemq/broker/ProducerBrokerExchange.java |  3 +-
 .../apache/activemq/broker/region/Queue.java    |  5 ++
 .../transport/nio/NIOAsyncSendWithPFCTest.java  | 83 +++++++++++++++++---
 3 files changed, 78 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e67d4868/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
index bf1d21e..26652de 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
@@ -43,7 +43,7 @@ public class ProducerBrokerExchange {
     private boolean auditProducerSequenceIds;
     private boolean isNetworkProducer;
     private BrokerService brokerService;
-    private final FlowControlInfo flowControlInfo = new FlowControlInfo();
+    private FlowControlInfo flowControlInfo = new FlowControlInfo();
 
     public ProducerBrokerExchange() {
     }
@@ -55,6 +55,7 @@ public class ProducerBrokerExchange {
         rc.region = region;
         rc.producerState = producerState;
         rc.mutable = mutable;
+        rc.flowControlInfo = flowControlInfo;
         return rc;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e67d4868/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6283232..2b5c0c3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -689,10 +689,15 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                                     } else {
                                         LOG.debug("unexpected exception on deferred send of: {}", message, e);
                                     }
+                                } finally {
+                                    getDestinationStatistics().getBlockedSends().decrement();
+                                    producerExchangeCopy.blockingOnFlowControl(false);
                                 }
                             }
                         });
 
+                        getDestinationStatistics().getBlockedSends().increment();
+                        producerExchange.blockingOnFlowControl(true);
                         if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
                             flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
                                     .getSendFailIfNoSpaceAfterTimeout()));

http://git-wip-us.apache.org/repos/asf/activemq/blob/e67d4868/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
index a9cc901..8c971bf 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
@@ -19,9 +19,9 @@ package org.apache.activemq.transport.nio;
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DestinationView;
+import org.apache.activemq.broker.jmx.ProducerViewMBean;
 import org.apache.activemq.broker.jmx.QueueView;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -109,7 +109,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
 
 
             try {
-                sendMessagesAsync(1, DESTINATION_TWO);
+                sendMessages(1, DESTINATION_TWO, false);
             } catch (Exception ex) {
                 LOG.error("Ex on send  new connection", ex);
                 fail("*** received the following exception when creating addition producer new connection:" + ex);
@@ -148,6 +148,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
 
             //wait till producer follow control kicks in
             waitForProducerFlowControl(broker, queueView);
+            assertTrue("Producer view blocked", getProducerView(broker, DESTINATION_ONE).isProducerBlocked());
 
             try {
                 Session producerSession = exisitngConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -166,25 +167,63 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
 
     }
 
+    public void testSyncSendPFCExistingConnection() throws Exception {
+
+        BrokerService broker = createBroker();
+        broker.waitUntilStarted();
+
+        ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
+        QueueView queueView = getQueueView(broker, DESTINATION_ONE);
+
+        try {
+
+            for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
+
+                executorService.submit(new ProducerTask(true));
+
+            }
+
+            //wait till producer follow control kicks in
+            waitForProducerFlowControl(broker, queueView);
+            assertTrue("Producer view blocked", getProducerView(broker, DESTINATION_ONE).isProducerBlocked());
+
+
+        } finally {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+
+    }
+
     private void waitForProducerFlowControl(BrokerService broker, QueueView queueView) throws Exception {
 
 
         boolean blockingAllSends;
         do {
-            blockingAllSends = queueView.getBlockedSends() > 10;
+            blockingAllSends = queueView.getBlockedSends() >= 10;
+            LOG.info("Blocking all sends:" + queueView.getBlockedSends());
             Thread.sleep(1000);
 
         } while (!blockingAllSends);
     }
 
     class ProducerTask implements Runnable {
+        boolean sync = false;
+
+        ProducerTask() {
+            this(false);
+        }
+
+        ProducerTask(boolean sync) {
+            this.sync = sync;
+        }
 
 
         @Override
         public void run() {
             try {
                 //send X messages
-                sendMessagesAsync(MESSAGES_TO_SEND, DESTINATION_ONE);
+                sendMessages(MESSAGES_TO_SEND, DESTINATION_ONE, sync);
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -192,7 +231,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
     }
 
 
-    private Long sendMessagesAsync(int messageCount, String destination) throws Exception {
+    private Long sendMessages(int messageCount, String destination, boolean sync) throws Exception {
 
         long numberOfMessageSent = 0;
 
@@ -201,7 +240,12 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
 
 
         ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
-        connection.setUseAsyncSend(true);
+        if (sync) {
+            connection.setUseAsyncSend(false);
+            connection.setAlwaysSyncSend(true);
+        } else {
+            connection.setUseAsyncSend(true);
+        }
         connection.start();
 
         try {
@@ -221,16 +265,16 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
             LOG.info(" Finished after producing : " + numberOfMessageSent);
             return numberOfMessageSent;
 
-        } catch (Exception ex) {
-            LOG.info("Exception received producing ", ex);
-            LOG.info("finishing after exception :" + numberOfMessageSent);
-            return numberOfMessageSent;
+        } catch (JMSException expected) {
+            LOG.debug("Exception received producing ", expected);
         } finally {
             if (connection != null) {
-                connection.close();
+                try {
+                    connection.close();
+                } catch (JMSException ignored) {}
             }
         }
-
+        return numberOfMessageSent;
     }
 
     private TextMessage createTextMessage(Session session) throws JMSException {
@@ -264,5 +308,20 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
         return null;
     }
 
+    private ProducerViewMBean getProducerView(BrokerService broker, String qName) throws Exception {
+        ObjectName[] qProducers = broker.getAdminView().getQueueProducers();
+        for (ObjectName name : qProducers) {
+            ProducerViewMBean proxy = (ProducerViewMBean) broker.getManagementContext()
+                    .newProxyInstance(name, ProducerViewMBean.class, true);
+
+            LOG.info("" + proxy.getProducerId() + ", dest: " + proxy.getDestinationName() + ", blocked: " + proxy.isProducerBlocked());
+
+            if (proxy.getDestinationName().contains(qName)) {
+                return proxy;
+            }
+        }
+        return null;
+    }
+
 }