You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/03/02 17:00:30 UTC
activemq git commit: [AMQ-6614] fix up jmx blockedSendsCount and
producer view blocking flag for async send case. fix and test
Repository: activemq
Updated Branches:
refs/heads/master eab9a0d05 -> e67d48680
[AMQ-6614] fix up jmx blockedSendsCount and producer view blocking flag for async send case. fix and test
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e67d4868
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e67d4868
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e67d4868
Branch: refs/heads/master
Commit: e67d48680f16061d0a5fbd09129ce8f0a7759255
Parents: eab9a0d
Author: gtully <ga...@gmail.com>
Authored: Thu Mar 2 17:00:16 2017 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Mar 2 17:00:16 2017 +0000
----------------------------------------------------------------------
.../activemq/broker/ProducerBrokerExchange.java | 3 +-
.../apache/activemq/broker/region/Queue.java | 5 ++
.../transport/nio/NIOAsyncSendWithPFCTest.java | 83 +++++++++++++++++---
3 files changed, 78 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e67d4868/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
index bf1d21e..26652de 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
@@ -43,7 +43,7 @@ public class ProducerBrokerExchange {
private boolean auditProducerSequenceIds;
private boolean isNetworkProducer;
private BrokerService brokerService;
- private final FlowControlInfo flowControlInfo = new FlowControlInfo();
+ private FlowControlInfo flowControlInfo = new FlowControlInfo();
public ProducerBrokerExchange() {
}
@@ -55,6 +55,7 @@ public class ProducerBrokerExchange {
rc.region = region;
rc.producerState = producerState;
rc.mutable = mutable;
+ rc.flowControlInfo = flowControlInfo;
return rc;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e67d4868/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6283232..2b5c0c3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -689,10 +689,15 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} else {
LOG.debug("unexpected exception on deferred send of: {}", message, e);
}
+ } finally {
+ getDestinationStatistics().getBlockedSends().decrement();
+ producerExchangeCopy.blockingOnFlowControl(false);
}
}
});
+ getDestinationStatistics().getBlockedSends().increment();
+ producerExchange.blockingOnFlowControl(true);
if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
.getSendFailIfNoSpaceAfterTimeout()));
http://git-wip-us.apache.org/repos/asf/activemq/blob/e67d4868/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
index a9cc901..8c971bf 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
@@ -19,9 +19,9 @@ package org.apache.activemq.transport.nio;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationView;
+import org.apache.activemq.broker.jmx.ProducerViewMBean;
import org.apache.activemq.broker.jmx.QueueView;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -109,7 +109,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
try {
- sendMessagesAsync(1, DESTINATION_TWO);
+ sendMessages(1, DESTINATION_TWO, false);
} catch (Exception ex) {
LOG.error("Ex on send new connection", ex);
fail("*** received the following exception when creating addition producer new connection:" + ex);
@@ -148,6 +148,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
//wait till producer follow control kicks in
waitForProducerFlowControl(broker, queueView);
+ assertTrue("Producer view blocked", getProducerView(broker, DESTINATION_ONE).isProducerBlocked());
try {
Session producerSession = exisitngConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -166,25 +167,63 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
}
+ public void testSyncSendPFCExistingConnection() throws Exception {
+
+ BrokerService broker = createBroker();
+ broker.waitUntilStarted();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
+ QueueView queueView = getQueueView(broker, DESTINATION_ONE);
+
+ try {
+
+ for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
+
+ executorService.submit(new ProducerTask(true));
+
+ }
+
+ //wait till producer follow control kicks in
+ waitForProducerFlowControl(broker, queueView);
+ assertTrue("Producer view blocked", getProducerView(broker, DESTINATION_ONE).isProducerBlocked());
+
+
+ } finally {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ }
+
private void waitForProducerFlowControl(BrokerService broker, QueueView queueView) throws Exception {
boolean blockingAllSends;
do {
- blockingAllSends = queueView.getBlockedSends() > 10;
+ blockingAllSends = queueView.getBlockedSends() >= 10;
+ LOG.info("Blocking all sends:" + queueView.getBlockedSends());
Thread.sleep(1000);
} while (!blockingAllSends);
}
class ProducerTask implements Runnable {
+ boolean sync = false;
+
+ ProducerTask() {
+ this(false);
+ }
+
+ ProducerTask(boolean sync) {
+ this.sync = sync;
+ }
@Override
public void run() {
try {
//send X messages
- sendMessagesAsync(MESSAGES_TO_SEND, DESTINATION_ONE);
+ sendMessages(MESSAGES_TO_SEND, DESTINATION_ONE, sync);
} catch (Exception e) {
e.printStackTrace();
}
@@ -192,7 +231,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
}
- private Long sendMessagesAsync(int messageCount, String destination) throws Exception {
+ private Long sendMessages(int messageCount, String destination, boolean sync) throws Exception {
long numberOfMessageSent = 0;
@@ -201,7 +240,12 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.setUseAsyncSend(true);
+ if (sync) {
+ connection.setUseAsyncSend(false);
+ connection.setAlwaysSyncSend(true);
+ } else {
+ connection.setUseAsyncSend(true);
+ }
connection.start();
try {
@@ -221,16 +265,16 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
LOG.info(" Finished after producing : " + numberOfMessageSent);
return numberOfMessageSent;
- } catch (Exception ex) {
- LOG.info("Exception received producing ", ex);
- LOG.info("finishing after exception :" + numberOfMessageSent);
- return numberOfMessageSent;
+ } catch (JMSException expected) {
+ LOG.debug("Exception received producing ", expected);
} finally {
if (connection != null) {
- connection.close();
+ try {
+ connection.close();
+ } catch (JMSException ignored) {}
}
}
-
+ return numberOfMessageSent;
}
private TextMessage createTextMessage(Session session) throws JMSException {
@@ -264,5 +308,20 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
return null;
}
+ private ProducerViewMBean getProducerView(BrokerService broker, String qName) throws Exception {
+ ObjectName[] qProducers = broker.getAdminView().getQueueProducers();
+ for (ObjectName name : qProducers) {
+ ProducerViewMBean proxy = (ProducerViewMBean) broker.getManagementContext()
+ .newProxyInstance(name, ProducerViewMBean.class, true);
+
+ LOG.info("" + proxy.getProducerId() + ", dest: " + proxy.getDestinationName() + ", blocked: " + proxy.isProducerBlocked());
+
+ if (proxy.getDestinationName().contains(qName)) {
+ return proxy;
+ }
+ }
+ return null;
+ }
+
}