You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/01/11 11:28:44 UTC

svn commit: r1057565 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/transport/failover/

Author: gtully
Date: Tue Jan 11 10:28:43 2011
New Revision: 1057565

URL: http://svn.apache.org/viewvc?rev=1057565&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3124 - Failover transport client gets corrupted connectedBrokers data - apply a variant of the suggested patch, with thanks. Stripping server-side URL options is the right way to go; this reuses the API used by the discovery publisher to the same effect, and adds an additional test.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Tue Jan 11 10:28:43 2011
@@ -29,7 +29,6 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.command.*;
-import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -484,8 +483,8 @@ public class AdvisoryBroker extends Brok
             advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
             
             String url = getBrokerService().getVmConnectorURI().toString();
-            if (getBrokerService().getDefaultSocketURI() != null) {
-                url = getBrokerService().getDefaultSocketURI().toString();
+            if (getBrokerService().getDefaultSocketURIString() != null) {
+                url = getBrokerService().getDefaultSocketURIString();
             }
             advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
             

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Jan 11 10:28:43 2011
@@ -152,7 +152,7 @@ public class BrokerService implements Se
     private boolean deleteAllMessagesOnStartup;
     private boolean advisorySupport = true;
     private URI vmConnectorURI;
-    private URI defaultSocketURI;
+    private String defaultSocketURIString;
     private PolicyMap destinationPolicy;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -1315,24 +1315,24 @@ public class BrokerService implements Se
         this.vmConnectorURI = vmConnectorURI;
     }
     
-    public URI getDefaultSocketURI() {
+    public String getDefaultSocketURIString() {
        
             if (started.get()) {
-                if (this.defaultSocketURI==null) {
+                if (this.defaultSocketURIString ==null) {
                     for (TransportConnector tc:this.transportConnectors) {
-                        URI result = null;
+                        String result = null;
                         try {
-                            result = tc.getConnectUri();
+                            result = tc.getPublishableConnectString();
                         } catch (Exception e) {
                           LOG.warn("Failed to get the ConnectURI for "+tc,e);
                         }
                         if (result != null) {
-                            this.defaultSocketURI=result;
+                            this.defaultSocketURIString =result;
                             break;
                         }
                     }
                 }
-                return this.defaultSocketURI;
+                return this.defaultSocketURIString;
             }
        return null;
     }
@@ -2076,8 +2076,8 @@ public class BrokerService implements Se
                     connector.setLocalUri(uri);
                     connector.setBrokerName(getBrokerName());
                     connector.setDurableDestinations(durableDestinations);
-                    if (getDefaultSocketURI() != null) {
-                        connector.setBrokerURL(getDefaultSocketURI().toString());
+                    if (getDefaultSocketURIString() != null) {
+                        connector.setBrokerURL(getDefaultSocketURIString());
                     }
                     connector.start();
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Jan 11 10:28:43 2011
@@ -584,7 +584,7 @@ public class TransportConnection impleme
         ConnectionId connectionId = info.getSessionId().getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
         // Avoid replaying dup commands
-        if (!cs.getSessionIds().contains(info.getSessionId())) {
+        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
             broker.addSession(cs.getContext(), info);
             try {
                 cs.addSession(info);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Tue Jan 11 10:28:43 2011
@@ -254,7 +254,7 @@ public class TransportConnector implemen
         LOG.info("Connector " + getName() + " Started");
     }
 
-    private String getPublishableConnectString() throws Exception {
+    public String getPublishableConnectString() throws Exception {
         URI theConnectURI = getConnectUri();
         String publishableConnectString = theConnectURI.toString();
         // strip off server side query parameters which may not be compatible to
@@ -386,23 +386,26 @@ public class TransportConnector implemen
         boolean rebalance = isRebalanceClusterClients();
         String connectedBrokers = "";
         String self = "";
-        if (brokerService.getDefaultSocketURI() != null) {
-            self += brokerService.getDefaultSocketURI().toString();
-            self += ",";
-        }
-        if (rebalance == false) {
-            connectedBrokers += self;
-        }
-        if (this.broker.getPeerBrokerInfos() != null) {
-            for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
-                if (isMatchesClusterFilter(info.getBrokerName())) {
-                    connectedBrokers += info.getBrokerURL();
-                    connectedBrokers += ",";
+
+        if (isUpdateClusterClients()) {
+            if (brokerService.getDefaultSocketURIString() != null) {
+                self += brokerService.getDefaultSocketURIString();
+                self += ",";
+            }
+            if (rebalance == false) {
+                connectedBrokers += self;
+            }
+            if (this.broker.getPeerBrokerInfos() != null) {
+                for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
+                    if (isMatchesClusterFilter(info.getBrokerName())) {
+                        connectedBrokers += info.getBrokerURL();
+                        connectedBrokers += ",";
+                    }
                 }
             }
-        }
-        if (rebalance) {
-            connectedBrokers += self;
+            if (rebalance) {
+                connectedBrokers += self;
+            }
         }
 
         ConnectionControl control = new ConnectionControl();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java Tue Jan 11 10:28:43 2011
@@ -58,6 +58,21 @@ private final List<ActiveMQConnection>co
       }
       assertTrue(set.size() > 1);
   }
+
+    public void testClusterURIOptionsStrip() throws Exception{
+        createClients();
+        if (brokerB == null) {
+            // add in server side only url param, should not be propagated
+            brokerB = createBrokerB(BROKER_B_BIND_ADDRESS + "?transport.closeAsync=false");
+        }
+        Thread.sleep(3000);
+        Set<String> set = new HashSet<String>();
+        for (ActiveMQConnection c:connections) {
+            set.add(c.getTransportChannel().getRemoteAddress());
+        }
+        assertTrue(set.size() > 1);
+    }
+
   
   public void testClusterConnectedBeforeClients() throws Exception{
       
@@ -80,7 +95,7 @@ private final List<ActiveMQConnection>co
     @Override
     protected void setUp() throws Exception {
         if (brokerA == null) {
-           brokerA = createBrokerA(BROKER_A_BIND_ADDRESS);
+           brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false");
         }
         
         
@@ -103,6 +118,7 @@ private final List<ActiveMQConnection>co
     
     protected BrokerService createBrokerA(String uri) throws Exception {
         BrokerService answer = new BrokerService();
+        answer.setUseJmx(false);
         configureConsumerBroker(answer,uri);
         answer.start();
         return answer;
@@ -119,6 +135,7 @@ private final List<ActiveMQConnection>co
     
     protected BrokerService createBrokerB(String uri) throws Exception {
         BrokerService answer = new BrokerService();
+        answer.setUseJmx(false);
         configureNetwork(answer,uri);
         answer.start();
         return answer;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java?rev=1057565&r1=1057564&r2=1057565&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java Tue Jan 11 10:28:43 2011
@@ -18,77 +18,129 @@ package org.apache.activemq.transport.fa
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.net.URI;
-
+import java.util.concurrent.TimeUnit;
 import javax.jms.Connection;
-import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageProducer;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import javax.jms.TextMessage;
-
 import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.log4j.Logger;
 
 public class FailoverUpdateURIsTest extends TestCase {
-	
-	private static final String QUEUE_NAME = "test.failoverupdateuris";
 
-	public void testUpdateURIs() throws Exception {
-		
-		long timeout = 1000;
-		URI firstTcpUri = new URI("tcp://localhost:61616");
-		URI secondTcpUri = new URI("tcp://localhost:61626");
-                String targetDir = "target/" + getName();
-                new File(targetDir).mkdir();
-                File updateFile = new File(targetDir + "/updateURIsFile.txt");
-                System.out.println(updateFile);
-                System.out.println(updateFile.toURI());
-                System.out.println(updateFile.getAbsoluteFile());
-                System.out.println(updateFile.getAbsoluteFile().toURI());
-                FileOutputStream out = new FileOutputStream(updateFile);
-                out.write(firstTcpUri.toString().getBytes());
-                out.close();
-                              
-		BrokerService bs1 = new BrokerService();
-		bs1.setUseJmx(false);
-		bs1.addConnector(firstTcpUri);
-		bs1.start();
-
-                // no failover uri's to start with, must be read from file...
-		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
-		Connection connection = cf.createConnection();
-                connection.start();
-		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                Queue theQueue = session.createQueue(QUEUE_NAME);
-		MessageProducer producer = session.createProducer(theQueue);
-		MessageConsumer consumer = session.createConsumer(theQueue);
-		Message message = session.createTextMessage("Test message");
-		producer.send(message);
-                Message msg = consumer.receive(2000);
-                assertNotNull(msg);
-		
-		bs1.stop();
-                bs1.waitUntilStopped();
-
-		BrokerService bs2 = new BrokerService();
-		bs2.setUseJmx(false);
-		bs2.addConnector(secondTcpUri);
-		bs2.start();
-		
-		// add the transport uri for broker number 2
-                out = new FileOutputStream(updateFile, true);
-                out.write(",".getBytes());
-                out.write(secondTcpUri.toString().getBytes());
-                out.close();
-
-                producer.send(message);
-                msg = consumer.receive(2000);
-                assertNotNull(msg);
-	}
-	
+    private static final String QUEUE_NAME = "test.failoverupdateuris";
+    private static final Logger LOG = Logger.getLogger(FailoverUpdateURIsTest.class);
+
+    String firstTcpUri = "tcp://localhost:61616";
+    String secondTcpUri = "tcp://localhost:61626";
+    Connection connection = null;
+    BrokerService bs2 = null;
+
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        if (bs2 != null) {
+            bs2.stop();
+        }
+    }
+
+    public void testUpdateURIsViaFile() throws Exception {
+
+        String targetDir = "target/" + getName();
+        new File(targetDir).mkdir();
+        File updateFile = new File(targetDir + "/updateURIsFile.txt");
+        LOG.info(updateFile);
+        LOG.info(updateFile.toURI());
+        LOG.info(updateFile.getAbsoluteFile());
+        LOG.info(updateFile.getAbsoluteFile().toURI());
+        FileOutputStream out = new FileOutputStream(updateFile);
+        out.write(firstTcpUri.getBytes());
+        out.close();
+
+        BrokerService bs1 = createBroker("bs1", firstTcpUri);
+        bs1.start();
+
+        // no failover uri's to start with, must be read from file...
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
+        connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue theQueue = session.createQueue(QUEUE_NAME);
+        MessageProducer producer = session.createProducer(theQueue);
+        MessageConsumer consumer = session.createConsumer(theQueue);
+        Message message = session.createTextMessage("Test message");
+        producer.send(message);
+        Message msg = consumer.receive(2000);
+        assertNotNull(msg);
+
+        bs1.stop();
+        bs1.waitUntilStopped();
+
+        bs2 = createBroker("bs2", secondTcpUri);
+        bs2.start();
+
+        // add the transport uri for broker number 2
+        out = new FileOutputStream(updateFile, true);
+        out.write(",".getBytes());
+        out.write(secondTcpUri.toString().getBytes());
+        out.close();
+
+        producer.send(message);
+        msg = consumer.receive(2000);
+        assertNotNull(msg);
+    }
+
+    private BrokerService createBroker(String name, String tcpUri) throws Exception {
+        BrokerService bs = new BrokerService();
+        bs.setBrokerName(name);
+        bs.setUseJmx(false);
+        bs.setPersistent(false);
+        bs.addConnector(tcpUri);
+        return bs;
+    }
+
+    public void testAutoUpdateURIs() throws Exception {
+
+        BrokerService bs1 = new BrokerService();
+        bs1.setUseJmx(false);
+        TransportConnector transportConnector = bs1.addConnector(firstTcpUri);
+        transportConnector.setUpdateClusterClients(true);
+        bs1.start();
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + firstTcpUri + ")");
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue theQueue = session.createQueue(QUEUE_NAME);
+        MessageProducer producer = session.createProducer(theQueue);
+        MessageConsumer consumer = session.createConsumer(theQueue);
+        Message message = session.createTextMessage("Test message");
+        producer.send(message);
+        Message msg = consumer.receive(4000);
+        assertNotNull(msg);
+
+        bs2 = createBroker("bs2", secondTcpUri);
+        NetworkConnector networkConnector = bs2.addNetworkConnector("static:(" + firstTcpUri + ")");
+        networkConnector.setDuplex(true);
+        bs2.start();
+        LOG.info("started brokerService 2");
+        bs2.waitUntilStarted();
+
+        TimeUnit.SECONDS.sleep(4);
+
+        LOG.info("stopping brokerService 1");
+        bs1.stop();
+        bs1.waitUntilStopped();
+
+        producer.send(message);
+        msg = consumer.receive(4000);
+        assertNotNull(msg);
+    }
 }