You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/08/20 10:07:49 UTC
svn commit: r687280 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/broker/
activemq-core/src/main/java/org/apache/activemq/transport/vm/
activemq-core/src/test/java/org/apache/activemq/bugs/
activemq-core/src/test/java/org/apache...
Author: gtully
Date: Wed Aug 20 01:07:48 2008
New Revision: 687280
URL: http://svn.apache.org/viewvc?rev=687280&view=rev
Log:
add waitForStart option to vm url, fix AMQ-1895
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java
activemq/trunk/assembly/src/release/conf/activemq.xml
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java Wed Aug 20 01:07:48 2008
@@ -78,6 +78,7 @@
public void bind(String brokerName, BrokerService broker) {
synchronized (mutex) {
brokers.put(brokerName, broker);
+ mutex.notifyAll(); // wake any threads blocked in wait() on the mutex until a broker is registered
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Aug 20 01:07:48 2008
@@ -445,7 +445,6 @@
processHelperProperties();
- BrokerRegistry.getInstance().bind(getBrokerName(), this);
getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
@@ -465,7 +464,8 @@
}
getBroker().start();
-
+ BrokerRegistry.getInstance().bind(getBrokerName(), this);
+
// see if there is a MasterBroker service and if so, configure
// it and start it.
for (Service service : services) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java Wed Aug 20 01:07:48 2008
@@ -58,6 +58,7 @@
String host;
Map<String, String> options;
boolean create = true;
+ int waitForStart = -1;
CompositeData data = URISupport.parseComposite(location);
if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
brokerURI = data.getComponents()[0];
@@ -88,6 +89,10 @@
if ("false".equals(options.remove("create"))) {
create = false;
}
+ String waitForStartString = options.remove("waitForStart");
+ if (waitForStartString != null) {
+ waitForStart = Integer.parseInt(waitForStartString);
+ }
} catch (URISyntaxException e1) {
throw IOExceptionSupport.create(e1);
}
@@ -102,10 +107,9 @@
BrokerService broker = null;
// Synchronize on the registry so that multiple concurrent threads
// doing this do not think that the broker has not been created and
- // cause multiple
- // brokers to be started.
+ // cause multiple brokers to be started.
synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
- broker = BrokerRegistry.getInstance().lookup(host);
+ broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
if (broker == null) {
if (!create) {
throw new IOException("Broker named '" + host + "' does not exist.");
@@ -121,6 +125,7 @@
throw IOExceptionSupport.create(e);
}
BROKERS.put(host, broker);
+ BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
}
server = SERVERS.get(host);
@@ -152,6 +157,32 @@
return transport;
}
+    /**
+     * Looks up a broker in the registry, optionally blocking until one is bound.
+     * <p>
+     * Waiting is done on the registry mutex; the wait ends early only when that
+     * monitor is notified, otherwise the registry is re-checked at the deadline.
+     *
+     * @param registry the registry to query; its mutex is used as the wait monitor
+     * @param brokerName name of the broker to look up
+     * @param waitForStart time in milliseconds to wait for a broker to appear;
+     *        zero or negative means look once and do not wait
+     * @return the broker registered under brokerName, or null if none appeared
+     *         before the deadline or the waiting thread was interrupted
+     */
+    private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
+        final Object mutex = registry.getRegistryMutext();
+        synchronized (mutex) {
+            BrokerService broker = registry.lookup(brokerName);
+            if (broker != null || waitForStart <= 0) {
+                return broker;
+            }
+            final long expiry = System.currentTimeMillis() + waitForStart;
+            while (broker == null) {
+                // Sample the clock once per pass: a remaining time of 0 must end the
+                // loop, because Object.wait(0) would block until notified.
+                final long remaining = expiry - System.currentTimeMillis();
+                if (remaining <= 0) {
+                    break;
+                }
+                try {
+                    LOG.debug("waiting for broker named: " + brokerName + " to start");
+                    mutex.wait(remaining);
+                } catch (InterruptedException e) {
+                    // Stop waiting and keep the interrupt visible to the caller.
+                    Thread.currentThread().interrupt();
+                    return registry.lookup(brokerName);
+                }
+                broker = registry.lookup(brokerName);
+            }
+            return broker;
+        }
+    }
+
public TransportServer doBind(URI location) throws IOException {
return bind(location, false);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java Wed Aug 20 01:07:48 2008
@@ -38,4 +38,8 @@
session.commit();
}
}
+
+    /**
+     * Exposes the producer held by this sender.
+     *
+     * @return the current value of the producer field
+     */
+    public MessageProducer getProducer() {
+        return this.producer;
+    }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java Wed Aug 20 01:07:48 2008
@@ -79,7 +79,7 @@
// Give broker enough time to receive and register the consumer info
// Either that or make consumer retroactive
try {
- Thread.sleep(1000);
+ Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java?rev=687280&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java Wed Aug 20 01:07:48 2008
@@ -0,0 +1,60 @@
+package org.apache.activemq.transport.vm;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+public class VMTransportWaitForTest extends TestCase {
+
+ private static final String VM_BROKER_URI_NO_WAIT =
+ "vm://localhost?broker.persistent=false&create=false";
+
+ private static final String VM_BROKER_URI_WAIT_FOR_START =
+ VM_BROKER_URI_NO_WAIT + "&waitForStart=20000";
+
+ CountDownLatch started = new CountDownLatch(1);
+ CountDownLatch gotConnection = new CountDownLatch(1);
+
+ public void testWaitFor() throws Exception {
+ try {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_NO_WAIT));
+ cf.createConnection();
+ fail("expect broker not exist exception");
+ } catch (JMSException expectedOnNoBrokerAndNoCreate) {
+ }
+
+ // spawn a thread that will wait for an embedded broker to start via vm://..
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ started.countDown();
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_WAIT_FOR_START));
+ cf.createConnection();
+ gotConnection.countDown();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("unexpected exception:" + e);
+ }
+ }
+ };
+ t.start();
+ started.await(20, TimeUnit.SECONDS);
+ Thread.yield();
+ assertFalse("has not got connection", gotConnection.await(2, TimeUnit.SECONDS));
+
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.addConnector("tcp://localhost:61616");
+ broker.start();
+ assertTrue("has got connection", gotConnection.await(200, TimeUnit.MILLISECONDS));
+ broker.stop();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/assembly/src/release/conf/activemq.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/conf/activemq.xml?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/conf/activemq.xml (original)
+++ activemq/trunk/assembly/src/release/conf/activemq.xml Wed Aug 20 01:07:48 2008
@@ -113,6 +113,10 @@
**
** http://activemq.apache.org/enterprise-integration-patterns.html
-->
+ <!-- configure the camel activemq component to use the current broker -->
+ <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
+ <property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" />
+ </bean>
<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
<!-- You can use a <package> element for each root package to search for Java routes -->