You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/06/03 11:24:07 UTC
svn commit: r781312 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/nio/
test/java/org/apache/activemq/broker/
Author: gtully
Date: Wed Jun 3 09:24:05 2009
New Revision: 781312
URL: http://svn.apache.org/viewvc?rev=781312&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2277 - patch applied with thanks
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?rev=781312&r1=781311&r2=781312&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java Wed Jun 3 09:24:05 2009
@@ -35,8 +35,16 @@
public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
this.worker = worker;
this.listener = listener;
- this.key = socketChannel.register(worker.selector, 0, this);
- worker.incrementUseCounter();
+
+ // Lock when mutating state of the selector
+ worker.lock();
+
+ try {
+ this.key = socketChannel.register(worker.selector, 0, this);
+ worker.incrementUseCounter();
+ } finally {
+ worker.unlock();
+ }
}
public void setInterestOps(int ops) {
@@ -56,8 +64,14 @@
public void close() {
worker.decrementUseCounter();
- key.cancel();
- worker.selector.wakeup();
+
+ // Lock when mutating state of the selector
+ worker.lock();
+ try {
+ key.cancel();
+ } finally {
+ worker.unlock();
+ }
}
public void onSelect() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?rev=781312&r1=781311&r2=781312&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java Wed Jun 3 09:24:05 2009
@@ -21,7 +21,10 @@
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
+
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SelectorWorker implements Runnable {
@@ -32,7 +35,8 @@
final int id = NEXT_ID.getAndIncrement();
final AtomicInteger useCounter = new AtomicInteger();
private final int maxChannelsPerWorker;
-
+ private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
+
public SelectorWorker(SelectorManager manager) throws IOException {
this.manager = manager;
selector = Selector.open();
@@ -67,7 +71,8 @@
try {
Thread.currentThread().setName("Selector Worker: " + id);
while (isRunning()) {
-
+
+ lockBarrier();
int count = selector.select(10);
if (count == 0) {
continue;
@@ -127,4 +132,19 @@
Thread.currentThread().setName(origName);
}
}
+
+ private void lockBarrier() {
+ selectorLock.writeLock().lock();
+ selectorLock.writeLock().unlock();
+ }
+
+ public void lock() {
+ selectorLock.readLock().lock();
+ selector.wakeup();
+ }
+
+ public void unlock() {
+ selectorLock.readLock().unlock();
+ }
+
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java?rev=781312&r1=781311&r2=781312&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java Wed Jun 3 09:24:05 2009
@@ -18,25 +18,50 @@
import java.net.URI;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.Collections;
-public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
+
+@SuppressWarnings("unchecked")
+public class NioQueueSubscriptionTest extends QueueSubscriptionTest implements ExceptionListener {
+
+ protected static final Log LOG = LogFactory.getLog(NioQueueSubscriptionTest.class);
+
+ private Map<Thread, Throwable> exceptions = Collections.synchronizedMap(new HashMap<Thread, Throwable>());
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory("tcp://localhost:62621");
+ return new ActiveMQConnectionFactory("tcp://localhost:62621?trace=false");
+ }
+
+ protected void setUp() throws Exception {
+ //setMaxTestTime(20*60*1000);
+ super.setUp();
}
@Override
protected BrokerService createBroker() throws Exception {
- BrokerService answer = BrokerFactory.createBroker(new URI("broker://nio://localhost:62621?persistent=false&useJmx=true"));
+ BrokerService answer = BrokerFactory.createBroker(new URI("broker://nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0"));
+ answer.getManagementContext().setCreateConnector(false);
+ answer.setUseJmx(false);
+ answer.setDeleteAllMessagesOnStartup(true);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
final PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
@@ -48,4 +73,40 @@
answer.setDestinationPolicy(policyMap);
return answer;
}
+
+ public void testLotsOfConcurrentConnections() throws Exception {
+ ExecutorService executor = Executors.newCachedThreadPool();
+ final ConnectionFactory factory = createConnectionFactory();
+ final ExceptionListener listener = this;
+ int connectionCount = 400;
+ for (int i=0;i<connectionCount ;i++) {
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+ connection.setExceptionListener(listener);
+ connection.start();
+ assertNotNull(connection.getBrokerName());
+ connections.add(connection);
+ } catch (Exception e) {
+ exceptions.put(Thread.currentThread(), e);
+ }
+ }
+ });
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+
+ if (!exceptions.isEmpty()) {
+ LOG.error("" + exceptions.size() + " exceptions like", exceptions.values().iterator().next());
+ fail("unexpected exceptions in worker threads: " + exceptions.values().iterator().next());
+ }
+ LOG.info("created " + connectionCount + " connecitons");
+ }
+
+ public void onException(JMSException exception) {
+ LOG.error("Exception on conneciton", exception);
+ exceptions.put(Thread.currentThread(), exception);
+ }
}