You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/07/23 20:02:46 UTC

svn commit: r558814 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/transport/vm/ test/java/org/apache/activemq/ test/java/org/apache/activemq/broker/ test/java/...

Author: chirino
Date: Mon Jul 23 11:02:41 2007
New Revision: 558814

URL: http://svn.apache.org/viewvc?view=rev&rev=558814
Log:
https://issues.apache.org/activemq/browse/AMQ-1337 - Broker should finish accepting connection in an async thread.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Mon Jul 23 11:02:41 2007
@@ -1443,10 +1443,16 @@
      * Returns the broker name if one is available or null if one is not available yet.
      */
     public String getBrokerName() {
-        if (brokerInfo == null) {
-            return null;
-        }
-        return brokerInfo.getBrokerName();
+        try {
+			brokerInfoReceived.await(5,TimeUnit.SECONDS);
+	        if (brokerInfo == null) {
+	            return null;
+	        }
+	        return brokerInfo.getBrokerName();
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			return null;
+		}
     }
    
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Mon Jul 23 11:02:41 2007
@@ -143,10 +143,23 @@
         this.server = server;
         this.brokerInfo.setBrokerURL(server.getConnectURI().toString());
         this.server.setAcceptListener(new TransportAcceptListener() {
-            public void onAccept(Transport transport) {
+            public void onAccept(final Transport transport) {
                 try {
-                    Connection connection = createConnection(transport);
-                    connection.start();
+                	// Starting the connection could block due to 
+                	// wireformat negotiation, so start it in an async thread.
+                	Thread startThread = new Thread("ActiveMQ Transport Initiator: "+transport.getRemoteAddress()) {
+                		public void run() {
+                            try {
+								Connection connection = createConnection(transport);
+								connection.start();
+							} catch (Exception e) {
+			                	ServiceSupport.dispose(transport);
+			                    onAcceptError(e);
+							}
+                		}
+                	};
+                	startThread.setPriority(4);
+                	startThread.start();
                 }
                 catch (Exception e) {
                     String remoteHost = transport.getRemoteAddress();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Mon Jul 23 11:02:41 2007
@@ -21,8 +21,8 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.activemq.command.Command;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
@@ -52,10 +52,11 @@
     protected boolean marshal;
     protected boolean network;
     protected boolean async=true;
-    protected AtomicBoolean started=new AtomicBoolean();
     protected int asyncQueueDepth=2000;
     protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
     protected LinkedBlockingQueue messageQueue=null;
+    protected boolean started;
+    protected final Object startMutex = new Object();
     protected final URI location;
     protected final long id;
     private TaskRunner taskRunner;
@@ -96,11 +97,15 @@
     }
 
     protected void syncOneWay(Object command){
-        final TransportListener tl=peer.transportListener;
-        prePeerSetQueue=peer.prePeerSetQueue;
-        if(tl==null){
-            prePeerSetQueue.add(command);
-        }else{
+    	TransportListener tl=null;
+    	synchronized(peer.startMutex){
+        	if( peer.started ) {
+                tl = peer.transportListener;
+        	} else if(!peer.disposed) {
+                peer.prePeerSetQueue.add(command);
+        	}
+    	}
+    	if( tl!=null ) {
             tl.onCommand(command);
         }
     }
@@ -147,30 +152,33 @@
     }
 
     public void start() throws Exception{
-        if(started.compareAndSet(false,true)){
-            if(transportListener==null)
-                throw new IOException("TransportListener not set.");
-            if(!async){
-                for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
-                    Command command=(Command)iter.next();
-                    transportListener.onCommand(command);
-                    iter.remove();
-                }
-            }else{
-                peer.wakeup();
-                wakeup();
-            }
+        if(transportListener==null)
+            throw new IOException("TransportListener not set.");
+        synchronized(startMutex) {
+	        if( !prePeerSetQueue.isEmpty() ) {
+	            for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
+	                Command command=(Command)iter.next();
+	                transportListener.onCommand(command);
+	            }
+	            prePeerSetQueue.clear();
+	        } 
+	        started = true;
+	        if( isAsync() ) {
+	            peer.wakeup();
+	            wakeup();
+	        }
         }
     }
 
     public void stop() throws Exception{
-        if(started.compareAndSet(true,false)){
+    	synchronized(startMutex) {
             if(!disposed){
+    	        started=false;
                 disposed=true;
-            }
-            if(taskRunner!=null){
-                taskRunner.shutdown(1000);
-                taskRunner=null;
+                if(taskRunner!=null){
+                    taskRunner.shutdown(1000);
+                    taskRunner=null;
+                }
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java Mon Jul 23 11:02:41 2007
@@ -36,23 +36,34 @@
 
 public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
     
-    public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException {
+    private ActiveMQConnection connection;
+	private BrokerService broker;
+
+	public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.clientIDPrefix=Cheese");
         assertEquals("Cheese", cf.getClientIDPrefix());
 
-        ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
-        try {
-            connection.start();
+        connection = (ActiveMQConnection) cf.createConnection();
+        connection.start();
 
-            String clientID = connection.getClientID();
-            log.info("Got client ID: " + clientID);
+        String clientID = connection.getClientID();
+        log.info("Got client ID: " + clientID);
 
-            assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese"));
-        }
-        finally {
-            connection.close();
-        }
+        assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese"));
     }
+	
+	protected void tearDown() throws Exception {
+		// Try our best to close any previously opened connection.
+		try {
+			connection.close();
+		} catch (Throwable ignore) {			
+		}
+		// Try our best to stop any previously started broker.
+		try {
+			broker.stop();
+		} catch (Throwable ignore) {			
+		}
+	}
     
     public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.useAsyncSend=true");
@@ -88,26 +99,27 @@
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         // Make sure the broker is not created until the connection is instantiated.
         assertNull( BrokerRegistry.getInstance().lookup("localhost") );        
-        Connection connection = cf.createConnection();
+        connection = (ActiveMQConnection) cf.createConnection();
         // This should create the connection.
         assertNotNull(connection);
         // Verify the broker was created.
         assertNotNull( BrokerRegistry.getInstance().lookup("localhost") );
+        
         connection.close();
+        
         // Verify the broker was destroyed.
         assertNull( BrokerRegistry.getInstance().lookup("localhost") );
     }
     
     public void testGetBrokerName() throws URISyntaxException, JMSException {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
-        ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+        connection = (ActiveMQConnection) cf.createConnection();
         connection.start();
         
         String brokerName = connection.getBrokerName();
         log.info("Got broker name: " + brokerName);
         
         assertNotNull("No broker name available!", brokerName);
-        connection.close();
     }
     
     public void testCreateTcpConnectionUsingAllocatedPort() throws Exception {
@@ -143,7 +155,7 @@
 
     protected void assertCreateConnection(String uri) throws Exception {
         // Start up a broker with a tcp connector.
-        BrokerService broker = new BrokerService();
+        broker = new BrokerService();
         broker.setPersistent(false);
         TransportConnector connector = broker.addConnector(uri);
         broker.start();
@@ -162,9 +174,8 @@
         
         // This should create the connection.
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectURI);
-        Connection connection = cf.createConnection();
+        connection = (ActiveMQConnection) cf.createConnection();
         assertNotNull(connection);
-        connection.close();
         
         broker.stop();
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java Mon Jul 23 11:02:41 2007
@@ -17,6 +17,9 @@
  */
 package org.apache.activemq.broker;
 
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 
@@ -34,9 +37,6 @@
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
 
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-
 public class BrokerTest extends BrokerTestSupport {
     
     public ActiveMQDestination destination;
@@ -45,6 +45,61 @@
     public byte destinationType;
     public boolean durableConsumer;
     
+    public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {    
+        addCombinationValues( "deliveryMode", new Object[]{ 
+                Integer.valueOf(DeliveryMode.NON_PERSISTENT), 
+                Integer.valueOf(DeliveryMode.PERSISTENT)} );
+    }   
+    public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception {
+        
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+        
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(1);
+        connection1.send(consumerInfo1);
+
+        // Setup a second connection 
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);        
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        consumerInfo2.setPrefetchSize(1);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(consumerInfo2);
+        
+        // Send the messages
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        
+        for( int i=0; i < 2 ; i++ ) {
+            Message m1 = receiveMessage(connection1);
+            Message m2 = receiveMessage(connection2);
+            
+            assertNotNull("m1 is null for index: " + i, m1); 
+            assertNotNull("m2 is null for index: " + i, m2);  
+
+            assertNotSame(m1.getMessageId(), m2.getMessageId());
+            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+            connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+        
+        assertNoMessagesLeft(connection1);
+        assertNoMessagesLeft(connection2);
+    }
+
+    
     public void initCombosForTestQueuBrowserWith2Consumers() {    
         addCombinationValues( "deliveryMode", new Object[]{ 
                 Integer.valueOf(DeliveryMode.NON_PERSISTENT), 
@@ -1339,61 +1394,7 @@
         
         assertNoMessagesLeft(connection);
     }
-
      
-    public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {    
-        addCombinationValues( "deliveryMode", new Object[]{ 
-                Integer.valueOf(DeliveryMode.NON_PERSISTENT), 
-                Integer.valueOf(DeliveryMode.PERSISTENT)} );
-    }   
-    public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception {
-        
-        ActiveMQDestination destination = new ActiveMQQueue("TEST");
-        
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setPrefetchSize(1);
-        connection1.send(consumerInfo1);
-
-        // Setup a second connection 
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);        
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
-        consumerInfo2.setPrefetchSize(1);
-        connection2.send(connectionInfo2);
-        connection2.send(sessionInfo2);
-        connection2.send(consumerInfo2);
-        
-        // Send the messages
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
-        
-        for( int i=0; i < 2 ; i++ ) {
-            Message m1 = receiveMessage(connection1);
-            Message m2 = receiveMessage(connection2);
-            
-            assertNotNull("m1 is null for index: " + i, m1); 
-            assertNotNull("m2 is null for index: " + i, m2);  
-
-            assertNotSame(m1.getMessageId(), m2.getMessageId());
-            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
-            connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
-        }
-        
-        assertNoMessagesLeft(connection1);
-        assertNoMessagesLeft(connection2);
-    }
     
     public void initCombosForTestQueueSendThenAddConsumer() {    
         addCombinationValues( "deliveryMode", new Object[]{ 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java Mon Jul 23 11:02:41 2007
@@ -117,6 +117,14 @@
         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);        
         connection2.send(consumerInfo);
         
+        // Give demand forwarding bridge a chance to finish forwarding the subscriptions.
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            ie.printStackTrace();
+        }
+        
+        
         // Send the message to the local boker.
         connection1.request(createMessage(producerInfo, destination, deliveryMode));
         // Make sure the message was delivered via the remote.
@@ -129,14 +137,7 @@
         config.setBrokerName("local");
         config.setDispatchAsync(false);
         bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport()); 
-        bridge.start();
-        
-        // PATCH: Give demand forwarding bridge a chance to finish setting up
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-            ie.printStackTrace();
-        }
+        bridge.start();        
     }
     
     protected void tearDown() throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java Mon Jul 23 11:02:41 2007
@@ -68,6 +68,13 @@
         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);        
         connection2.send(consumerInfo);
         Thread.sleep(1000);
+        // Give forwarding bridge a chance to finish setting up
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            ie.printStackTrace();
+        }
+        
         // Send the message to the local boker.
         connection1.send(createMessage(producerInfo, destination, deliveryMode));
         
@@ -82,14 +89,7 @@
         bridge = new ForwardingBridge(createTransport(), createRemoteTransport());
         bridge.setClientId("local-remote-bridge");
         bridge.setDispatchAsync(false);
-        bridge.start();
-        
-        // PATCH: Give forwarding bridge a chance to finish setting up
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-            ie.printStackTrace();
-        }
+        bridge.start();        
     }
     
     protected void tearDown() throws Exception {