You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/07/23 20:02:46 UTC
svn commit: r558814 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/transport/vm/ test/java/org/apache/activemq/
test/java/org/apache/activemq/broker/ test/java/...
Author: chirino
Date: Mon Jul 23 11:02:41 2007
New Revision: 558814
URL: http://svn.apache.org/viewvc?view=rev&rev=558814
Log:
https://issues.apache.org/activemq/browse/AMQ-1337 - Broker should finish accepting connection in an async thread.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Mon Jul 23 11:02:41 2007
@@ -1443,10 +1443,16 @@
* Returns the broker name if one is available or null if one is not available yet.
*/
public String getBrokerName() {
- if (brokerInfo == null) {
- return null;
- }
- return brokerInfo.getBrokerName();
+ try {
+ brokerInfoReceived.await(5,TimeUnit.SECONDS);
+ if (brokerInfo == null) {
+ return null;
+ }
+ return brokerInfo.getBrokerName();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Mon Jul 23 11:02:41 2007
@@ -143,10 +143,23 @@
this.server = server;
this.brokerInfo.setBrokerURL(server.getConnectURI().toString());
this.server.setAcceptListener(new TransportAcceptListener() {
- public void onAccept(Transport transport) {
+ public void onAccept(final Transport transport) {
try {
- Connection connection = createConnection(transport);
- connection.start();
+ // Starting the connection could block due to
+ // wireformat negotiation, so start it in an async thread.
+ Thread startThread = new Thread("ActiveMQ Transport Initiator: "+transport.getRemoteAddress()) {
+ public void run() {
+ try {
+ Connection connection = createConnection(transport);
+ connection.start();
+ } catch (Exception e) {
+ ServiceSupport.dispose(transport);
+ onAcceptError(e);
+ }
+ }
+ };
+ startThread.setPriority(4);
+ startThread.start();
}
catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Mon Jul 23 11:02:41 2007
@@ -21,8 +21,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.activemq.command.Command;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
@@ -52,10 +52,11 @@
protected boolean marshal;
protected boolean network;
protected boolean async=true;
- protected AtomicBoolean started=new AtomicBoolean();
protected int asyncQueueDepth=2000;
protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
protected LinkedBlockingQueue messageQueue=null;
+ protected boolean started;
+ protected final Object startMutex = new Object();
protected final URI location;
protected final long id;
private TaskRunner taskRunner;
@@ -96,11 +97,15 @@
}
protected void syncOneWay(Object command){
- final TransportListener tl=peer.transportListener;
- prePeerSetQueue=peer.prePeerSetQueue;
- if(tl==null){
- prePeerSetQueue.add(command);
- }else{
+ TransportListener tl=null;
+ synchronized(peer.startMutex){
+ if( peer.started ) {
+ tl = peer.transportListener;
+ } else if(!peer.disposed) {
+ peer.prePeerSetQueue.add(command);
+ }
+ }
+ if( tl!=null ) {
tl.onCommand(command);
}
}
@@ -147,30 +152,33 @@
}
public void start() throws Exception{
- if(started.compareAndSet(false,true)){
- if(transportListener==null)
- throw new IOException("TransportListener not set.");
- if(!async){
- for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
- Command command=(Command)iter.next();
- transportListener.onCommand(command);
- iter.remove();
- }
- }else{
- peer.wakeup();
- wakeup();
- }
+ if(transportListener==null)
+ throw new IOException("TransportListener not set.");
+ synchronized(startMutex) {
+ if( !prePeerSetQueue.isEmpty() ) {
+ for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
+ Command command=(Command)iter.next();
+ transportListener.onCommand(command);
+ }
+ prePeerSetQueue.clear();
+ }
+ started = true;
+ if( isAsync() ) {
+ peer.wakeup();
+ wakeup();
+ }
}
}
public void stop() throws Exception{
- if(started.compareAndSet(true,false)){
+ synchronized(startMutex) {
if(!disposed){
+ started=false;
disposed=true;
- }
- if(taskRunner!=null){
- taskRunner.shutdown(1000);
- taskRunner=null;
+ if(taskRunner!=null){
+ taskRunner.shutdown(1000);
+ taskRunner=null;
+ }
}
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java Mon Jul 23 11:02:41 2007
@@ -36,23 +36,34 @@
public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
- public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException {
+ private ActiveMQConnection connection;
+ private BrokerService broker;
+
+ public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.clientIDPrefix=Cheese");
assertEquals("Cheese", cf.getClientIDPrefix());
- ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
- try {
- connection.start();
+ connection = (ActiveMQConnection) cf.createConnection();
+ connection.start();
- String clientID = connection.getClientID();
- log.info("Got client ID: " + clientID);
+ String clientID = connection.getClientID();
+ log.info("Got client ID: " + clientID);
- assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese"));
- }
- finally {
- connection.close();
- }
+ assertTrue("should start with Cheese! but was: " + clientID, clientID.startsWith("Cheese"));
}
+
+ protected void tearDown() throws Exception {
+ // Try our best to close any previously opened connection.
+ try {
+ connection.close();
+ } catch (Throwable ignore) {
+ }
+ // Try our best to stop any previously started broker.
+ try {
+ broker.stop();
+ } catch (Throwable ignore) {
+ }
+ }
public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?jms.useAsyncSend=true");
@@ -88,26 +99,27 @@
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
// Make sure the broker is not created until the connection is instantiated.
assertNull( BrokerRegistry.getInstance().lookup("localhost") );
- Connection connection = cf.createConnection();
+ connection = (ActiveMQConnection) cf.createConnection();
// This should create the connection.
assertNotNull(connection);
// Verify the broker was created.
assertNotNull( BrokerRegistry.getInstance().lookup("localhost") );
+
connection.close();
+
// Verify the broker was destroyed.
assertNull( BrokerRegistry.getInstance().lookup("localhost") );
}
public void testGetBrokerName() throws URISyntaxException, JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
- ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+ connection = (ActiveMQConnection) cf.createConnection();
connection.start();
String brokerName = connection.getBrokerName();
log.info("Got broker name: " + brokerName);
assertNotNull("No broker name available!", brokerName);
- connection.close();
}
public void testCreateTcpConnectionUsingAllocatedPort() throws Exception {
@@ -143,7 +155,7 @@
protected void assertCreateConnection(String uri) throws Exception {
// Start up a broker with a tcp connector.
- BrokerService broker = new BrokerService();
+ broker = new BrokerService();
broker.setPersistent(false);
TransportConnector connector = broker.addConnector(uri);
broker.start();
@@ -162,9 +174,8 @@
// This should create the connection.
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectURI);
- Connection connection = cf.createConnection();
+ connection = (ActiveMQConnection) cf.createConnection();
assertNotNull(connection);
- connection.close();
broker.stop();
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java Mon Jul 23 11:02:41 2007
@@ -17,6 +17,9 @@
*/
package org.apache.activemq.broker;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@@ -34,9 +37,6 @@
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-
public class BrokerTest extends BrokerTestSupport {
public ActiveMQDestination destination;
@@ -45,6 +45,61 @@
public byte destinationType;
public boolean durableConsumer;
+ public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {
+ addCombinationValues( "deliveryMode", new Object[]{
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)} );
+ }
+ public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(1);
+ connection1.send(consumerInfo1);
+
+ // Setup a second connection
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+ consumerInfo2.setPrefetchSize(1);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.send(consumerInfo2);
+
+ // Send the messages
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
+
+ for( int i=0; i < 2 ; i++ ) {
+ Message m1 = receiveMessage(connection1);
+ Message m2 = receiveMessage(connection2);
+
+ assertNotNull("m1 is null for index: " + i, m1);
+ assertNotNull("m2 is null for index: " + i, m2);
+
+ assertNotSame(m1.getMessageId(), m2.getMessageId());
+ connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+ connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
+ }
+
+ assertNoMessagesLeft(connection1);
+ assertNoMessagesLeft(connection2);
+ }
+
+
public void initCombosForTestQueuBrowserWith2Consumers() {
addCombinationValues( "deliveryMode", new Object[]{
Integer.valueOf(DeliveryMode.NON_PERSISTENT),
@@ -1339,61 +1394,7 @@
assertNoMessagesLeft(connection);
}
-
- public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {
- addCombinationValues( "deliveryMode", new Object[]{
- Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)} );
- }
- public void testQueueOnlyOnceDeliveryWith2Consumers() throws Exception {
-
- ActiveMQDestination destination = new ActiveMQQueue("TEST");
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setPrefetchSize(1);
- connection1.send(consumerInfo1);
-
- // Setup a second connection
- StubConnection connection2 = createConnection();
- ConnectionInfo connectionInfo2 = createConnectionInfo();
- SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
- ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
- consumerInfo2.setPrefetchSize(1);
- connection2.send(connectionInfo2);
- connection2.send(sessionInfo2);
- connection2.send(consumerInfo2);
-
- // Send the messages
- connection1.send(createMessage(producerInfo, destination, deliveryMode));
- connection1.send(createMessage(producerInfo, destination, deliveryMode));
- connection1.send(createMessage(producerInfo, destination, deliveryMode));
- connection1.send(createMessage(producerInfo, destination, deliveryMode));
-
- for( int i=0; i < 2 ; i++ ) {
- Message m1 = receiveMessage(connection1);
- Message m2 = receiveMessage(connection2);
-
- assertNotNull("m1 is null for index: " + i, m1);
- assertNotNull("m2 is null for index: " + i, m2);
-
- assertNotSame(m1.getMessageId(), m2.getMessageId());
- connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
- connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
- }
-
- assertNoMessagesLeft(connection1);
- assertNoMessagesLeft(connection2);
- }
public void initCombosForTestQueueSendThenAddConsumer() {
addCombinationValues( "deliveryMode", new Object[]{
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java Mon Jul 23 11:02:41 2007
@@ -117,6 +117,14 @@
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
connection2.send(consumerInfo);
+ // Give demand forwarding bridge a chance to finish forwarding the subscriptions.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ ie.printStackTrace();
+ }
+
+
// Send the message to the local boker.
connection1.request(createMessage(producerInfo, destination, deliveryMode));
// Make sure the message was delivered via the remote.
@@ -129,14 +137,7 @@
config.setBrokerName("local");
config.setDispatchAsync(false);
bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport());
- bridge.start();
-
- // PATCH: Give demand forwarding bridge a chance to finish setting up
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- }
+ bridge.start();
}
protected void tearDown() throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java?view=diff&rev=558814&r1=558813&r2=558814
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java Mon Jul 23 11:02:41 2007
@@ -68,6 +68,13 @@
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
connection2.send(consumerInfo);
Thread.sleep(1000);
+ // Give forwarding bridge a chance to finish setting up
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ ie.printStackTrace();
+ }
+
// Send the message to the local boker.
connection1.send(createMessage(producerInfo, destination, deliveryMode));
@@ -82,14 +89,7 @@
bridge = new ForwardingBridge(createTransport(), createRemoteTransport());
bridge.setClientId("local-remote-bridge");
bridge.setDispatchAsync(false);
- bridge.start();
-
- // PATCH: Give forwarding bridge a chance to finish setting up
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- }
+ bridge.start();
}
protected void tearDown() throws Exception {