You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/20 07:37:30 UTC

svn commit: r423780 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/test: java/org/apache/activemq/network/ resources/org/apache/activemq/network/

Author: chirino
Date: Wed Jul 19 22:37:29 2006
New Revision: 423780

URL: http://svn.apache.org/viewvc?rev=423780&view=rev
Log:
Adding some network reconnect tests.  These are used to validate the our network connections get re-established after a broker restart.

Added:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java Wed Jul 19 22:37:29 2006
@@ -0,0 +1,314 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerEventSource;
+import org.apache.activemq.advisory.ConsumerListener;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * These test cases are used to verifiy that network connections get re established in all broker
+ * restart scenarios.
+ *  
+ * @author chirino
+ */
+public class NetworkReconnectTest extends TestCase {
+
+	private BrokerService producerBroker;
+	private BrokerService consumerBroker;
+	private ActiveMQConnectionFactory producerConnectionFactory;
+	private ActiveMQConnectionFactory consumerConnectionFactory;
+	private Destination destination;
+	private ArrayList connections = new ArrayList();
+	
+	public void testMultipleProducerBrokerRestarts() throws Exception {
+		for (int i = 0; i < 10; i++) {
+			testWithProducerBrokerRestart();
+			disposeConsumerConnections();
+		}
+	}
+	
+	public void testWithoutRestarts() throws Exception {
+		startProducerBroker();
+		startConsumerBroker();
+
+		MessageConsumer consumer = createConsumer();
+		AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+		waitForConsumerToArrive(counter);
+		
+		String messageId = sendMessage();
+		Message message = consumer.receive(1000);
+		
+		assertEquals(messageId, message.getJMSMessageID());
+		
+		assertNull( consumer.receiveNoWait() );
+		
+	}
+
+	public void testWithProducerBrokerRestart() throws Exception {
+		startProducerBroker();
+		startConsumerBroker();
+
+		MessageConsumer consumer = createConsumer();
+		AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+		waitForConsumerToArrive(counter);
+		
+		String messageId = sendMessage();
+		Message message = consumer.receive(1000);
+		
+		assertEquals(messageId, message.getJMSMessageID());		
+		assertNull( consumer.receiveNoWait() );
+		
+		// Restart the first broker...
+		stopProducerBroker();
+		startProducerBroker();
+		
+		counter = createConsumerCounter(producerConnectionFactory);
+		waitForConsumerToArrive(counter);
+		
+		messageId = sendMessage();
+		message = consumer.receive(1000);
+		
+		assertEquals(messageId, message.getJMSMessageID());		
+		assertNull( consumer.receiveNoWait() );
+		
+	}
+
+	public void testWithConsumerBrokerRestart() throws Exception {
+
+		startProducerBroker();
+		startConsumerBroker();
+
+		MessageConsumer consumer = createConsumer();
+		AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+		waitForConsumerToArrive(counter);
+		
+		String messageId = sendMessage();
+		Message message = consumer.receive(1000);
+		
+		assertEquals(messageId, message.getJMSMessageID());		
+		assertNull( consumer.receiveNoWait() );
+		
+		// Restart the first broker...
+		stopConsumerBroker();		
+		waitForConsumerToLeave(counter);		
+		startConsumerBroker();
+		
+		consumer = createConsumer();
+		waitForConsumerToArrive(counter);
+		
+		messageId = sendMessage();
+		message = consumer.receive(1000);
+		
+		assertEquals(messageId, message.getJMSMessageID());		
+		assertNull( consumer.receiveNoWait() );
+		
+	}
+	
+	public void testWithConsumerBrokerStartDelay() throws Exception {
+		
+		startConsumerBroker();
+		MessageConsumer consumer = createConsumer();
+		
+		Thread.sleep(1000*5);
+		
+		startProducerBroker();
+		AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+		waitForConsumerToArrive(counter);
+		
+		String messageId = sendMessage();
+		Message message = consumer.receive(1000);
+		
+		assertEquals(messageId, message.getJMSMessageID());
+		
+		assertNull( consumer.receiveNoWait() );
+
+	}
+
+	
+	public void testWithProducerBrokerStartDelay() throws Exception {
+		
+		startProducerBroker();
+		AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+
+		Thread.sleep(1000*5);
+		
+		startConsumerBroker();
+		MessageConsumer consumer = createConsumer();
+				
+		waitForConsumerToArrive(counter);
+		
+		String messageId = sendMessage();
+		Message message = consumer.receive(1000);
+		
+		assertEquals(messageId, message.getJMSMessageID());
+		
+		assertNull( consumer.receiveNoWait() );
+
+	}
+
+	protected void setUp() throws Exception {
+		producerConnectionFactory = createProducerConnectionFactory();
+		consumerConnectionFactory = createConsumerConnectionFactory();
+		destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
+		
+	}
+	
+	protected void tearDown() throws Exception {
+		disposeConsumerConnections();
+		try {
+			stopProducerBroker();
+		} catch (Throwable e) {
+		}
+		try {
+			stopConsumerBroker();
+		} catch (Throwable e) {
+		}
+	}
+	
+	protected void disposeConsumerConnections() {
+		for (Iterator iter = connections.iterator(); iter.hasNext();) {
+			Connection connection = (Connection) iter.next();
+			try { connection.close(); } catch (Throwable ignore) {}
+		}
+	}
+	
+	protected void startProducerBroker() throws Exception {
+		if( producerBroker==null ) {
+			producerBroker = createFirstBroker();
+			producerBroker.start();
+		}
+	}
+	
+	protected void stopProducerBroker() throws Exception {
+		if( producerBroker!=null ) {
+			producerBroker.stop();
+			producerBroker=null;
+		}
+	}
+	
+	protected void startConsumerBroker() throws Exception {
+		if( consumerBroker==null ) {
+			consumerBroker = createSecondBroker();
+			consumerBroker.start();
+		}
+	}
+	
+	protected void stopConsumerBroker() throws Exception {
+		if( consumerBroker!=null ) {
+			consumerBroker.stop();
+			consumerBroker=null;
+		}
+	}
+	
+	protected BrokerService createFirstBroker() throws Exception {
+		return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml"));
+	}
+	
+	protected BrokerService createSecondBroker() throws Exception {
+		return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml"));
+	}
+
+	protected ActiveMQConnectionFactory createProducerConnectionFactory() {
+		return new ActiveMQConnectionFactory("vm://broker1");
+	}
+	
+	protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
+		return new ActiveMQConnectionFactory("vm://broker2");
+	}
+	
+	protected String sendMessage() throws JMSException {
+		Connection connection = null;
+		try {
+			connection = producerConnectionFactory.createConnection();
+			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageProducer producer = session.createProducer(destination);
+			Message message = session.createMessage();
+			producer.send(message);
+			return message.getJMSMessageID();
+		} finally {
+			try { connection.close(); } catch (Throwable ignore) {}
+		}
+	}
+	
+	protected MessageConsumer createConsumer() throws JMSException {
+		Connection connection = consumerConnectionFactory.createConnection();
+		connections.add(connection);
+		connection.start();
+		
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+		return  session.createConsumer(destination);
+	}
+	
+	protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception {
+		final AtomicInteger rc = new AtomicInteger(0);
+		Connection connection  = cf.createConnection();
+		connections.add(connection);
+		connection.start();
+		
+		ConsumerEventSource source = new ConsumerEventSource(connection, destination);
+		source.setConsumerListener(new ConsumerListener(){
+			public void onConsumerEvent(ConsumerEvent event) {
+				rc.set(event.getConsumerCount());
+			}
+		});
+		source.start();
+		
+		return rc;
+	}
+	
+	protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException {
+		for( int i=0; i < 100; i++ ) {
+			if( consumerCounter.get() > 0 ) {
+				return;
+			}
+			Thread.sleep(50);
+		}
+		fail("The consumer did not arrive.");
+	}
+	
+	protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException {
+		for( int i=0; i < 100; i++ ) {
+			if( consumerCounter.get() == 0 ) {
+				return;
+			}
+			Thread.sleep(50);
+		}
+		fail("The consumer did not leave.");
+	}
+
+}

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java Wed Jul 19 22:37:29 2006
@@ -0,0 +1,90 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+
+
+/**
+ * Test network reconnects over SSH tunnels.  This case can be especially tricky since the SSH tunnels
+ * fool the TCP transport into thinking that they are initially connected.
+ *  
+ * @author chirino
+ */
+public class SSHTunnelNetworkReconnectTest extends NetworkReconnectTest {
+
+	ArrayList processes = new ArrayList();
+	
+	
+	protected BrokerService createFirstBroker() throws Exception {
+		return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker1.xml"));
+	}
+	
+	protected BrokerService createSecondBroker() throws Exception {
+		return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker2.xml"));
+	}
+	
+	protected void setUp() throws Exception {		
+		startProcess("ssh -Nn -L60006:localhost:61616 localhost");
+		startProcess("ssh -Nn -L60007:localhost:61617 localhost");		
+		super.setUp();
+	}
+	
+	protected void tearDown() throws Exception {		
+		super.tearDown();
+		for (Iterator iter = processes.iterator(); iter.hasNext();) {
+			Process p = (Process) iter.next();
+			p.destroy();
+		}
+	}
+
+	private void startProcess(String command) throws IOException {
+		final Process process = Runtime.getRuntime().exec(command);
+		processes.add(process);
+		new Thread("stdout: "+command){
+			public void run() {
+				try {
+					InputStream is = process.getInputStream();
+					int c;
+					while((c=is.read())>=0) {
+						System.out.write(c);
+					}
+				} catch (IOException e) {
+				}
+			}
+		}.start();
+		new Thread("stderr: "+command){
+			public void run() {
+				try {
+					InputStream is = process.getErrorStream();
+					int c;
+					while((c=is.read())>=0) {
+						System.err.write(c);
+					}
+				} catch (IOException e) {
+				}
+			}
+		}.start();
+	}
+}

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml Wed Jul 19 22:37:29 2006
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61616"/>
+      <transportConnector uri="vm://broker1"/>
+    </transportConnectors>
+
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:61617)"/>
+    </networkConnectors>
+    
+  </broker>
+  
+</beans>
+

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml Wed Jul 19 22:37:29 2006
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61617"/>
+      <transportConnector uri="vm://broker2"/>
+    </transportConnectors>
+
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:61616)"/>
+    </networkConnectors>
+    
+  </broker>
+  
+
+</beans>
+

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml Wed Jul 19 22:37:29 2006
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61616"/>
+      <transportConnector uri="vm://broker1"/>
+    </transportConnectors>
+
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:60007)"/>
+    </networkConnectors>
+    
+  </broker>
+  
+</beans>
+

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml Wed Jul 19 22:37:29 2006
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Copyright 2005-2006 The Apache Software Foundation
+   
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false">
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61617"/>
+      <transportConnector uri="vm://broker2"/>
+    </transportConnectors>
+
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:60006)"/>
+    </networkConnectors>
+    
+  </broker>
+  
+
+</beans>
+