You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/11 15:54:08 UTC

camel git commit: CAMEL-8617: camel-vertx - Add timeout during startup to wait for EventBus to be ready

Repository: camel
Updated Branches:
  refs/heads/master ced1574db -> 7e4b0b67d


CAMEL-8617: camel-vertx - Add timeout during startup to wait for EventBus to be ready


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7e4b0b67
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7e4b0b67
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7e4b0b67

Branch: refs/heads/master
Commit: 7e4b0b67d26cd8a57ff8f81850a33ef808b721d3
Parents: ced1574
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 11 15:55:36 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 11 15:55:36 2015 +0200

----------------------------------------------------------------------
 .../camel/component/vertx/VertxComponent.java   | 51 ++++++++++++++++++--
 1 file changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7e4b0b67/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
index 4074b2b..0e82c52 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.vertx;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ComponentConfiguration;
@@ -26,6 +28,8 @@ import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.camel.spi.EndpointCompleter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.vertx.java.core.AsyncResult;
+import org.vertx.java.core.AsyncResultHandler;
 import org.vertx.java.core.Vertx;
 import org.vertx.java.core.VertxFactory;
 
@@ -37,6 +41,7 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
     private Vertx vertx;
     private String host;
     private int port;
+    private int timeout = 60;
 
     public VertxComponent() {
         super(VertxEndpoint.class);
@@ -70,6 +75,19 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
         this.vertx = vertx;
     }
 
+    public int getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Timeout in seconds to wait for clustered Vertx EventBus to be ready.
+     * <p/>
+     * The default value is 60.
+     */
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         VertxEndpoint endpoint = new VertxEndpoint(uri, this, remaining);
         setProperties(endpoint, parameters);
@@ -85,17 +103,42 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
     protected void doStart() throws Exception {
         super.doStart();
 
+
         if (vertx == null) {
+            final CountDownLatch latch = new CountDownLatch(1);
+
             // lets using a host / port if a host name is specified
             if (host != null && host.length() > 0) {
                 LOG.info("Creating Clustered Vertx {}:{}", host, port);
-                vertx = VertxFactory.newVertx(port, host);
+                // use the async api as we want to wait for the eventbus to be ready before we are in started state
+                VertxFactory.newVertx(port, host, new AsyncResultHandler<Vertx>() {
+                    @Override
+                    public void handle(AsyncResult<Vertx> event) {
+                        if (event.cause() != null) {
+                            LOG.warn("Error creating Clustered Vertx " + host + ":" + port + " due " + event.cause().getMessage(), event.cause());
+                        } else if (event.succeeded()) {
+                            vertx = event.result();
+                            LOG.info("EventBus is ready: {}", vertx);
+                        }
+
+                        latch.countDown();
+                    }
+                });
             } else if (host != null) {
                 LOG.info("Creating Clustered Vertx {}", host);
                 vertx = VertxFactory.newVertx(host);
+                LOG.info("EventBus is ready: {}", vertx);
+                latch.countDown();
             } else {
                 LOG.info("Creating Non-Clustered Vertx");
                 vertx = VertxFactory.newVertx();
+                LOG.info("EventBus is ready: {}", vertx);
+                latch.countDown();
+            }
+
+            if (latch.getCount() > 0) {
+                LOG.info("Waiting for EventBus to be ready using {} sec as timeout", timeout);
+                latch.await(timeout, TimeUnit.SECONDS);
             }
         }
     }
@@ -104,7 +147,9 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
     protected void doStop() throws Exception {
         super.doStop();
 
-        LOG.info("Stopping Vertx {}", vertx);
-        vertx.stop();
+        if (vertx != null) {
+            LOG.info("Stopping Vertx {}", vertx);
+            vertx.stop();
+        }
     }
 }