You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/06/02 11:05:40 UTC
svn commit: r1130441 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/NetworkConnector.java
test/java/org/apache/activemq/network/NetworkConnectionsTest.java
Author: dejanb
Date: Thu Jun 2 09:05:39 2011
New Revision: 1130441
URL: http://svn.apache.org/viewvc?rev=1130441&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3348 - network connector restart
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=1130441&r1=1130440&r2=1130441&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Thu Jun 2 09:05:39 2011
@@ -221,6 +221,14 @@ public abstract class NetworkConnector e
return serviceSupport.isStarted();
}
+ /**
+ * Reports whether this connector is stopped, by delegating to the
+ * connector's {@code serviceSupport}.
+ *
+ * @return the value returned by {@code serviceSupport.isStopped()}
+ */
+ public boolean isStopped() {
+ return serviceSupport.isStopped();
+ }
+
+ /**
+ * Reports whether this connector is currently stopping, by delegating to
+ * the connector's {@code serviceSupport}.
+ *
+ * @return the value returned by {@code serviceSupport.isStopping()}
+ */
+ public boolean isStopping() {
+ return serviceSupport.isStopping();
+ }
+
public ObjectName getObjectName() {
return objectName;
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java?rev=1130441&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java Thu Jun 2 09:05:39 2011
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+
+public class NetworkConnectionsTest extends TestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(NetworkConnectionsTest.class);
+
+ private static final String LOCAL_BROKER_TRANSPORT_URI = "tcp://localhost:61616";
+ private static final String REMOTE_BROKER_TRANSPORT_URI = "tcp://localhost:61617";
+ private static final String DESTINATION_NAME = "TEST.RECONNECT";
+
+ private BrokerService localBroker;
+ private BrokerService remoteBroker;
+
+    /**
+     * Verifies that a network connector reports consistent lifecycle state
+     * ({@code isStarted}, {@code isStopping}, {@code isStopped}) across two
+     * consecutive start/stop cycles.
+     *
+     * @throws Exception if the connector cannot be added, started or stopped
+     */
+    @Test
+    public void testIsStarted() throws Exception {
+        LOG.info("testIsStarted is starting...");
+
+        LOG.info("Adding network connector...");
+        NetworkConnector nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")");
+        nc.setName("NC1");
+
+        // Two cycles: the second one proves the connector can be started again after a stop.
+        for (int cycle = 0; cycle < 2; cycle++) {
+            LOG.info("Starting network connector...");
+            nc.start();
+            assertTrue(nc.isStarted());
+
+            LOG.info("Stopping network connector...");
+            nc.stop();
+            awaitStopCompleted(nc, 30000L);
+
+            assertTrue(nc.isStopped());
+            assertFalse(nc.isStarted());
+        }
+    }
+
+    /**
+     * Polls the connector until it no longer reports {@code isStopping()},
+     * failing the test instead of hanging when the timeout elapses.
+     *
+     * @param nc the connector whose stop is being awaited
+     * @param timeoutMillis maximum time to wait, in milliseconds
+     * @throws InterruptedException if the polling sleep is interrupted
+     */
+    private static void awaitStopCompleted(NetworkConnector nc, long timeoutMillis) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + timeoutMillis;
+        while (nc.isStopping()) {
+            if (System.currentTimeMillis() >= deadline) {
+                fail("Network connector was still stopping after " + timeoutMillis + " ms");
+            }
+            LOG.info("... still stopping ...");
+            Thread.sleep(100);
+        }
+    }
+
+ @Test
+ public void testNetworkConnectionRestart() throws Exception {
+ LOG.info("testNetworkConnectionRestart is starting...");
+
+ LOG.info("Adding network connector...");
+ NetworkConnector nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")");
+ nc.setName("NC1");
+ nc.start();
+ assertTrue(nc.isStarted());
+
+ LOG.info("Setting up Message Producer and Consumer");
+ ActiveMQQueue destination = new ActiveMQQueue(DESTINATION_NAME);
+
+ ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(LOCAL_BROKER_TRANSPORT_URI);
+ Connection localConnection = localFactory.createConnection();
+ localConnection.start();
+ Session localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer localProducer = localSession.createProducer(destination);
+
+ ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(REMOTE_BROKER_TRANSPORT_URI);
+ Connection remoteConnection = remoteFactory.createConnection();
+ remoteConnection.start();
+ Session remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer remoteConsumer = remoteSession.createConsumer(destination);
+
+ Message message = localSession.createTextMessage("test");
+ localProducer.send(message);
+
+ LOG.info("Testing initial network connection...");
+ message = remoteConsumer.receive(10000);
+ assertNotNull(message);
+
+ LOG.info("Stopping network connection...");
+ nc.stop();
+ assertFalse(nc.isStarted());
+
+ LOG.info("Sending 2nd message...");
+ message = localSession.createTextMessage("test stop");
+ localProducer.send(message);
+
+ message = remoteConsumer.receive(1000);
+ assertNull("Message should not have been delivered since NetworkConnector was stopped", message);
+
+ LOG.info("(Re)starting network connection...");
+ nc.start();
+ assertTrue(nc.isStarted());
+
+ LOG.info("Wait for 2nd message to get forwarded and received...");
+ message = remoteConsumer.receive(10000);
+ assertNotNull("Should have received 2nd message", message);
+ }
+
+    /**
+     * Verifies that a network connector can be stopped and removed from the
+     * broker, and that a new connector for the same remote URI can then be
+     * added under a different name and forwards messages again.
+     *
+     * @throws Exception if broker, connector or JMS operations fail
+     */
+    @Test
+    public void testNetworkConnectionReAddURI() throws Exception {
+        LOG.info("testNetworkConnectionReAddURI is starting...");
+
+        LOG.info("Adding network connector 'NC1'...");
+        NetworkConnector nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")");
+        nc.setName("NC1");
+        nc.start();
+        assertTrue(nc.isStarted());
+
+        LOG.info("Looking up network connector by name...");
+        NetworkConnector nc1 = localBroker.getNetworkConnectorByName("NC1");
+        assertNotNull("Should find network connector 'NC1'", nc1);
+        assertTrue(nc1.isStarted());
+        assertEquals(nc, nc1);
+
+        LOG.info("Setting up producer and consumer...");
+        ActiveMQQueue destination = new ActiveMQQueue(DESTINATION_NAME);
+
+        Connection localConnection = null;
+        Connection remoteConnection = null;
+        try {
+            ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(LOCAL_BROKER_TRANSPORT_URI);
+            localConnection = localFactory.createConnection();
+            localConnection.start();
+            Session localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer localProducer = localSession.createProducer(destination);
+
+            ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(REMOTE_BROKER_TRANSPORT_URI);
+            remoteConnection = remoteFactory.createConnection();
+            remoteConnection.start();
+            Session remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer remoteConsumer = remoteSession.createConsumer(destination);
+
+            Message message = localSession.createTextMessage("test");
+            localProducer.send(message);
+
+            LOG.info("Testing initial network connection...");
+            message = remoteConsumer.receive(10000);
+            assertNotNull(message);
+
+            LOG.info("Stopping network connector 'NC1'...");
+            nc.stop();
+            assertFalse(nc.isStarted());
+
+            LOG.info("Removing network connector...");
+            assertTrue(localBroker.removeNetworkConnector(nc));
+
+            nc1 = localBroker.getNetworkConnectorByName("NC1");
+            assertNull("Should not find network connector 'NC1'", nc1);
+
+            LOG.info("Re-adding network connector 'NC2'...");
+            nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")");
+            nc.setName("NC2");
+            nc.start();
+            assertTrue(nc.isStarted());
+
+            LOG.info("Looking up network connector by name...");
+            NetworkConnector nc2 = localBroker.getNetworkConnectorByName("NC2");
+            assertNotNull(nc2);
+            assertTrue(nc2.isStarted());
+            assertEquals(nc, nc2);
+
+            LOG.info("Testing re-added network connection...");
+            message = localSession.createTextMessage("test");
+            localProducer.send(message);
+
+            message = remoteConsumer.receive(10000);
+            assertNotNull(message);
+
+            LOG.info("Stopping network connector...");
+            nc.stop();
+            assertFalse(nc.isStarted());
+
+            LOG.info("Removing network connection 'NC2'");
+            assertTrue(localBroker.removeNetworkConnector(nc));
+
+            nc2 = localBroker.getNetworkConnectorByName("NC2");
+            assertNull("Should not find network connector 'NC2'", nc2);
+        } finally {
+            // Close both client connections even when an assertion fails, so
+            // they are not left open while the brokers are being torn down.
+            try {
+                if (remoteConnection != null) {
+                    remoteConnection.close();
+                }
+            } finally {
+                if (localConnection != null) {
+                    localConnection.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Starts the two brokers used by every test: "LocalBroker" on
+     * {@code LOCAL_BROKER_TRANSPORT_URI} and "RemoteBroker" on
+     * {@code REMOTE_BROKER_TRANSPORT_URI}, waiting for each to report started.
+     *
+     * @throws Exception if a broker cannot be configured or started
+     */
+    @Override
+    protected void setUp() throws Exception {
+        LOG.info("Setting up LocalBroker");
+        localBroker = newUnstartedBroker("LocalBroker", LOCAL_BROKER_TRANSPORT_URI);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+
+        LOG.info("Setting up RemoteBroker");
+        remoteBroker = newUnstartedBroker("RemoteBroker", REMOTE_BROKER_TRANSPORT_URI);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+    }
+
+    /**
+     * Builds a broker with JMX and persistence switched off and a single
+     * transport connector URI; the broker is returned without being started.
+     *
+     * @param brokerName name to assign to the broker
+     * @param transportUri the only transport connector URI to configure
+     * @return the configured, not yet started broker
+     * @throws Exception if the transport connector URI cannot be applied
+     */
+    private static BrokerService newUnstartedBroker(String brokerName, String transportUri) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setUseJmx(false);
+        broker.setPersistent(false);
+        broker.setTransportConnectorURIs(new String[]{transportUri});
+        return broker;
+    }
+
+    /**
+     * Stops both brokers and clears the fixture fields. The remote broker is
+     * stopped even when stopping the local broker throws, so a failure on one
+     * side does not leave the other broker running for subsequent tests.
+     *
+     * @throws Exception if a broker fails to stop
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            stopIfStarted("LocalBroker", localBroker);
+        } finally {
+            localBroker = null;
+            try {
+                stopIfStarted("RemoteBroker", remoteBroker);
+            } finally {
+                remoteBroker = null;
+            }
+        }
+    }
+
+    /**
+     * Stops the given broker and waits until it is stopped; does nothing when
+     * the broker is {@code null} or does not report itself as started.
+     *
+     * @param label broker label used in the log message
+     * @param broker the broker to stop, may be {@code null}
+     * @throws Exception if the broker fails to stop
+     */
+    private static void stopIfStarted(String label, BrokerService broker) throws Exception {
+        if (broker != null && broker.isStarted()) {
+            LOG.info("Stopping " + label);
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+}
\ No newline at end of file