You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/08 10:56:53 UTC
camel git commit: CAMEL-8925: Upgrade to vertx 3
Repository: camel
Updated Branches:
refs/heads/master 044a12c21 -> 32eaf21ca
CAMEL-8925: Upgrade to vertx 3
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/32eaf21c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/32eaf21c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/32eaf21c
Branch: refs/heads/master
Commit: 32eaf21ca118d3496013cea486c7f9d3322245f7
Parents: 044a12c
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 8 11:02:57 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 8 11:02:57 2015 +0200
----------------------------------------------------------------------
components/camel-vertx/pom.xml | 34 ++++++----
.../camel/component/vertx/VertxComponent.java | 66 +++++++++++++++-----
.../camel/component/vertx/VertxConsumer.java | 15 +++--
.../camel/component/vertx/VertxEndpoint.java | 4 +-
.../camel/component/vertx/VertxHelper.java | 4 +-
.../camel/component/vertx/VertxProducer.java | 18 ++++--
.../component/vertx/VertxRequestReplyTest.java | 6 --
.../component/vertx/VertxRoutePubSubTest.java | 6 --
.../camel/component/vertx/VertxRouteTest.java | 6 --
parent/pom.xml | 2 +-
.../features/src/main/resources/features.xml | 7 +++
11 files changed, 107 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-vertx/pom.xml b/components/camel-vertx/pom.xml
index 9c40365..4c96037 100644
--- a/components/camel-vertx/pom.xml
+++ b/components/camel-vertx/pom.xml
@@ -15,7 +15,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -65,17 +66,24 @@
</dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <forkCount>1</forkCount>
- <reuseForks>false</reuseForks>
- <enableAssertions>false</enableAssertions>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <!-- unit testing requires java 8 -->
+ <profiles>
+ <profile>
+ <id>jdk8-test</id>
+ <activation>
+ <jdk>!1.8</jdk>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
index f7b0c61..cf4a326 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
@@ -21,28 +21,33 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.AsyncResultHandler;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
+import io.vertx.core.impl.VertxFactoryImpl;
+import io.vertx.core.spi.VertxFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.ComponentConfiguration;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
import org.apache.camel.spi.EndpointCompleter;
+import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.vertx.java.core.AsyncResult;
-import org.vertx.java.core.AsyncResultHandler;
-import org.vertx.java.core.Vertx;
-import org.vertx.java.core.VertxFactory;
/**
* A Camel Component for <a href="http://vertx.io/">vert.x</a>
*/
public class VertxComponent extends UriEndpointComponent implements EndpointCompleter {
private static final Logger LOG = LoggerFactory.getLogger(VertxComponent.class);
+ private VertxFactory vertxFactory;
private volatile boolean createdVertx;
private Vertx vertx;
private String host;
private int port;
private int timeout = 60;
+ private VertxOptions vertxOptions;
public VertxComponent() {
super(VertxEndpoint.class);
@@ -52,6 +57,17 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
super(context, VertxEndpoint.class);
}
+ public VertxFactory getVertxFactory() {
+ return vertxFactory;
+ }
+
+ /**
+ * To use a custom VertxFactory implementation
+ */
+ public void setVertxFactory(VertxFactory vertxFactory) {
+ this.vertxFactory = vertxFactory;
+ }
+
public String getHost() {
return host;
}
@@ -74,6 +90,17 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
this.port = port;
}
+ public VertxOptions getVertxOptions() {
+ return vertxOptions;
+ }
+
+ /**
+ * Options to use for creating vertx
+ */
+ public void setVertxOptions(VertxOptions vertxOptions) {
+ this.vertxOptions = vertxOptions;
+ }
+
public Vertx getVertx() {
return vertx;
}
@@ -115,16 +142,32 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
if (vertx == null) {
+ if (vertxFactory == null) {
+ vertxFactory = new VertxFactoryImpl();
+ }
+
+ if (vertxOptions == null) {
+ vertxOptions = new VertxOptions();
+ if (ObjectHelper.isNotEmpty(host)) {
+ vertxOptions.setClusterHost(host);
+ vertxOptions.setClustered(true);
+ }
+ if (port > 0) {
+ vertxOptions.setClusterPort(port);
+ vertxOptions.setClustered(true);
+ }
+ }
+
// we are creating vertx so we should handle its lifecycle
createdVertx = true;
final CountDownLatch latch = new CountDownLatch(1);
// lets using a host / port if a host name is specified
- if (host != null && host.length() > 0) {
- LOG.info("Creating Clustered Vertx {}:{}", host, port);
+ if (vertxOptions.isClustered()) {
+ LOG.info("Creating Clustered Vertx {}:{}", vertxOptions.getClusterHost(), vertxOptions.getClusterPort());
// use the async api as we want to wait for the eventbus to be ready before we are in started state
- VertxFactory.newVertx(port, host, new AsyncResultHandler<Vertx>() {
+ vertxFactory.clusteredVertx(vertxOptions, new AsyncResultHandler<Vertx>() {
@Override
public void handle(AsyncResult<Vertx> event) {
if (event.cause() != null) {
@@ -137,14 +180,9 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
latch.countDown();
}
});
- } else if (host != null) {
- LOG.info("Creating Clustered Vertx {}", host);
- vertx = VertxFactory.newVertx(host);
- LOG.info("EventBus is ready: {}", vertx);
- latch.countDown();
} else {
LOG.info("Creating Non-Clustered Vertx");
- vertx = VertxFactory.newVertx();
+ vertx = vertxFactory.vertx();
LOG.info("EventBus is ready: {}", vertx);
latch.countDown();
}
@@ -162,7 +200,7 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
if (createdVertx && vertx != null) {
LOG.info("Stopping Vertx {}", vertx);
- vertx.stop();
+ vertx.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
index ae5ec75..18538d6 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.vertx;
+import io.vertx.core.Handler;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -23,16 +26,15 @@ import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.vertx.java.core.Handler;
-import org.vertx.java.core.eventbus.Message;
import static org.apache.camel.component.vertx.VertxHelper.getVertxBody;
public class VertxConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(VertxConsumer.class);
private final VertxEndpoint endpoint;
+ private transient MessageConsumer messageConsumer;
- private Handler<? extends Message> handler = new Handler<Message>() {
+ private Handler<Message<Object>> handler = new Handler<Message<Object>>() {
public void handle(Message event) {
onEventBusEvent(event);
}
@@ -74,7 +76,7 @@ public class VertxConsumer extends DefaultConsumer {
}
if (endpoint.getEventBus() != null) {
- endpoint.getEventBus().registerHandler(endpoint.getAddress(), handler);
+ messageConsumer = endpoint.getEventBus().consumer(endpoint.getAddress(), handler);
}
super.doStart();
}
@@ -85,8 +87,9 @@ public class VertxConsumer extends DefaultConsumer {
}
try {
- if (endpoint.getEventBus() != null) {
- endpoint.getEventBus().unregisterHandler(endpoint.getAddress(), handler);
+ if (messageConsumer != null && messageConsumer.isRegistered()) {
+ messageConsumer.unregister();
+ messageConsumer = null;
}
} catch (IllegalStateException e) {
LOG.warn("EventBus already stopped on address {}", endpoint.getAddress());
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
index b116107..e1fef5c 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.vertx;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -24,8 +26,6 @@ import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
-import org.vertx.java.core.Vertx;
-import org.vertx.java.core.eventbus.EventBus;
/**
* A Camel Endpoint for working with <a href="http://vertx.io/">vert.x</a> event bus endpoints
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
index 661c86d..04b42ca 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
@@ -16,10 +16,10 @@
*/
package org.apache.camel.component.vertx;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.vertx.java.core.json.JsonArray;
-import org.vertx.java.core.json.JsonObject;
public final class VertxHelper {
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
index c7b9abc..ee192e4 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.vertx;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.Message;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadRuntimeException;
@@ -24,8 +28,6 @@ import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.MessageHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.vertx.java.core.Handler;
-import org.vertx.java.core.eventbus.EventBus;
import static org.apache.camel.component.vertx.VertxHelper.getVertxBody;
@@ -80,7 +82,7 @@ public class VertxProducer extends DefaultAsyncProducer {
return true;
}
- private static final class CamelReplyHandler implements Handler<org.vertx.java.core.eventbus.Message> {
+ private static final class CamelReplyHandler implements Handler<AsyncResult<Message<Object>>> {
private final Exchange exchange;
private final AsyncCallback callback;
@@ -91,14 +93,20 @@ public class VertxProducer extends DefaultAsyncProducer {
}
@Override
- public void handle(org.vertx.java.core.eventbus.Message event) {
+ public void handle(AsyncResult<Message<Object>> event) {
try {
// preserve headers
MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false);
- exchange.getOut().setBody(event.body());
+ Throwable e = event.cause();
+ if (e != null) {
+ exchange.setException(e);
+ } else {
+ exchange.getOut().setBody(event.result().body());
+ }
} finally {
callback.done(false);
}
}
+
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
index 1000d01..c6c5837 100644
--- a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
@@ -50,12 +50,6 @@ public class VertxRequestReplyTest extends VertxBaseTestSupport {
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- // camel-vertx cannot be ran with JDK 1.6
- org.junit.Assume.assumeTrue(!isJava16());
-
- VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
- vertx.setPort(getPort());
-
from(startUri).to(middleUri).to(resultUri);
from(middleUri)
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
index d185f51..cf8e02e 100644
--- a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
@@ -55,12 +55,6 @@ public class VertxRoutePubSubTest extends VertxBaseTestSupport {
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- // camel-vertx cannot be ran with JDK 1.6
- org.junit.Assume.assumeTrue(!isJava16());
-
- VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
- vertx.setPort(getPort());
-
from(startUri).to(middleUri);
from(middleUri).to(resultUri);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java
index 1d3485a..1a331ab 100644
--- a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java
@@ -55,12 +55,6 @@ public class VertxRouteTest extends VertxBaseTestSupport {
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- // camel-vertx cannot be ran with JDK 1.6
- org.junit.Assume.assumeTrue(!isJava16());
-
- VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
- vertx.setPort(getPort());
-
from(startUri).to(middleUri);
from(middleUri).to(resultUri);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index daac6fe..58b10be 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -492,7 +492,7 @@
<velocity-bundle-version>1.7_6</velocity-bundle-version>
<velocity-tools-version>2.0</velocity-tools-version>
<velocity-version>1.7</velocity-version>
- <vertx-version>2.1.6</vertx-version>
+ <vertx-version>3.0.0</vertx-version>
<vysper-version>0.7</vysper-version>
<weld2-version>2.2.14.Final</weld2-version>
<werken-xpath-bundle-version>0.9.4_5</werken-xpath-bundle-version>
http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 1990e2e..44040d7 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1549,9 +1549,16 @@
<bundle>mvn:org.apache.camel/camel-urlrewrite/${project.version}</bundle>
</feature>
<feature name='camel-vertx' version='${project.version}' resolver='(obr)' start-level='50'>
+ <details>camel-vertx requires Java 1.8 or better</details>
<bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-core/${jackson2-version}</bundle>
<bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-databind/${jackson2-version}</bundle>
<bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-annotations/${jackson2-version}</bundle>
+ <bundle dependency='true'>mvn:io.netty/netty-common/${netty-version}</bundle>
+ <bundle dependency='true'>mvn:io.netty/netty-transport/${netty-version}</bundle>
+ <bundle dependency='true'>mvn:io.netty/netty-buffer/${netty-version}</bundle>
+ <bundle dependency='true'>mvn:io.netty/netty-handler/${netty-version}</bundle>
+ <bundle dependency='true'>mvn:io.netty/netty-codec/${netty-version}</bundle>
+ <bundle dependency='true'>mvn:io.netty/netty-codec-http/${netty-version}</bundle>
<bundle dependency='true'>mvn:com.hazelcast/hazelcast/${hazelcast-version}</bundle>
<bundle dependency='true'>mvn:com.eclipsesource.minimal-json/minimal-json/${minimal-json-version}</bundle>
<!-- TODO: vertx is not OSGi bundle, see https://github.com/eclipse/vert.x/issues/708 -->