You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/04/28 18:12:30 UTC

svn commit: r938998 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/failover/ main/java/org/apache/activemq/transport/vm/ main/java/org/apache/activemq/util/ test/java/org/apache/ac...

Author: gtully
Date: Wed Apr 28 16:12:29 2010
New Revision: 938998

URL: http://svn.apache.org/viewvc?rev=938998&view=rev
Log:
tidy up vm url handling, fix failover and ssl context: resolve https://issues.apache.org/activemq/browse/AMQ-2715 and add test for same, improve logging around network connector

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Apr 28 16:12:29 2010
@@ -71,7 +71,6 @@ public class DiscoveryNetworkConnector e
     }
 
     public void onServiceAdd(DiscoveryEvent event) {
-        String localURIName = localURI.getScheme() + "://" + localURI.getHost();
         // Ignore events once we start stopping.
         if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
             return;
@@ -100,7 +99,7 @@ public class DiscoveryNetworkConnector e
             } catch (URISyntaxException e) {
                 LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
             }
-            LOG.info("Establishing network connection from " + localURIName + " to " + connectUri);
+            LOG.info("Establishing network connection from " + localURI + " to " + connectUri);
 
             Transport remoteTransport;
             Transport localTransport;
@@ -118,7 +117,7 @@ public class DiscoveryNetworkConnector e
                     localTransport = createLocalTransport();
                 } catch (Exception e) {
                     ServiceSupport.dispose(remoteTransport);
-                    LOG.warn("Could not connect to local URI: " + localURIName + ": " + e.getMessage());
+                    LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
                     LOG.debug("Connection failure exception: " + e, e);
                     return;
                 }
@@ -132,7 +131,7 @@ public class DiscoveryNetworkConnector e
             } catch (Exception e) {
                 ServiceSupport.dispose(localTransport);
                 ServiceSupport.dispose(remoteTransport);
-                LOG.warn("Could not start network bridge between: " + localURIName + " and: " + uri + " due to: " + e);
+                LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
                 LOG.debug("Start failure exception: " + e, e);
                 try {
                     discoveryAgent.serviceFailed(event);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Apr 28 16:12:29 2010
@@ -31,6 +31,8 @@ import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionId;
@@ -109,9 +111,11 @@ public class FailoverTransport implement
     private final TransportListener myTransportListener = createTransportListener();
     private boolean updateURIsSupported=true;
     private boolean reconnectSupported=true;
+    // SslContext captured in the constructor so it can be re-applied on the reconnect/backup threads
+    private SslContext brokerSslContext;
 
     public FailoverTransport() throws InterruptedIOException {
-
+        brokerSslContext = SslContext.getCurrentSslContext();
         stateTracker.setTrackTransactions(true);
         // Setup a task that is used to reconnect the a connection async.
         reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
@@ -792,6 +796,7 @@ public class FailoverTransport implement
                         Transport t = null;
                         try {
                             LOG.debug("Attempting connect to: " + uri);
+                            SslContext.setCurrentSslContext(brokerSslContext);
                             t = TransportFactory.compositeConnect(uri);
                             t.setTransportListener(myTransportListener);
                             t.start();
@@ -842,6 +847,8 @@ public class FailoverTransport implement
                                     LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
                                 }
                             }
+                        } finally {
+                            SslContext.setCurrentSslContext(null);
                         }
                     }
                 }
@@ -921,6 +928,7 @@ public class FailoverTransport implement
                     URI uri = iter.next();
                     if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
                         try {
+                            SslContext.setCurrentSslContext(brokerSslContext);
                             BackupTransport bt = new BackupTransport(this);
                             bt.setUri(uri);
                             if (!backups.contains(bt)) {
@@ -932,6 +940,8 @@ public class FailoverTransport implement
                             }
                         } catch (Exception e) {
                             LOG.debug("Failed to build backup ", e);
+                        } finally {
+                            SslContext.setCurrentSslContext(null);
                         }
                     }
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java Wed Apr 28 16:12:29 2010
@@ -76,7 +76,7 @@ public class VMTransportFactory extends 
             // If using the less complex vm://localhost?broker.persistent=true
             // form
             try {
-                host = location.getHost();
+                host = extractHost(location);
                 options = URISupport.parseParamters(location);
                 String config = (String)options.remove("brokerConfig");
                 if (config != null) {
@@ -157,7 +157,18 @@ public class VMTransportFactory extends 
         return transport;
     }
 
-   /**
+   private static String extractHost(URI location) {
+       String host = location.getHost();
+       if (host == null || host.length() == 0) {
+           host = location.getAuthority();
+           if (host == null || host.length() == 0) {
+               host = "localhost";
+           }
+       }
+       return host;
+    }
+
+/**
     * @param registry
     * @param brokerName
     * @param waitForStart - time in milliseconds to wait for a broker to appear
@@ -193,7 +204,7 @@ public class VMTransportFactory extends 
      * @throws IOException
      */
     private TransportServer bind(URI location, boolean dispose) throws IOException {
-        String host = location.getHost();
+        String host = extractHost(location);
         LOG.debug("binding to broker: " + host);
         VMTransportServer server = new VMTransportServer(location, dispose);
         Object currentBoundValue = SERVERS.get(host);
@@ -205,7 +216,7 @@ public class VMTransportFactory extends 
     }
 
     public static void stopped(VMTransportServer server) {
-        String host = server.getBindURI().getHost();
+        String host = extractHost(server.getBindURI());
         stopped(host);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java Wed Apr 28 16:12:29 2010
@@ -157,8 +157,16 @@ public class URISupport {
      * Creates a URI with the given query
      */
     public static URI createURIWithQuery(URI uri, String query) throws URISyntaxException {
-        return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(),
-                       query, uri.getFragment());
+        String schemeSpecificPart = uri.getRawSchemeSpecificPart();
+        // strip existing query if any
+        int questionMark = schemeSpecificPart.lastIndexOf("?");
+        if (questionMark > 0) {
+            schemeSpecificPart = schemeSpecificPart.substring(0, questionMark);
+        }
+        if (query != null && query.length() > 0) {
+            schemeSpecificPart += "?" + query;
+         }
+        return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment());
     }
 
     public static CompositeData parseComposite(URI uri) throws URISyntaxException {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=938998&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java Wed Apr 28 16:12:29 2010
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.TrustManager;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FailoverStaticNetworkTest {
+    protected static final Log LOG = LogFactory.getLog(FailoverStaticNetworkTest.class);
+
+	private final static String DESTINATION_NAME = "testQ";
+	protected BrokerService brokerA;
+    protected BrokerService brokerB;
+
+
+    private SslContext sslContext;
+    
+    protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setSslContext(sslContext);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setBrokerName("Broker_" + listenPort);
+        broker.addConnector(scheme + "://localhost:" + listenPort);
+        if (networkToPorts != null && networkToPorts.length > 0) {
+            StringBuilder builder = new StringBuilder("static:(failover:(" + scheme + "://localhost:");
+            builder.append(networkToPorts[0]);
+            for (int i=1;i<networkToPorts.length; i++) {
+                builder.append("," + scheme + "://localhost:" + networkToPorts[i]);
+            }
+            builder.append(")?randomize=false)");
+            broker.addNetworkConnector(builder.toString());
+        }
+        return broker;
+    }
+  
+    @Before
+    public void init() throws Exception {
+        KeyManager[] km = SslBrokerServiceTest.getKeyManager();
+        TrustManager[] tm = SslBrokerServiceTest.getTrustManager();
+        sslContext = new SslContext(km, tm, null);
+    }
+    
+    @After
+    public void cleanup() throws Exception {
+        brokerB.stop();
+        brokerB.waitUntilStopped();
+        
+        brokerA.stop();
+        brokerA.waitUntilStopped();
+    }
+
+    /**
+     * The networked broker (brokerB) is started after its target (brokerA) so that
+     * the first connect attempt succeeds; the start order is important.
+     */
+    @Test
+    public void testSendReceive() throws Exception {
+              
+        brokerA = createBroker("tcp", "61617", null);
+        brokerA.start();        
+        brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"});
+        brokerB.start();
+  
+        testNetworkSendReceive();
+    }
+
+    @Test
+    public void testSendReceiveSsl() throws Exception {
+              
+        brokerA = createBroker("ssl", "61617", null);
+        brokerA.start();        
+        brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"});
+        brokerB.start();
+  
+        testNetworkSendReceive();
+    }
+
+    private void testNetworkSendReceive() throws Exception, JMSException {
+        LOG.info("Creating Consumer on the networked broker ...");
+        
+        SslContext.setCurrentSslContext(sslContext);
+        // Create a consumer on brokerA (the message is then sent via brokerB)
+        ConnectionFactory consFactory = createConnectionFactory(brokerA);
+        Connection consConn = consFactory.createConnection();
+        consConn.start();
+        Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
+        final MessageConsumer consumer = consSession.createConsumer(destination);
+        
+        sendMessageTo(destination, brokerB);
+        
+        assertTrue("consumer got message", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return consumer.receive(1000) != null;
+            }      
+        }));
+    }
+
+    private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception {
+        ConnectionFactory factory = createConnectionFactory(brokerService);
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createProducer(destination).send(session.createTextMessage("Hi"));
+        conn.close();
+    }
+    
+    protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {    
+        String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+        connectionFactory.setOptimizedMessageDispatch(true);
+        connectionFactory.setDispatchAsync(false);
+        connectionFactory.setUseAsyncSend(false);
+        connectionFactory.setOptimizeAcknowledge(false);
+        connectionFactory.setAlwaysSyncSend(true);
+        return connectionFactory;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java Wed Apr 28 16:12:29 2010
@@ -121,7 +121,7 @@ public class SslBrokerServiceTest extend
         LOG.info("peer cert: " + session.getPeerCertificateChain()[0].toString());    
     }
     
-    private TrustManager[] getTrustManager() throws Exception {
+    public static TrustManager[] getTrustManager() throws Exception {
         TrustManager[] trustStoreManagers = null;
         KeyStore trustedCertStore = KeyStore.getInstance(SslTransportBrokerTest.KEYSTORE_TYPE);
         
@@ -134,7 +134,7 @@ public class SslBrokerServiceTest extend
         return trustStoreManagers; 
     }
 
-    private KeyManager[] getKeyManager() throws Exception {
+    public static KeyManager[] getKeyManager() throws Exception {
         KeyManagerFactory kmf = 
             KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());  
         KeyStore ks = KeyStore.getInstance(SslTransportBrokerTest.KEYSTORE_TYPE);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java Wed Apr 28 16:12:29 2010
@@ -92,4 +92,14 @@ public class URISupportTest extends Test
         assertTrue(URISupport.checkParenthesis(str));
     }
     
+    public void testCreateWithQuery() throws Exception {
+        URI source = new URI("vm://localhost");
+        URI dest = URISupport.createURIWithQuery(source, "network=true&one=two");
+        
+        assertEquals("correct param count", 2, URISupport.parseParamters(dest).size());
+        assertEquals("same uri, host", source.getHost(), dest.getHost());
+        assertEquals("same uri, scheme", source.getScheme(), dest.getScheme());
+        assertFalse("same uri, ssp", dest.getQuery().equals(source.getQuery()));
+    }
+    
 }