You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/09/28 09:11:11 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6184 -
add workQueueCapacity config property default to 0 where a value > 0 swaps
out the dsynchQ for a capicity limited blocking queue. This allows the core
pool to grow on demand as before
Repository: activemq
Updated Branches:
refs/heads/master 45f60e413 -> 08695ab30
https://issues.apache.org/jira/browse/AMQ-6184 - add workQueueCapacity config property default to 0 where a value > 0 swaps out the dsynchQ for a capicity limited blocking queue. This allows the core pool to grow on demand as before but also allows work to be queued when necessary
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/08695ab3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/08695ab3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/08695ab3
Branch: refs/heads/master
Commit: 08695ab30306307aced0d2d51b1755bc6e29081a
Parents: 45f60e4
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 28 09:51:05 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 28 10:04:22 2016 +0100
----------------------------------------------------------------------
.../activemq/transport/nio/SelectorManager.java | 23 +-
.../transport/nio/NIOAsyncSendWithPFCTest.java | 272 +++++++++++++++++++
2 files changed, 291 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/08695ab3/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
index 54e1cc0..b6b1f50 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
@@ -19,9 +19,11 @@ package org.apache.activemq.transport.nio;
import java.io.IOException;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -40,10 +42,10 @@ public final class SelectorManager {
private Executor selectorExecutor = createDefaultExecutor();
private Executor channelExecutor = selectorExecutor;
private final LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
- private int maxChannelsPerWorker = 1024;
+ private int maxChannelsPerWorker = -1;
protected ExecutorService createDefaultExecutor() {
- ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+ ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, newWorkQueue(),
new ThreadFactory() {
private long i = 0;
@@ -59,8 +61,17 @@ public final class SelectorManager {
return rc;
}
+ private BlockingQueue<Runnable> newWorkQueue() {
+ final int workQueueCapicity = getDefaultWorkQueueCapacity();
+ return workQueueCapicity > 0 ? new LinkedBlockingQueue<Runnable>(workQueueCapicity) : new SynchronousQueue<Runnable>();
+ }
+
+ private static int getDefaultWorkQueueCapacity() {
+ return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity", 0);
+ }
+
private static int getDefaultCorePoolSize() {
- return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10);
+ return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10);
}
private static int getDefaultMaximumPoolSize() {
@@ -71,6 +82,10 @@ public final class SelectorManager {
return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30);
}
+ private static int getDefaultMaxChannelsPerWorker() {
+ return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maxChannelsPerWorker", 1024);
+ }
+
public static SelectorManager getInstance() {
return SINGLETON;
}
@@ -124,7 +139,7 @@ public final class SelectorManager {
}
public int getMaxChannelsPerWorker() {
- return maxChannelsPerWorker;
+ return maxChannelsPerWorker >= 0 ? maxChannelsPerWorker : getDefaultMaxChannelsPerWorker();
}
public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/08695ab3/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
new file mode 100644
index 0000000..0b7b7c3
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DestinationView;
+import org.apache.activemq.broker.jmx.QueueView;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/*
+ demonstrates that with nio it does not make sense to block on the broker but thread pool
+ shold grow past initial corepoolsize of 10
+ */
+public class NIOAsyncSendWithPFCTest extends TestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NIOAsyncSendWithPFCTest.class);
+
+ private static String TRANSPORT_URL = "nio://0.0.0.0:0";
+ private static final String DESTINATION_ONE = "testQ1";
+ private static final String DESTINATION_TWO = "testQ2";
+ private static final int MESSAGES_TO_SEND = 100;
+ private static int NUMBER_OF_PRODUCERS = 10;
+
+ protected BrokerService createBroker() throws Exception {
+
+ BrokerService broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(true);
+
+ PolicyMap policyMap = new PolicyMap();
+ List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+ PolicyEntry pe = new PolicyEntry();
+
+
+ pe.setMemoryLimit(256000);
+ pe.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+
+
+ pe.setQueue(">");
+ entries.add(pe);
+ policyMap.setPolicyEntries(entries);
+ broker.setDestinationPolicy(policyMap);
+
+
+ broker.addConnector(TRANSPORT_URL);
+ broker.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue(DESTINATION_ONE)});
+
+ broker.start();
+ TRANSPORT_URL = broker.getTransportConnectorByScheme("nio").getPublishableConnectString();
+ return broker;
+ }
+
+ /**
+ * Test creates 10 producer who send to a single destination using Async mode.
+ * Producer flow control kicks in for that destination. When producer flow control is blocking sends
+ * Test tries to create another JMS connection to the nio.
+ */
+ public void testAsyncSendPFCNewConnection() throws Exception {
+
+
+ BrokerService broker = createBroker();
+ broker.waitUntilStarted();
+
+
+ ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
+ QueueView queueView = getQueueView(broker, DESTINATION_ONE);
+
+ try {
+
+ for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
+
+ executorService.submit(new ProducerTask());
+
+ }
+
+ //wait till producer follow control kicks in
+ waitForProducerFlowControl(broker, queueView);
+
+
+ try {
+ sendMessagesAsync(1, DESTINATION_TWO);
+ } catch (Exception ex) {
+ LOG.error("Ex on send new connection", ex);
+ fail("*** received the following exception when creating addition producer new connection:" + ex);
+ }
+
+
+ } finally {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+
+ }
+
+
+ public void testAsyncSendPFCExistingConnection() throws Exception {
+
+
+ BrokerService broker = createBroker();
+ broker.waitUntilStarted();
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL + "?wireFormat.maxInactivityDuration=5000");
+ ActiveMQConnection exisitngConnection = (ActiveMQConnection) connectionFactory.createConnection();
+
+
+ ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
+ QueueView queueView = getQueueView(broker, DESTINATION_ONE);
+
+ try {
+
+ for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
+
+ executorService.submit(new ProducerTask());
+
+ }
+
+
+ //wait till producer follow control kicks in
+ waitForProducerFlowControl(broker, queueView);
+
+
+ TestSupport.dumpAllThreads("Blocked");
+
+ try {
+ Session producerSession = exisitngConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ } catch (Exception ex) {
+ LOG.error("Ex on create session", ex);
+ fail("*** received the following exception when creating producer session:" + ex);
+ }
+
+
+ } finally {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+
+
+ }
+
+ private void waitForProducerFlowControl(BrokerService broker, QueueView queueView) throws Exception {
+
+
+ boolean blockingAllSends;
+ do {
+ blockingAllSends = queueView.getBlockedSends() > 10;
+ Thread.sleep(1000);
+
+ } while (!blockingAllSends);
+ }
+
+ class ProducerTask implements Runnable {
+
+
+ @Override
+ public void run() {
+ try {
+ //send X messages
+ sendMessagesAsync(MESSAGES_TO_SEND, DESTINATION_ONE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+ private Long sendMessagesAsync(int messageCount, String destination) throws Exception {
+
+ long numberOfMessageSent = 0;
+
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL);
+
+
+ ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+ connection.setUseAsyncSend(true);
+ connection.start();
+
+ try {
+
+ Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer jmsProducer = producerSession.createProducer(producerSession.createQueue(destination));
+
+ Message sendMessage = createTextMessage(producerSession);
+
+ for (int i = 0; i < messageCount; i++) {
+
+ jmsProducer.send(sendMessage);
+ numberOfMessageSent++;
+
+ }
+
+ LOG.info(" Finished after producing : " + numberOfMessageSent);
+ return numberOfMessageSent;
+
+ } catch (Exception ex) {
+ LOG.info("Exception received producing ", ex);
+ LOG.info("finishing after exception :" + numberOfMessageSent);
+ return numberOfMessageSent;
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ }
+
+ private TextMessage createTextMessage(Session session) throws JMSException {
+ StringBuffer buffer = new StringBuffer();
+
+ for (int i = 0; i < 1000; i++) {
+
+ buffer.append("1234567890");
+ }
+
+ return session.createTextMessage(buffer.toString());
+
+ }
+
+
+ private QueueView getQueueView(BrokerService broker, String queueName) throws Exception {
+ Map<ObjectName, DestinationView> queueViews = broker.getAdminView().getBroker().getQueueViews();
+
+ for (ObjectName key : queueViews.keySet()) {
+ DestinationView destinationView = queueViews.get(key);
+
+ if (destinationView instanceof QueueView) {
+ QueueView queueView = (QueueView) destinationView;
+
+ if (queueView.getName().equals(queueName)) {
+ return queueView;
+ }
+
+ }
+ }
+ return null;
+ }
+
+}
+