You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/06/03 11:24:07 UTC

svn commit: r781312 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/nio/ test/java/org/apache/activemq/broker/

Author: gtully
Date: Wed Jun  3 09:24:05 2009
New Revision: 781312

URL: http://svn.apache.org/viewvc?rev=781312&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2277 - patch applied with thanks

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java?rev=781312&r1=781311&r2=781312&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorSelection.java Wed Jun  3 09:24:05 2009
@@ -35,8 +35,16 @@
     public SelectorSelection(SelectorWorker worker, SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
         this.worker = worker;
         this.listener = listener;
-        this.key = socketChannel.register(worker.selector, 0, this);
-        worker.incrementUseCounter();
+        
+        // Register under the worker lock: worker.lock() takes the worker's shared lock and wakes the selector, and the worker loop's lockBarrier() keeps it out of select() until unlock()
+        worker.lock();
+        
+        try {
+            this.key = socketChannel.register(worker.selector, 0, this); // interest ops 0 (nothing selected yet); this object is the key's attachment
+            worker.incrementUseCounter();
+        } finally {
+            worker.unlock(); // always release, even if register() throws ClosedChannelException
+        }
     }
 
     public void setInterestOps(int ops) {
@@ -56,8 +64,14 @@
 
     public void close() {
         worker.decrementUseCounter();
-        key.cancel();
-        worker.selector.wakeup();
+    	
+        // Cancel the key under the worker lock; the explicit selector.wakeup() removed above is now performed inside worker.lock()
+        worker.lock();
+        try {
+            key.cancel();
+        } finally {
+            worker.unlock(); // release so the worker loop can get past its lockBarrier() again
+        }
     }
 
     public void onSelect() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java?rev=781312&r1=781311&r2=781312&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorWorker.java Wed Jun  3 09:24:05 2009
@@ -21,7 +21,10 @@
 import java.nio.channels.Selector;
 import java.util.Iterator;
 import java.util.Set;
+
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class SelectorWorker implements Runnable {
 
@@ -32,7 +35,8 @@
     final int id = NEXT_ID.getAndIncrement();
     final AtomicInteger useCounter = new AtomicInteger();
     private final int maxChannelsPerWorker;
-
+    private final ReadWriteLock selectorLock = new ReentrantReadWriteLock();
+       
     public SelectorWorker(SelectorManager manager) throws IOException {
         this.manager = manager;
         selector = Selector.open();
@@ -67,7 +71,8 @@
         try {
             Thread.currentThread().setName("Selector Worker: " + id);
             while (isRunning()) {
-
+                
+                lockBarrier();       	
                 int count = selector.select(10);
                 if (count == 0) {
                     continue;
@@ -127,4 +132,19 @@
             Thread.currentThread().setName(origName);
         }
     }
+
+    private void lockBarrier() { // called by the run loop right before selector.select(); pauses the loop while any lock() holder is active
+        selectorLock.writeLock().lock(); // the write lock is only granted once every read lock taken via lock() has been released
+        selectorLock.writeLock().unlock(); // released at once: it is acquired purely to wait, no code runs under it
+	}
+
+    public void lock() { // taken by SelectorSelection around register()/cancel(); pair every call with unlock() in a finally block
+        selectorLock.readLock().lock(); // read (shared) side: several callers may hold it at the same time
+        selector.wakeup(); // makes a select() in progress (or the next one) return so the worker loop comes back around to lockBarrier()
+    }
+
+	public void unlock() { // releases the shared lock taken by lock()
+	    selectorLock.readLock().unlock(); // once the last holder releases, a worker waiting in lockBarrier() can proceed
+	}
+	
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java?rev=781312&r1=781311&r2=781312&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java Wed Jun  3 09:24:05 2009
@@ -18,25 +18,50 @@
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
+import edu.emory.mathcs.backport.java.util.Collections;
 
-public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
+
+@SuppressWarnings("unchecked")
+public class NioQueueSubscriptionTest extends QueueSubscriptionTest implements ExceptionListener {
+    
+    protected static final Log LOG = LogFactory.getLog(NioQueueSubscriptionTest.class);
+    
+    private Map<Thread, Throwable> exceptions = Collections.synchronizedMap(new HashMap<Thread, Throwable>());
     
     @Override
     protected ConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory("tcp://localhost:62621");
+        return new ActiveMQConnectionFactory("tcp://localhost:62621?trace=false"); // clients use plain tcp:// against port 62621, the port of the nio:// connector in createBroker()
+    }
+    
+    protected void setUp() throws Exception {
+        //setMaxTestTime(20*60*1000); // NOTE(review): disabled call; presumably would raise the test time limit to 20 minutes - confirm before re-enabling
+        super.setUp();
     }
     
     @Override
     protected BrokerService createBroker() throws Exception {
-        BrokerService answer = BrokerFactory.createBroker(new URI("broker://nio://localhost:62621?persistent=false&useJmx=true"));
+        BrokerService answer = BrokerFactory.createBroker(new URI("broker://nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0"));
+        answer.getManagementContext().setCreateConnector(false);
+        answer.setUseJmx(false);
+        answer.setDeleteAllMessagesOnStartup(true);
         final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
         final PolicyEntry entry = new PolicyEntry();
         entry.setQueue(">");
@@ -48,4 +73,40 @@
         answer.setDestinationPolicy(policyMap);
         return answer;
     }
+    
+    public void testLotsOfConcurrentConnections() throws Exception { // opens connectionCount connections in parallel and fails if any task or listener recorded an exception
+        ExecutorService executor = Executors.newCachedThreadPool(); 
+        final ConnectionFactory factory = createConnectionFactory();
+        final ExceptionListener listener = this; // this test's onException() records asynchronous connection errors
+        int connectionCount = 400;
+        for (int i=0;i<connectionCount ;i++) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    try {
+                        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+                        connection.setExceptionListener(listener);
+                        connection.start();
+                        assertNotNull(connection.getBrokerName()); // NOTE(review): if this is JUnit's assertNotNull, a failure throws an Error that the catch (Exception) below will not record - confirm
+                        connections.add(connection); // 'connections' is declared outside this diff; presumably the superclass closes them on tearDown - confirm
+                    } catch (Exception e) {
+                        exceptions.put(Thread.currentThread(), e); // keyed by thread: pooled threads are reused, so a later failure on the same thread overwrites an earlier one
+                    }
+                }
+            });
+        }
+        
+        executor.shutdown(); // no new tasks accepted; already submitted tasks keep running
+        executor.awaitTermination(30, TimeUnit.SECONDS); // NOTE(review): return value ignored - on timeout the test continues without waiting for the remaining tasks
+
+        if (!exceptions.isEmpty()) {
+          LOG.error("" + exceptions.size() + " exceptions like", exceptions.values().iterator().next()); // logs only one recorded exception as a sample
+          fail("unexpected exceptions in worker threads: " + exceptions.values().iterator().next());
+        }
+        LOG.info("created " + connectionCount + " connecitons"); // reports the requested count, not the number of connections actually started
+    }
+
+    public void onException(JMSException exception) { // ExceptionListener callback; installed on every connection created by testLotsOfConcurrentConnections()
+        LOG.error("Exception on conneciton", exception);
+        exceptions.put(Thread.currentThread(), exception); // recorded under the calling thread so the test's exceptions.isEmpty() check fails
+    }
 }