You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/01/11 11:28:44 UTC
svn commit: r1057565 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/
test/java/org/apache/activemq/transport/failover/
Author: gtully
Date: Tue Jan 11 10:28:43 2011
New Revision: 1057565
URL: http://svn.apache.org/viewvc?rev=1057565&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3124 - Failover transport client gets corrupted connectedBrokers data - apply a variant of the suggested patch, with thanks. Stripping server-side URL options is the right way to go; this reuses the API used by the discovery publisher to the same effect, and adds an additional test.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Tue Jan 11 10:28:43 2011
@@ -29,7 +29,6 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.*;
-import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
@@ -484,8 +483,8 @@ public class AdvisoryBroker extends Brok
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
String url = getBrokerService().getVmConnectorURI().toString();
- if (getBrokerService().getDefaultSocketURI() != null) {
- url = getBrokerService().getDefaultSocketURI().toString();
+ if (getBrokerService().getDefaultSocketURIString() != null) {
+ url = getBrokerService().getDefaultSocketURIString();
}
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Jan 11 10:28:43 2011
@@ -152,7 +152,7 @@ public class BrokerService implements Se
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
private URI vmConnectorURI;
- private URI defaultSocketURI;
+ private String defaultSocketURIString;
private PolicyMap destinationPolicy;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -1315,24 +1315,24 @@ public class BrokerService implements Se
this.vmConnectorURI = vmConnectorURI;
}
- public URI getDefaultSocketURI() {
+ public String getDefaultSocketURIString() {
if (started.get()) {
- if (this.defaultSocketURI==null) {
+ if (this.defaultSocketURIString ==null) {
for (TransportConnector tc:this.transportConnectors) {
- URI result = null;
+ String result = null;
try {
- result = tc.getConnectUri();
+ result = tc.getPublishableConnectString();
} catch (Exception e) {
LOG.warn("Failed to get the ConnectURI for "+tc,e);
}
if (result != null) {
- this.defaultSocketURI=result;
+ this.defaultSocketURIString =result;
break;
}
}
}
- return this.defaultSocketURI;
+ return this.defaultSocketURIString;
}
return null;
}
@@ -2076,8 +2076,8 @@ public class BrokerService implements Se
connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName());
connector.setDurableDestinations(durableDestinations);
- if (getDefaultSocketURI() != null) {
- connector.setBrokerURL(getDefaultSocketURI().toString());
+ if (getDefaultSocketURIString() != null) {
+ connector.setBrokerURL(getDefaultSocketURIString());
}
connector.start();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Jan 11 10:28:43 2011
@@ -584,7 +584,7 @@ public class TransportConnection impleme
ConnectionId connectionId = info.getSessionId().getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
// Avoid replaying dup commands
- if (!cs.getSessionIds().contains(info.getSessionId())) {
+ if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
broker.addSession(cs.getContext(), info);
try {
cs.addSession(info);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Tue Jan 11 10:28:43 2011
@@ -254,7 +254,7 @@ public class TransportConnector implemen
LOG.info("Connector " + getName() + " Started");
}
- private String getPublishableConnectString() throws Exception {
+ public String getPublishableConnectString() throws Exception {
URI theConnectURI = getConnectUri();
String publishableConnectString = theConnectURI.toString();
// strip off server side query parameters which may not be compatible to
@@ -386,23 +386,26 @@ public class TransportConnector implemen
boolean rebalance = isRebalanceClusterClients();
String connectedBrokers = "";
String self = "";
- if (brokerService.getDefaultSocketURI() != null) {
- self += brokerService.getDefaultSocketURI().toString();
- self += ",";
- }
- if (rebalance == false) {
- connectedBrokers += self;
- }
- if (this.broker.getPeerBrokerInfos() != null) {
- for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
- if (isMatchesClusterFilter(info.getBrokerName())) {
- connectedBrokers += info.getBrokerURL();
- connectedBrokers += ",";
+
+ if (isUpdateClusterClients()) {
+ if (brokerService.getDefaultSocketURIString() != null) {
+ self += brokerService.getDefaultSocketURIString();
+ self += ",";
+ }
+ if (rebalance == false) {
+ connectedBrokers += self;
+ }
+ if (this.broker.getPeerBrokerInfos() != null) {
+ for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
+ if (isMatchesClusterFilter(info.getBrokerName())) {
+ connectedBrokers += info.getBrokerURL();
+ connectedBrokers += ",";
+ }
}
}
- }
- if (rebalance) {
- connectedBrokers += self;
+ if (rebalance) {
+ connectedBrokers += self;
+ }
}
ConnectionControl control = new ConnectionControl();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java Tue Jan 11 10:28:43 2011
@@ -58,6 +58,21 @@ private final List<ActiveMQConnection>co
}
assertTrue(set.size() > 1);
}
+
+ public void testClusterURIOptionsStrip() throws Exception{
+ createClients();
+ if (brokerB == null) {
+ // add in server side only url param, should not be propagated
+ brokerB = createBrokerB(BROKER_B_BIND_ADDRESS + "?transport.closeAsync=false");
+ }
+ Thread.sleep(3000);
+ Set<String> set = new HashSet<String>();
+ for (ActiveMQConnection c:connections) {
+ set.add(c.getTransportChannel().getRemoteAddress());
+ }
+ assertTrue(set.size() > 1);
+ }
+
public void testClusterConnectedBeforeClients() throws Exception{
@@ -80,7 +95,7 @@ private final List<ActiveMQConnection>co
@Override
protected void setUp() throws Exception {
if (brokerA == null) {
- brokerA = createBrokerA(BROKER_A_BIND_ADDRESS);
+ brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false");
}
@@ -103,6 +118,7 @@ private final List<ActiveMQConnection>co
protected BrokerService createBrokerA(String uri) throws Exception {
BrokerService answer = new BrokerService();
+ answer.setUseJmx(false);
configureConsumerBroker(answer,uri);
answer.start();
return answer;
@@ -119,6 +135,7 @@ private final List<ActiveMQConnection>co
protected BrokerService createBrokerB(String uri) throws Exception {
BrokerService answer = new BrokerService();
+ answer.setUseJmx(false);
configureNetwork(answer,uri);
answer.start();
return answer;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java Tue Jan 11 10:28:43 2011
@@ -18,77 +18,129 @@ package org.apache.activemq.transport.fa
import java.io.File;
import java.io.FileOutputStream;
-import java.net.URI;
-
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
-import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.jms.TextMessage;
-
import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.log4j.Logger;
public class FailoverUpdateURIsTest extends TestCase {
-
- private static final String QUEUE_NAME = "test.failoverupdateuris";
- public void testUpdateURIs() throws Exception {
-
- long timeout = 1000;
- URI firstTcpUri = new URI("tcp://localhost:61616");
- URI secondTcpUri = new URI("tcp://localhost:61626");
- String targetDir = "target/" + getName();
- new File(targetDir).mkdir();
- File updateFile = new File(targetDir + "/updateURIsFile.txt");
- System.out.println(updateFile);
- System.out.println(updateFile.toURI());
- System.out.println(updateFile.getAbsoluteFile());
- System.out.println(updateFile.getAbsoluteFile().toURI());
- FileOutputStream out = new FileOutputStream(updateFile);
- out.write(firstTcpUri.toString().getBytes());
- out.close();
-
- BrokerService bs1 = new BrokerService();
- bs1.setUseJmx(false);
- bs1.addConnector(firstTcpUri);
- bs1.start();
-
- // no failover uri's to start with, must be read from file...
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
- Connection connection = cf.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue theQueue = session.createQueue(QUEUE_NAME);
- MessageProducer producer = session.createProducer(theQueue);
- MessageConsumer consumer = session.createConsumer(theQueue);
- Message message = session.createTextMessage("Test message");
- producer.send(message);
- Message msg = consumer.receive(2000);
- assertNotNull(msg);
-
- bs1.stop();
- bs1.waitUntilStopped();
-
- BrokerService bs2 = new BrokerService();
- bs2.setUseJmx(false);
- bs2.addConnector(secondTcpUri);
- bs2.start();
-
- // add the transport uri for broker number 2
- out = new FileOutputStream(updateFile, true);
- out.write(",".getBytes());
- out.write(secondTcpUri.toString().getBytes());
- out.close();
-
- producer.send(message);
- msg = consumer.receive(2000);
- assertNotNull(msg);
- }
-
+ private static final String QUEUE_NAME = "test.failoverupdateuris";
+ private static final Logger LOG = Logger.getLogger(FailoverUpdateURIsTest.class);
+
+ String firstTcpUri = "tcp://localhost:61616";
+ String secondTcpUri = "tcp://localhost:61626";
+ Connection connection = null;
+ BrokerService bs2 = null;
+
+ public void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
+ if (bs2 != null) {
+ bs2.stop();
+ }
+ }
+
+ public void testUpdateURIsViaFile() throws Exception {
+
+ String targetDir = "target/" + getName();
+ new File(targetDir).mkdir();
+ File updateFile = new File(targetDir + "/updateURIsFile.txt");
+ LOG.info(updateFile);
+ LOG.info(updateFile.toURI());
+ LOG.info(updateFile.getAbsoluteFile());
+ LOG.info(updateFile.getAbsoluteFile().toURI());
+ FileOutputStream out = new FileOutputStream(updateFile);
+ out.write(firstTcpUri.getBytes());
+ out.close();
+
+ BrokerService bs1 = createBroker("bs1", firstTcpUri);
+ bs1.start();
+
+ // no failover uri's to start with, must be read from file...
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
+ connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue theQueue = session.createQueue(QUEUE_NAME);
+ MessageProducer producer = session.createProducer(theQueue);
+ MessageConsumer consumer = session.createConsumer(theQueue);
+ Message message = session.createTextMessage("Test message");
+ producer.send(message);
+ Message msg = consumer.receive(2000);
+ assertNotNull(msg);
+
+ bs1.stop();
+ bs1.waitUntilStopped();
+
+ bs2 = createBroker("bs2", secondTcpUri);
+ bs2.start();
+
+ // add the transport uri for broker number 2
+ out = new FileOutputStream(updateFile, true);
+ out.write(",".getBytes());
+ out.write(secondTcpUri.toString().getBytes());
+ out.close();
+
+ producer.send(message);
+ msg = consumer.receive(2000);
+ assertNotNull(msg);
+ }
+
+ private BrokerService createBroker(String name, String tcpUri) throws Exception {
+ BrokerService bs = new BrokerService();
+ bs.setBrokerName(name);
+ bs.setUseJmx(false);
+ bs.setPersistent(false);
+ bs.addConnector(tcpUri);
+ return bs;
+ }
+
+ public void testAutoUpdateURIs() throws Exception {
+
+ BrokerService bs1 = new BrokerService();
+ bs1.setUseJmx(false);
+ TransportConnector transportConnector = bs1.addConnector(firstTcpUri);
+ transportConnector.setUpdateClusterClients(true);
+ bs1.start();
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + firstTcpUri + ")");
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue theQueue = session.createQueue(QUEUE_NAME);
+ MessageProducer producer = session.createProducer(theQueue);
+ MessageConsumer consumer = session.createConsumer(theQueue);
+ Message message = session.createTextMessage("Test message");
+ producer.send(message);
+ Message msg = consumer.receive(4000);
+ assertNotNull(msg);
+
+ bs2 = createBroker("bs2", secondTcpUri);
+ NetworkConnector networkConnector = bs2.addNetworkConnector("static:(" + firstTcpUri + ")");
+ networkConnector.setDuplex(true);
+ bs2.start();
+ LOG.info("started brokerService 2");
+ bs2.waitUntilStarted();
+
+ TimeUnit.SECONDS.sleep(4);
+
+ LOG.info("stopping brokerService 1");
+ bs1.stop();
+ bs1.waitUntilStopped();
+
+ producer.send(message);
+ msg = consumer.receive(4000);
+ assertNotNull(msg);
+ }
}