You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/20 07:37:30 UTC
svn commit: r423780 - in
/incubator/activemq/branches/activemq-4.0/activemq-core/src/test:
java/org/apache/activemq/network/ resources/org/apache/activemq/network/
Author: chirino
Date: Wed Jul 19 22:37:29 2006
New Revision: 423780
URL: http://svn.apache.org/viewvc?rev=423780&view=rev
Log:
Adding some network reconnect tests. These are used to validate that our network connections get re-established after a broker restart.
Added:
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java Wed Jul 19 22:37:29 2006
@@ -0,0 +1,314 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerEventSource;
+import org.apache.activemq.advisory.ConsumerListener;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * These test cases are used to verify that network connections get re-established in all broker
+ * restart scenarios.
+ *
+ * @author chirino
+ */
+public class NetworkReconnectTest extends TestCase {
+
+ // Broker built by createFirstBroker(); null until startProducerBroker() creates it
+ // and again after stopProducerBroker().
+ private BrokerService producerBroker;
+ // Broker built by createSecondBroker(); null until startConsumerBroker() creates it
+ // and again after stopConsumerBroker().
+ private BrokerService consumerBroker;
+ // Factory used by sendMessage() and passed to createConsumerCounter() by the tests.
+ private ActiveMQConnectionFactory producerConnectionFactory;
+ // Factory used by createConsumer().
+ private ActiveMQConnectionFactory consumerConnectionFactory;
+ // Queue that every test sends to and consumes from; assigned in setUp().
+ private Destination destination;
+ // Connections opened by createConsumer() and createConsumerCounter(); closed by
+ // disposeConsumerConnections().
+ private ArrayList connections = new ArrayList();
+
+ public void testMultipleProducerBrokerRestarts() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ testWithProducerBrokerRestart();
+ disposeConsumerConnections();
+ }
+ }
+
+ /**
+  * Starts both brokers, waits until the consumer counter attached to the producer
+  * connection factory reports a consumer, then checks that exactly one sent
+  * message reaches the consumer.
+  */
+ public void testWithoutRestarts() throws Exception {
+     startProducerBroker();
+     startConsumerBroker();
+
+     MessageConsumer consumer = createConsumer();
+     AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+     waitForConsumerToArrive(counter);
+
+     String messageId = sendMessage();
+     Message message = consumer.receive(1000);
+
+     // receive() returns null on timeout; fail with a readable assertion rather
+     // than a NullPointerException on the next line.
+     assertNotNull("No message was received within the timeout.", message);
+     assertEquals(messageId, message.getJMSMessageID());
+
+     // Nothing else should be pending on the queue.
+     assertNull( consumer.receiveNoWait() );
+ }
+
+ /**
+  * Verifies that a message flows to the consumer, restarts the producer broker,
+  * and verifies that a second message still flows once a consumer is reported
+  * again on the restarted broker.
+  */
+ public void testWithProducerBrokerRestart() throws Exception {
+     startProducerBroker();
+     startConsumerBroker();
+
+     MessageConsumer consumer = createConsumer();
+     AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+     waitForConsumerToArrive(counter);
+
+     String messageId = sendMessage();
+     Message message = consumer.receive(1000);
+
+     // receive() returns null on timeout; report that as an assertion failure.
+     assertNotNull("No message was received before the producer broker restart.", message);
+     assertEquals(messageId, message.getJMSMessageID());
+     assertNull( consumer.receiveNoWait() );
+
+     // Restart the first broker...
+     stopProducerBroker();
+     startProducerBroker();
+
+     // The previous counter was attached to a connection on the stopped broker,
+     // so a fresh one is created against the restarted broker.
+     counter = createConsumerCounter(producerConnectionFactory);
+     waitForConsumerToArrive(counter);
+
+     messageId = sendMessage();
+     message = consumer.receive(1000);
+
+     assertNotNull("No message was received after the producer broker restart.", message);
+     assertEquals(messageId, message.getJMSMessageID());
+     assertNull( consumer.receiveNoWait() );
+ }
+
+ /**
+  * Verifies that a message flows to the consumer, restarts the consumer broker
+  * (waiting for the consumer to disappear and a new one to arrive), and verifies
+  * that a second message reaches the newly created consumer.
+  */
+ public void testWithConsumerBrokerRestart() throws Exception {
+
+     startProducerBroker();
+     startConsumerBroker();
+
+     MessageConsumer consumer = createConsumer();
+     AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+     waitForConsumerToArrive(counter);
+
+     String messageId = sendMessage();
+     Message message = consumer.receive(1000);
+
+     // receive() returns null on timeout; report that as an assertion failure.
+     assertNotNull("No message was received before the consumer broker restart.", message);
+     assertEquals(messageId, message.getJMSMessageID());
+     assertNull( consumer.receiveNoWait() );
+
+     // Restart the consumer (second) broker...
+     stopConsumerBroker();
+     waitForConsumerToLeave(counter);
+     startConsumerBroker();
+
+     // The old consumer belonged to the stopped broker, so a new one is needed.
+     consumer = createConsumer();
+     waitForConsumerToArrive(counter);
+
+     messageId = sendMessage();
+     message = consumer.receive(1000);
+
+     assertNotNull("No message was received after the consumer broker restart.", message);
+     assertEquals(messageId, message.getJMSMessageID());
+     assertNull( consumer.receiveNoWait() );
+ }
+
+ /**
+  * Starts the consumer broker and a consumer, waits five seconds, then starts the
+  * producer broker and checks that a sent message reaches the consumer.
+  */
+ public void testWithConsumerBrokerStartDelay() throws Exception {
+
+     startConsumerBroker();
+     MessageConsumer consumer = createConsumer();
+
+     // Keep the consumer broker running on its own for a while before its peer starts.
+     Thread.sleep(1000*5);
+
+     startProducerBroker();
+     AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+     waitForConsumerToArrive(counter);
+
+     String messageId = sendMessage();
+     Message message = consumer.receive(1000);
+
+     // receive() returns null on timeout; report that as an assertion failure.
+     assertNotNull("No message was received within the timeout.", message);
+     assertEquals(messageId, message.getJMSMessageID());
+
+     assertNull( consumer.receiveNoWait() );
+ }
+
+
+ /**
+  * Starts the producer broker and its consumer counter, waits five seconds, then
+  * starts the consumer broker with a consumer and checks that a sent message
+  * reaches that consumer.
+  */
+ public void testWithProducerBrokerStartDelay() throws Exception {
+
+     startProducerBroker();
+     AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
+
+     // Keep the producer broker running on its own for a while before its peer starts.
+     Thread.sleep(1000*5);
+
+     startConsumerBroker();
+     MessageConsumer consumer = createConsumer();
+
+     waitForConsumerToArrive(counter);
+
+     String messageId = sendMessage();
+     Message message = consumer.receive(1000);
+
+     // receive() returns null on timeout; report that as an assertion failure.
+     assertNotNull("No message was received within the timeout.", message);
+     assertEquals(messageId, message.getJMSMessageID());
+
+     assertNull( consumer.receiveNoWait() );
+ }
+
+ /**
+  * Builds the two connection factories and the queue destination used by the tests.
+  * No broker is started here; each test starts the brokers it needs.
+  */
+ protected void setUp() throws Exception {
+     this.producerConnectionFactory = createProducerConnectionFactory();
+     this.consumerConnectionFactory = createConsumerConnectionFactory();
+     this.destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
+ }
+
+ /**
+  * Closes all tracked connections and then stops both brokers. Each broker stop
+  * is attempted independently and any failure is deliberately ignored.
+  */
+ protected void tearDown() throws Exception {
+     disposeConsumerConnections();
+
+     try {
+         stopProducerBroker();
+     } catch (Throwable ignored) {
+         // best-effort: still try to stop the other broker
+     }
+
+     try {
+         stopConsumerBroker();
+     } catch (Throwable ignored) {
+         // best-effort cleanup
+     }
+ }
+
+ /**
+  * Closes every tracked connection and removes it from the tracking list, so that
+  * repeated calls do not close the same connection again and the list does not
+  * keep growing. Failures while closing are ignored (best-effort cleanup).
+  */
+ protected void disposeConsumerConnections() {
+     for (Iterator iter = connections.iterator(); iter.hasNext();) {
+         Connection connection = (Connection) iter.next();
+         // Forget the connection whether or not close() succeeds.
+         iter.remove();
+         try { connection.close(); } catch (Throwable ignore) {}
+     }
+ }
+
+ /**
+  * Creates and starts the producer broker unless one is already held.
+  */
+ protected void startProducerBroker() throws Exception {
+     if (producerBroker != null) {
+         return;
+     }
+     producerBroker = createFirstBroker();
+     producerBroker.start();
+ }
+
+ /**
+  * Stops the producer broker, if one is held, and then clears the reference.
+  * The reference is left in place when stop() throws.
+  */
+ protected void stopProducerBroker() throws Exception {
+     if (producerBroker == null) {
+         return;
+     }
+     producerBroker.stop();
+     producerBroker = null;
+ }
+
+ /**
+  * Creates and starts the consumer broker unless one is already held.
+  */
+ protected void startConsumerBroker() throws Exception {
+     if (consumerBroker != null) {
+         return;
+     }
+     consumerBroker = createSecondBroker();
+     consumerBroker.start();
+ }
+
+ /**
+  * Stops the consumer broker, if one is held, and then clears the reference.
+  * The reference is left in place when stop() throws.
+  */
+ protected void stopConsumerBroker() throws Exception {
+     if (consumerBroker == null) {
+         return;
+     }
+     consumerBroker.stop();
+     consumerBroker = null;
+ }
+
+ /**
+  * Builds (without starting) the producer-side broker from its xbean configuration.
+  *
+  * @return the broker described by reconnect-broker1.xml
+  */
+ protected BrokerService createFirstBroker() throws Exception {
+     URI brokerConfig = new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml");
+     return BrokerFactory.createBroker(brokerConfig);
+ }
+
+ /**
+  * Builds (without starting) the consumer-side broker from its xbean configuration.
+  *
+  * @return the broker described by reconnect-broker2.xml
+  */
+ protected BrokerService createSecondBroker() throws Exception {
+     URI brokerConfig = new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml");
+     return BrokerFactory.createBroker(brokerConfig);
+ }
+
+ /**
+  * @return a connection factory pointing at the vm transport of "broker1"
+  */
+ protected ActiveMQConnectionFactory createProducerConnectionFactory() {
+     ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("vm://broker1");
+     return producerFactory;
+ }
+
+ /**
+  * @return a connection factory pointing at the vm transport of "broker2"
+  */
+ protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
+     ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory("vm://broker2");
+     return consumerFactory;
+ }
+
+ /**
+  * Sends one empty message to the test destination over a short-lived connection
+  * from the producer connection factory.
+  *
+  * @return the JMS message id of the message, read after it has been sent
+  * @throws JMSException if the connection, session or producer cannot be created
+  *         or the send fails
+  */
+ protected String sendMessage() throws JMSException {
+     // Acquire the connection before the try block: if creation fails there is
+     // nothing to close, and the finally block never sees a null connection.
+     Connection connection = producerConnectionFactory.createConnection();
+     try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(destination);
+         Message message = session.createMessage();
+         producer.send(message);
+         return message.getJMSMessageID();
+     } finally {
+         // Best-effort close; a failure here must not mask the send result or error.
+         try { connection.close(); } catch (Throwable ignore) {}
+     }
+ }
+
+ /**
+  * Opens and starts a connection from the consumer connection factory, tracks it
+  * for later cleanup, and creates a consumer on the test destination.
+  *
+  * @return a consumer on the test destination, backed by an auto-acknowledge session
+  */
+ protected MessageConsumer createConsumer() throws JMSException {
+     Connection consumerConnection = consumerConnectionFactory.createConnection();
+     // Tracked before start() so the connection is still closed during cleanup
+     // if a later step throws.
+     connections.add(consumerConnection);
+     consumerConnection.start();
+
+     Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     MessageConsumer consumer = consumerSession.createConsumer(destination);
+     return consumer;
+ }
+
+ /**
+  * Opens and starts a connection from the given factory, tracks it for later
+  * cleanup, and attaches a consumer event source for the test destination.
+  * Each consumer event overwrites the returned counter with the event's consumer count.
+  *
+  * @param cf factory used to open the connection that carries the consumer events
+  * @return a counter, initially zero, updated by the event listener
+  */
+ protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception {
+     final AtomicInteger consumerCount = new AtomicInteger(0);
+
+     Connection advisoryConnection = cf.createConnection();
+     connections.add(advisoryConnection);
+     advisoryConnection.start();
+
+     ConsumerListener listener = new ConsumerListener() {
+         public void onConsumerEvent(ConsumerEvent event) {
+             consumerCount.set(event.getConsumerCount());
+         }
+     };
+
+     ConsumerEventSource eventSource = new ConsumerEventSource(advisoryConnection, destination);
+     eventSource.setConsumerListener(listener);
+     eventSource.start();
+
+     return consumerCount;
+ }
+
+ /**
+  * Polls the counter up to 100 times, sleeping 50 ms after each unsuccessful
+  * check, and returns as soon as it is greater than zero. Fails the test if the
+  * counter never becomes positive.
+  *
+  * @param consumerCounter counter to poll
+  */
+ protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException {
+     int attemptsLeft = 100;
+     while (attemptsLeft > 0) {
+         if (consumerCounter.get() > 0) {
+             return;
+         }
+         Thread.sleep(50);
+         attemptsLeft--;
+     }
+     fail("The consumer did not arrive.");
+ }
+
+ /**
+  * Polls the counter up to 100 times, sleeping 50 ms after each unsuccessful
+  * check, and returns as soon as it equals zero. Fails the test if the counter
+  * never reaches zero.
+  *
+  * @param consumerCounter counter to poll
+  */
+ protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException {
+     int attemptsLeft = 100;
+     while (attemptsLeft > 0) {
+         if (consumerCounter.get() == 0) {
+             return;
+         }
+         Thread.sleep(50);
+         attemptsLeft--;
+     }
+     fail("The consumer did not leave.");
+ }
+
+}
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java Wed Jul 19 22:37:29 2006
@@ -0,0 +1,90 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+
+
+/**
+ * Test network reconnects over SSH tunnels. This case can be especially tricky since the SSH tunnels
+ * fool the TCP transport into thinking that they are initially connected.
+ *
+ * @author chirino
+ */
+public class SSHTunnelNetworkReconnectTest extends NetworkReconnectTest {
+
+ // Processes launched by startProcess(); destroyed in tearDown().
+ ArrayList processes = new ArrayList();
+
+
+ /**
+  * Builds (without starting) the first broker from the SSH-tunnel variant of its
+  * xbean configuration.
+  *
+  * @return the broker described by ssh-reconnect-broker1.xml
+  */
+ protected BrokerService createFirstBroker() throws Exception {
+     URI brokerConfig = new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker1.xml");
+     return BrokerFactory.createBroker(brokerConfig);
+ }
+
+ /**
+  * Builds (without starting) the second broker from the SSH-tunnel variant of its
+  * xbean configuration.
+  *
+  * @return the broker described by ssh-reconnect-broker2.xml
+  */
+ protected BrokerService createSecondBroker() throws Exception {
+     URI brokerConfig = new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker2.xml");
+     return BrokerFactory.createBroker(brokerConfig);
+ }
+
+ /**
+  * Launches the two ssh port forwards (local 60006 to localhost:61616 and local
+  * 60007 to localhost:61617) and then runs the inherited set-up.
+  * NOTE(review): presumably requires non-interactive ssh access to localhost - confirm.
+  */
+ protected void setUp() throws Exception {
+     String[] tunnelCommands = {
+         "ssh -Nn -L60006:localhost:61616 localhost",
+         "ssh -Nn -L60007:localhost:61617 localhost"
+     };
+     for (int i = 0; i < tunnelCommands.length; i++) {
+         startProcess(tunnelCommands[i]);
+     }
+     super.setUp();
+ }
+
+ /**
+  * Runs the inherited tear-down and then destroys every launched process.
+  * The processes are destroyed even when the inherited tear-down throws, so no
+  * ssh tunnel outlives the test, and each one is dropped from the list once handled.
+  */
+ protected void tearDown() throws Exception {
+     try {
+         super.tearDown();
+     } finally {
+         for (Iterator iter = processes.iterator(); iter.hasNext();) {
+             Process p = (Process) iter.next();
+             iter.remove();
+             p.destroy();
+         }
+     }
+ }
+
+ private void startProcess(String command) throws IOException {
+ final Process process = Runtime.getRuntime().exec(command);
+ processes.add(process);
+ new Thread("stdout: "+command){
+ public void run() {
+ try {
+ InputStream is = process.getInputStream();
+ int c;
+ while((c=is.read())>=0) {
+ System.out.write(c);
+ }
+ } catch (IOException e) {
+ }
+ }
+ }.start();
+ new Thread("stderr: "+command){
+ public void run() {
+ try {
+ InputStream is = process.getErrorStream();
+ int c;
+ while((c=is.read())>=0) {
+ System.err.write(c);
+ }
+ } catch (IOException e) {
+ }
+ }
+ }.start();
+ }
+}
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml Wed Jul 19 22:37:29 2006
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2005-2006 The Apache Software Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false">
+
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61616"/>
+ <transportConnector uri="vm://broker1"/>
+ </transportConnectors>
+
+ <networkConnectors>
+ <networkConnector uri="static:(tcp://localhost:61617)"/>
+ </networkConnectors>
+
+ </broker>
+
+</beans>
+
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml Wed Jul 19 22:37:29 2006
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2005-2006 The Apache Software Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false">
+
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61617"/>
+ <transportConnector uri="vm://broker2"/>
+ </transportConnectors>
+
+ <networkConnectors>
+ <networkConnector uri="static:(tcp://localhost:61616)"/>
+ </networkConnectors>
+
+ </broker>
+
+
+</beans>
+
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml Wed Jul 19 22:37:29 2006
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2005-2006 The Apache Software Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false">
+
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61616"/>
+ <transportConnector uri="vm://broker1"/>
+ </transportConnectors>
+
+ <networkConnectors>
+ <networkConnector uri="static:(tcp://localhost:60007)"/>
+ </networkConnectors>
+
+ </broker>
+
+</beans>
+
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml?rev=423780&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml Wed Jul 19 22:37:29 2006
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2005-2006 The Apache Software Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://activemq.org/config/1.0">
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false">
+
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61617"/>
+ <transportConnector uri="vm://broker2"/>
+ </transportConnectors>
+
+ <networkConnectors>
+ <networkConnector uri="static:(tcp://localhost:60006)"/>
+ </networkConnectors>
+
+ </broker>
+
+
+</beans>
+