You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/11 15:54:08 UTC
camel git commit: CAMEL-8617: camel-vertx - Add timeout during startup to wait for EventBus to be ready
Repository: camel
Updated Branches:
refs/heads/master ced1574db -> 7e4b0b67d
CAMEL-8617: camel-vertx - Add timeout during startup to wait for EventBus to be ready
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7e4b0b67
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7e4b0b67
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7e4b0b67
Branch: refs/heads/master
Commit: 7e4b0b67d26cd8a57ff8f81850a33ef808b721d3
Parents: ced1574
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 11 15:55:36 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 11 15:55:36 2015 +0200
----------------------------------------------------------------------
.../camel/component/vertx/VertxComponent.java | 51 ++++++++++++++++++--
1 file changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/7e4b0b67/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
index 4074b2b..0e82c52 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.vertx;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.ComponentConfiguration;
@@ -26,6 +28,8 @@ import org.apache.camel.impl.UriEndpointComponent;
import org.apache.camel.spi.EndpointCompleter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.vertx.java.core.AsyncResult;
+import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.VertxFactory;
@@ -37,6 +41,7 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
private Vertx vertx;
private String host;
private int port;
+ private int timeout = 60;
public VertxComponent() {
super(VertxEndpoint.class);
@@ -70,6 +75,19 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
this.vertx = vertx;
}
+ public int getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Timeout in seconds to wait for clustered Vertx EventBus to be ready.
+ * <p/>
+ * The default value is 60.
+ */
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
VertxEndpoint endpoint = new VertxEndpoint(uri, this, remaining);
setProperties(endpoint, parameters);
@@ -85,17 +103,42 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
protected void doStart() throws Exception {
super.doStart();
+
if (vertx == null) {
+ final CountDownLatch latch = new CountDownLatch(1);
+
// lets using a host / port if a host name is specified
if (host != null && host.length() > 0) {
LOG.info("Creating Clustered Vertx {}:{}", host, port);
- vertx = VertxFactory.newVertx(port, host);
+ // use the async api as we want to wait for the eventbus to be ready before we are in started state
+ VertxFactory.newVertx(port, host, new AsyncResultHandler<Vertx>() {
+ @Override
+ public void handle(AsyncResult<Vertx> event) {
+ if (event.cause() != null) {
+ LOG.warn("Error creating Clustered Vertx " + host + ":" + port + " due " + event.cause().getMessage(), event.cause());
+ } else if (event.succeeded()) {
+ vertx = event.result();
+ LOG.info("EventBus is ready: {}", vertx);
+ }
+
+ latch.countDown();
+ }
+ });
} else if (host != null) {
LOG.info("Creating Clustered Vertx {}", host);
vertx = VertxFactory.newVertx(host);
+ LOG.info("EventBus is ready: {}", vertx);
+ latch.countDown();
} else {
LOG.info("Creating Non-Clustered Vertx");
vertx = VertxFactory.newVertx();
+ LOG.info("EventBus is ready: {}", vertx);
+ latch.countDown();
+ }
+
+ if (latch.getCount() > 0) {
+ LOG.info("Waiting for EventBus to be ready using {} sec as timeout", timeout);
+ latch.await(timeout, TimeUnit.SECONDS);
}
}
}
@@ -104,7 +147,9 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
protected void doStop() throws Exception {
super.doStop();
- LOG.info("Stopping Vertx {}", vertx);
- vertx.stop();
+ if (vertx != null) {
+ LOG.info("Stopping Vertx {}", vertx);
+ vertx.stop();
+ }
}
}