You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/06/09 21:40:21 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5086

Repository: activemq
Updated Branches:
  refs/heads/trunk 77713d9d1 -> 5016c4d4f


https://issues.apache.org/jira/browse/AMQ-5086

Ensure that wait for started on vm transport factory actually waits for
start; currently it doesn't really check started or wait properly.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5016c4d4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5016c4d4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5016c4d4

Branch: refs/heads/trunk
Commit: 5016c4d4f2fb2a62d8f38732be0f14d96426d921
Parents: 77713d9
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jun 9 15:40:03 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jun 9 15:40:03 2014 -0400

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   | 15 +++++-
 .../transport/vm/VMTransportFactory.java        | 38 +++++++++----
 .../transport/vm/VMTransportWaitForTest.java    | 57 +++++++++++++++++++-
 3 files changed, 99 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5016c4d4/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 6ecd427..235ad5d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -921,8 +921,21 @@ public class BrokerService implements Service {
      * @return boolean true if wait succeeded false if broker was not started or was stopped
      */
     public boolean waitUntilStarted() {
+        return waitUntilStarted(Long.MAX_VALUE);
+    }
+
+    /**
+     * A helper method to block the caller thread until the broker has fully started
+     *
+     * @param timeout
+     *        the amount of time to wait before giving up and returning false.
+     *
+     * @return boolean true if wait succeeded false if broker was not started or was stopped
+     */
+    public boolean waitUntilStarted(long timeout) {
         boolean waitSucceeded = isStarted();
-        while (!isStarted() && !stopped.get() && !waitSucceeded) {
+        long expiration = Math.max(0, timeout + System.currentTimeMillis());
+        while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
             try {
                 if (startException != null) {
                     return waitSucceeded;

http://git-wip-us.apache.org/repos/asf/activemq/blob/5016c4d4/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
index e88faaf..7bd24cf 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
@@ -42,18 +42,20 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
 public class VMTransportFactory extends TransportFactory {
-    
+
     public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
     public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
     public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
     private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class);
-    
+
     BrokerFactoryHandler brokerFactoryHandler;
 
+    @Override
     public Transport doConnect(URI location) throws Exception {
         return VMTransportServer.configure(doCompositeConnect(location));
     }
 
+    @Override
     public Transport doCompositeConnect(URI location) throws Exception {
         URI brokerURI;
         String host;
@@ -64,7 +66,7 @@ public class VMTransportFactory extends TransportFactory {
         if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
             brokerURI = data.getComponents()[0];
             CompositeData brokerData = URISupport.parseComposite(brokerURI);
-            host = (String)brokerData.getParameters().get("brokerName");
+            host = brokerData.getParameters().get("brokerName");
             if (host == null) {
                 host = "localhost";
             }
@@ -79,7 +81,7 @@ public class VMTransportFactory extends TransportFactory {
             try {
                 host = extractHost(location);
                 options = URISupport.parseParameters(location);
-                String config = (String)options.remove("brokerConfig");
+                String config = options.remove("brokerConfig");
                 if (config != null) {
                     brokerURI = new URI(config);
                 } else {
@@ -170,32 +172,50 @@ public class VMTransportFactory extends TransportFactory {
        return host;
     }
 
-/**
+   /**
+    * Attempt to find a Broker instance.
+    *
     * @param registry
+    *        the registry in which to search for the BrokerService instance.
     * @param brokerName
-    * @param waitForStart - time in milliseconds to wait for a broker to appear
-    * @return
+    *        the name of the Broker that should be located.
+    * @param waitForStart
+    *        time in milliseconds to wait for a broker to appear and be started.
+    *
+    * @return a BrokerService instance if one is found, or null.
     */
     private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
         BrokerService broker = null;
         synchronized(registry.getRegistryMutext()) {
             broker = registry.lookup(brokerName);
-            if (broker == null && waitForStart > 0) {
+            if (broker == null || waitForStart > 0) {
                 final long expiry = System.currentTimeMillis() + waitForStart;
                 while ((broker == null || !broker.isStarted()) && expiry > System.currentTimeMillis()) {
                     long timeout = Math.max(0, expiry - System.currentTimeMillis());
                     try {
-                        LOG.debug("waiting for broker named: " + brokerName + " to start");
+                        LOG.debug("waiting for broker named: " + brokerName + " to enter registry");
                         registry.getRegistryMutext().wait(timeout);
                     } catch (InterruptedException ignored) {
                     }
                     broker = registry.lookup(brokerName);
+                    if (broker != null && !broker.isStarted()) {
+                        LOG.debug("waiting for broker named: " + brokerName + " to start");
+                        timeout = Math.max(0, expiry - System.currentTimeMillis());
+                        // Wait for however long we have left for broker to be started, if
+                        // it doesn't get started we need to clear broker so it doesn't get
+                        // returned.  A null return should throw an exception.
+                        if (!broker.waitUntilStarted(timeout)) {
+                            broker = null;
+                            break;
+                        }
+                    }
                 }
             }
         }
         return broker;
     }
 
+    @Override
     public TransportServer doBind(URI location) throws IOException {
         return bind(location, false);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5016c4d4/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
index faa93e4..e498936 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -27,20 +28,34 @@ import java.util.concurrent.TimeUnit;
 import javax.jms.JMSException;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
 import org.junit.Test;
+import org.mortbay.log.Log;
 
 public class VMTransportWaitForTest {
 
+    private static final int WAIT_TIME = 20000;
+    private static final int SHORT_WAIT_TIME = 5000;
+
     private static final String VM_BROKER_URI_NO_WAIT =
         "vm://localhost?broker.persistent=false&create=false";
 
     private static final String VM_BROKER_URI_WAIT_FOR_START =
-        VM_BROKER_URI_NO_WAIT + "&waitForStart=20000";
+        VM_BROKER_URI_NO_WAIT + "&waitForStart=" + WAIT_TIME;
+
+    private static final String VM_BROKER_URI_SHORT_WAIT_FOR_START =
+        VM_BROKER_URI_NO_WAIT + "&waitForStart=" + SHORT_WAIT_TIME;
 
     CountDownLatch started = new CountDownLatch(1);
     CountDownLatch gotConnection = new CountDownLatch(1);
 
+    @After
+    public void after() throws IOException {
+        BrokerRegistry.getInstance().unbind("localhost");
+    }
+
     @Test(timeout=90000)
     public void testWaitFor() throws Exception {
         try {
@@ -77,4 +92,44 @@ public class VMTransportWaitForTest {
         assertTrue("has got connection", gotConnection.await(400, TimeUnit.MILLISECONDS));
         broker.stop();
     }
+
+    @Test(timeout=90000)
+    public void testWaitForNoBrokerInRegistry() throws Exception {
+
+        long startTime = System.currentTimeMillis();
+
+        try {
+            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
+            cf.createConnection();
+            fail("expect broker not exist exception");
+        } catch (JMSException expectedOnNoBrokerAndNoCreate) {
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        Log.info("Total wait time was: {}", endTime - startTime);
+        assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
+    }
+
+    @Test(timeout=90000)
+    public void testWaitForNotStartedButInRegistry() throws Exception {
+
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        BrokerRegistry.getInstance().bind("localhost", broker);
+
+        long startTime = System.currentTimeMillis();
+
+        try {
+            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
+            cf.createConnection();
+            fail("expect broker not exist exception");
+        } catch (JMSException expectedOnNoBrokerAndNoCreate) {
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        Log.info("Total wait time was: {}", endTime - startTime);
+        assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
+    }
 }