You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/04/28 18:12:30 UTC
svn commit: r938998 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/
main/java/org/apache/activemq/transport/failover/
main/java/org/apache/activemq/transport/vm/
main/java/org/apache/activemq/util/ test/java/org/apache/ac...
Author: gtully
Date: Wed Apr 28 16:12:29 2010
New Revision: 938998
URL: http://svn.apache.org/viewvc?rev=938998&view=rev
Log:
tidy up vm url handling, fix failover and ssl context: resolve https://issues.apache.org/activemq/browse/AMQ-2715 and add test for same, improve logging around network connector
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Apr 28 16:12:29 2010
@@ -71,7 +71,6 @@ public class DiscoveryNetworkConnector e
}
public void onServiceAdd(DiscoveryEvent event) {
- String localURIName = localURI.getScheme() + "://" + localURI.getHost();
// Ignore events once we start stopping.
if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
return;
@@ -100,7 +99,7 @@ public class DiscoveryNetworkConnector e
} catch (URISyntaxException e) {
LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
}
- LOG.info("Establishing network connection from " + localURIName + " to " + connectUri);
+ LOG.info("Establishing network connection from " + localURI + " to " + connectUri);
Transport remoteTransport;
Transport localTransport;
@@ -118,7 +117,7 @@ public class DiscoveryNetworkConnector e
localTransport = createLocalTransport();
} catch (Exception e) {
ServiceSupport.dispose(remoteTransport);
- LOG.warn("Could not connect to local URI: " + localURIName + ": " + e.getMessage());
+ LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
LOG.debug("Connection failure exception: " + e, e);
return;
}
@@ -132,7 +131,7 @@ public class DiscoveryNetworkConnector e
} catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
- LOG.warn("Could not start network bridge between: " + localURIName + " and: " + uri + " due to: " + e);
+ LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
LOG.debug("Start failure exception: " + e, e);
try {
discoveryAgent.serviceFailed(event);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Apr 28 16:12:29 2010
@@ -31,6 +31,8 @@ import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionId;
@@ -109,9 +111,11 @@ public class FailoverTransport implement
private final TransportListener myTransportListener = createTransportListener();
private boolean updateURIsSupported=true;
private boolean reconnectSupported=true;
+ // current SslContext captured at construction, remembered so the reconnect thread can re-apply it
+ private SslContext brokerSslContext;
public FailoverTransport() throws InterruptedIOException {
-
+ brokerSslContext = SslContext.getCurrentSslContext();
stateTracker.setTrackTransactions(true);
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
@@ -792,6 +796,7 @@ public class FailoverTransport implement
Transport t = null;
try {
LOG.debug("Attempting connect to: " + uri);
+ SslContext.setCurrentSslContext(brokerSslContext);
t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
@@ -842,6 +847,8 @@ public class FailoverTransport implement
LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
}
}
+ } finally {
+ SslContext.setCurrentSslContext(null);
}
}
}
@@ -921,6 +928,7 @@ public class FailoverTransport implement
URI uri = iter.next();
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
try {
+ SslContext.setCurrentSslContext(brokerSslContext);
BackupTransport bt = new BackupTransport(this);
bt.setUri(uri);
if (!backups.contains(bt)) {
@@ -932,6 +940,8 @@ public class FailoverTransport implement
}
} catch (Exception e) {
LOG.debug("Failed to build backup ", e);
+ } finally {
+ SslContext.setCurrentSslContext(null);
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java Wed Apr 28 16:12:29 2010
@@ -76,7 +76,7 @@ public class VMTransportFactory extends
// If using the less complex vm://localhost?broker.persistent=true
// form
try {
- host = location.getHost();
+ host = extractHost(location);
options = URISupport.parseParamters(location);
String config = (String)options.remove("brokerConfig");
if (config != null) {
@@ -157,7 +157,18 @@ public class VMTransportFactory extends
return transport;
}
- /**
+ private static String extractHost(URI location) {
+ String host = location.getHost();
+ if (host == null || host.length() == 0) {
+ host = location.getAuthority();
+ if (host == null || host.length() == 0) {
+ host = "localhost";
+ }
+ }
+ return host;
+ }
+
+/**
* @param registry
* @param brokerName
* @param waitForStart - time in milliseconds to wait for a broker to appear
@@ -193,7 +204,7 @@ public class VMTransportFactory extends
* @throws IOException
*/
private TransportServer bind(URI location, boolean dispose) throws IOException {
- String host = location.getHost();
+ String host = extractHost(location);
LOG.debug("binding to broker: " + host);
VMTransportServer server = new VMTransportServer(location, dispose);
Object currentBoundValue = SERVERS.get(host);
@@ -205,7 +216,7 @@ public class VMTransportFactory extends
}
public static void stopped(VMTransportServer server) {
- String host = server.getBindURI().getHost();
+ String host = extractHost(server.getBindURI());
stopped(host);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java Wed Apr 28 16:12:29 2010
@@ -157,8 +157,16 @@ public class URISupport {
* Creates a URI with the given query
*/
public static URI createURIWithQuery(URI uri, String query) throws URISyntaxException {
- return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(),
- query, uri.getFragment());
+ String schemeSpecificPart = uri.getRawSchemeSpecificPart();
+ // strip existing query if any
+ int questionMark = schemeSpecificPart.lastIndexOf("?");
+ if (questionMark > 0) {
+ schemeSpecificPart = schemeSpecificPart.substring(0, questionMark);
+ }
+ if (query != null && query.length() > 0) {
+ schemeSpecificPart += "?" + query;
+ }
+ return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment());
}
public static CompositeData parseComposite(URI uri) throws URISyntaxException {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=938998&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java Wed Apr 28 16:12:29 2010
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.TrustManager;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FailoverStaticNetworkTest {
+ protected static final Log LOG = LogFactory.getLog(FailoverStaticNetworkTest.class);
+
+ private final static String DESTINATION_NAME = "testQ";
+ protected BrokerService brokerA;
+ protected BrokerService brokerB;
+
+
+ private SslContext sslContext;
+
+ protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setSslContext(sslContext);
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.setBrokerName("Broker_" + listenPort);
+ broker.addConnector(scheme + "://localhost:" + listenPort);
+ if (networkToPorts != null && networkToPorts.length > 0) {
+ StringBuilder builder = new StringBuilder("static:(failover:(" + scheme + "://localhost:");
+ builder.append(networkToPorts[0]);
+ for (int i=1;i<networkToPorts.length; i++) {
+ builder.append("," + scheme + "://localhost:" + networkToPorts[i]);
+ }
+ builder.append(")?randomize=false)");
+ broker.addNetworkConnector(builder.toString());
+ }
+ return broker;
+ }
+
+ @Before
+ public void init() throws Exception {
+ KeyManager[] km = SslBrokerServiceTest.getKeyManager();
+ TrustManager[] tm = SslBrokerServiceTest.getTrustManager();
+ sslContext = new SslContext(km, tm, null);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ brokerB.stop();
+ brokerB.waitUntilStopped();
+
+ brokerA.stop();
+ brokerA.waitUntilStopped();
+ }
+
+ /**
+ * networked broker started after target so first connect attempt succeeds
+ * start order is important
+ */
+ @Test
+ public void testSendReceive() throws Exception {
+
+ brokerA = createBroker("tcp", "61617", null);
+ brokerA.start();
+ brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"});
+ brokerB.start();
+
+ testNetworkSendReceive();
+ }
+
+ @Test
+ public void testSendReceiveSsl() throws Exception {
+
+ brokerA = createBroker("ssl", "61617", null);
+ brokerA.start();
+ brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"});
+ brokerB.start();
+
+ testNetworkSendReceive();
+ }
+
+ private void testNetworkSendReceive() throws Exception, JMSException {
+ LOG.info("Creating Consumer on the networked broker ...");
+
+ SslContext.setCurrentSslContext(sslContext);
+ // Create a consumer on brokerA; the message is sent via brokerB below
+ ConnectionFactory consFactory = createConnectionFactory(brokerA);
+ Connection consConn = consFactory.createConnection();
+ consConn.start();
+ Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
+ final MessageConsumer consumer = consSession.createConsumer(destination);
+
+ sendMessageTo(destination, brokerB);
+
+ assertTrue("consumer got message", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return consumer.receive(1000) != null;
+ }
+ }));
+ }
+
+ private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception {
+ ConnectionFactory factory = createConnectionFactory(brokerService);
+ Connection conn = factory.createConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createProducer(destination).send(session.createTextMessage("Hi"));
+ conn.close();
+ }
+
+ protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
+ String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+ connectionFactory.setOptimizedMessageDispatch(true);
+ connectionFactory.setDispatchAsync(false);
+ connectionFactory.setUseAsyncSend(false);
+ connectionFactory.setOptimizeAcknowledge(false);
+ connectionFactory.setAlwaysSyncSend(true);
+ return connectionFactory;
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java Wed Apr 28 16:12:29 2010
@@ -121,7 +121,7 @@ public class SslBrokerServiceTest extend
LOG.info("peer cert: " + session.getPeerCertificateChain()[0].toString());
}
- private TrustManager[] getTrustManager() throws Exception {
+ public static TrustManager[] getTrustManager() throws Exception {
TrustManager[] trustStoreManagers = null;
KeyStore trustedCertStore = KeyStore.getInstance(SslTransportBrokerTest.KEYSTORE_TYPE);
@@ -134,7 +134,7 @@ public class SslBrokerServiceTest extend
return trustStoreManagers;
}
- private KeyManager[] getKeyManager() throws Exception {
+ public static KeyManager[] getKeyManager() throws Exception {
KeyManagerFactory kmf =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
KeyStore ks = KeyStore.getInstance(SslTransportBrokerTest.KEYSTORE_TYPE);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java?rev=938998&r1=938997&r2=938998&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java Wed Apr 28 16:12:29 2010
@@ -92,4 +92,14 @@ public class URISupportTest extends Test
assertTrue(URISupport.checkParenthesis(str));
}
+ public void testCreateWithQuery() throws Exception {
+ URI source = new URI("vm://localhost");
+ URI dest = URISupport.createURIWithQuery(source, "network=true&one=two");
+
+ assertEquals("correct param count", 2, URISupport.parseParamters(dest).size());
+ assertEquals("same uri, host", source.getHost(), dest.getHost());
+ assertEquals("same uri, scheme", source.getScheme(), dest.getScheme());
+ assertFalse("same uri, ssp", dest.getQuery().equals(source.getQuery()));
+ }
+
}