You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/08/20 10:07:49 UTC

svn commit: r687280 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/transport/vm/ activemq-core/src/test/java/org/apache/activemq/bugs/ activemq-core/src/test/java/org/apache...

Author: gtully
Date: Wed Aug 20 01:07:48 2008
New Revision: 687280

URL: http://svn.apache.org/viewvc?rev=687280&view=rev
Log:
add waitForStart option to vm url, fix AMQ-1895

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java
    activemq/trunk/assembly/src/release/conf/activemq.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java Wed Aug 20 01:07:48 2008
@@ -78,6 +78,7 @@
     public void bind(String brokerName, BrokerService broker) {
         synchronized (mutex) {
             brokers.put(brokerName, broker);
+            mutex.notifyAll(); // wake threads wait()ing on this mutex (presumably the object getRegistryMutext() exposes - confirm)
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Aug 20 01:07:48 2008
@@ -445,7 +445,6 @@
             
             processHelperProperties();
 
-            BrokerRegistry.getInstance().bind(getBrokerName(), this);
 
             
             getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
@@ -465,7 +464,8 @@
             }
             
             getBroker().start();
-
+            BrokerRegistry.getInstance().bind(getBrokerName(), this);
+            
            // see if there is a MasterBroker service and if so, configure
             // it and start it.
             for (Service service : services) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java Wed Aug 20 01:07:48 2008
@@ -58,6 +58,7 @@
         String host;
         Map<String, String> options;
         boolean create = true;
+        int waitForStart = -1;
         CompositeData data = URISupport.parseComposite(location);
         if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
             brokerURI = data.getComponents()[0];
@@ -88,6 +89,10 @@
                 if ("false".equals(options.remove("create"))) {
                     create = false;
                 }
+                String waitForStartString = options.remove("waitForStart");
+                if (waitForStartString != null) {
+                    waitForStart = Integer.parseInt(waitForStartString);
+                }
             } catch (URISyntaxException e1) {
                 throw IOExceptionSupport.create(e1);
             }
@@ -102,10 +107,9 @@
             BrokerService broker = null;
             // Synchronize on the registry so that multiple concurrent threads
             // doing this do not think that the broker has not been created and
-            // cause multiple
-            // brokers to be started.
+            // cause multiple brokers to be started.
             synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
-                broker = BrokerRegistry.getInstance().lookup(host);
+                broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
                 if (broker == null) {
                     if (!create) {
                         throw new IOException("Broker named '" + host + "' does not exist.");
@@ -121,6 +125,7 @@
                         throw IOExceptionSupport.create(e);
                     }
                     BROKERS.put(host, broker);
+                    BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
                 }
 
                 server = SERVERS.get(host);
@@ -152,6 +157,32 @@
         return transport;
     }
 
+   /** Looks up a broker by name, optionally waiting on the registry mutex for it to appear.
+    * @param registry registry to query; its mutex is held, and waited on, during the lookup
+    * @param brokerName name passed to {@code registry.lookup}
+    * @param waitForStart - time in milliseconds to wait for a broker to appear; zero or negative means do not wait
+    * @return the broker found, or null if the lookup still returns null once the wait has expired
+    */
+    private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
+        BrokerService broker = null;
+        synchronized(registry.getRegistryMutext()) {
+            broker = registry.lookup(brokerName);
+            if (broker == null && waitForStart > 0) {
+                final long expiry = System.currentTimeMillis() + waitForStart;
+                while (broker == null  && expiry > System.currentTimeMillis()) {
+                    long timeout = Math.max(0, expiry - System.currentTimeMillis()); // NOTE(review): can be 0 if expiry passes after the loop test; Object.wait(0) waits until notified
+                    try {
+                        LOG.debug("waiting for broker named: " + brokerName + " to start");
+                        registry.getRegistryMutext().wait(timeout); // gives up the mutex while blocked, so another thread can notify
+                    } catch (InterruptedException ignored) { // NOTE(review): interrupt is dropped (status not restored); loop keeps waiting until expiry
+                    }
+                    broker = registry.lookup(brokerName);
+                }
+            }
+        }
+        return broker;
+    }
+
     public TransportServer doBind(URI location) throws IOException {
         return bind(location, false); // delegates to bind(URI, boolean); the flag's meaning is not visible in this hunk
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java Wed Aug 20 01:07:48 2008
@@ -38,4 +38,8 @@
             session.commit();
         }
     }
+    
+    public MessageProducer getProducer() { // accessor added so callers can reach the producer this class holds
+        return producer;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java Wed Aug 20 01:07:48 2008
@@ -79,7 +79,7 @@
         // Give broker enough time to receive and register the consumer info
         // Either that or make consumer retroactive
         try {
-            Thread.sleep(1000);
+            Thread.sleep(2000);
         } catch (Exception e) {
             e.printStackTrace();
         }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java?rev=687280&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java Wed Aug 20 01:07:48 2008
@@ -0,0 +1,60 @@
+package org.apache.activemq.transport.vm;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+public class VMTransportWaitForTest extends TestCase {
+    // URI with create=false; the test expects connecting with it to fail while no broker is running
+    private static final String VM_BROKER_URI_NO_WAIT = 
+        "vm://localhost?broker.persistent=false&create=false";
+    // same URI plus waitForStart=20000 (milliseconds, per VMTransportFactory.lookupBroker)
+    private static final String VM_BROKER_URI_WAIT_FOR_START = 
+        VM_BROKER_URI_NO_WAIT + "&waitForStart=20000";
+    // latches: the spawned thread has begun running / the spawned thread obtained a connection
+    CountDownLatch started = new CountDownLatch(1);
+    CountDownLatch gotConnection = new CountDownLatch(1);
+    /** Checks that a create=false connect fails with no broker, and that waitForStart blocks until one starts. */
+    public void testWaitFor() throws Exception {
+        try {
+            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_NO_WAIT));
+            cf.createConnection();
+            fail("expect broker not exist exception");
+        } catch (JMSException expectedOnNoBrokerAndNoCreate) {
+        }
+        
+        // spawn a thread that will wait for an embedded broker to start via vm://..
+        Thread t = new Thread() {
+            public void run() {
+                    try {
+                        started.countDown();
+                        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_WAIT_FOR_START));
+                        cf.createConnection();
+                        gotConnection.countDown();
+                   
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        fail("unexpected exception:" + e); // NOTE(review): raised on this spawned thread, presumably not seen by the test runner; the later assertTrue would then fail instead - confirm
+                    }
+            }
+        };
+        t.start();
+        started.await(20, TimeUnit.SECONDS); // result ignored: the test proceeds even if the thread never signalled
+        Thread.yield();
+        assertFalse("has not got connection", gotConnection.await(2, TimeUnit.SECONDS)); // no broker yet, so the thread should still be blocked
+        // start a broker; the waiting thread's createConnection() is expected to complete once it is up
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+        assertTrue("has got connection", gotConnection.await(200, TimeUnit.MILLISECONDS)); // NOTE(review): 200 ms budget may be tight on a slow machine
+        broker.stop(); // NOTE(review): not reached if an assertion above fails (no try/finally)
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/assembly/src/release/conf/activemq.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/conf/activemq.xml?rev=687280&r1=687279&r2=687280&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/conf/activemq.xml (original)
+++ activemq/trunk/assembly/src/release/conf/activemq.xml Wed Aug 20 01:07:48 2008
@@ -113,6 +113,10 @@
     **
     ** http://activemq.apache.org/enterprise-integration-patterns.html
     -->
+    <!-- configure the camel activemq component to use the current broker over the vm transport: create=false never creates a broker, waitForStart=10000 waits up to 10000 ms for one to appear -->
+    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
+        <property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" />
+    </bean>
     <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
 
         <!-- You can use a <package> element for each root package to search for Java routes -->