You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/08 10:56:53 UTC

camel git commit: CAMEL-8925: Upgrade to vertx 3

Repository: camel
Updated Branches:
  refs/heads/master 044a12c21 -> 32eaf21ca


CAMEL-8925: Upgrade to vertx 3


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/32eaf21c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/32eaf21c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/32eaf21c

Branch: refs/heads/master
Commit: 32eaf21ca118d3496013cea486c7f9d3322245f7
Parents: 044a12c
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 8 11:02:57 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 8 11:02:57 2015 +0200

----------------------------------------------------------------------
 components/camel-vertx/pom.xml                  | 34 ++++++----
 .../camel/component/vertx/VertxComponent.java   | 66 +++++++++++++++-----
 .../camel/component/vertx/VertxConsumer.java    | 15 +++--
 .../camel/component/vertx/VertxEndpoint.java    |  4 +-
 .../camel/component/vertx/VertxHelper.java      |  4 +-
 .../camel/component/vertx/VertxProducer.java    | 18 ++++--
 .../component/vertx/VertxRequestReplyTest.java  |  6 --
 .../component/vertx/VertxRoutePubSubTest.java   |  6 --
 .../camel/component/vertx/VertxRouteTest.java   |  6 --
 parent/pom.xml                                  |  2 +-
 .../features/src/main/resources/features.xml    |  7 +++
 11 files changed, 107 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-vertx/pom.xml b/components/camel-vertx/pom.xml
index 9c40365..4c96037 100644
--- a/components/camel-vertx/pom.xml
+++ b/components/camel-vertx/pom.xml
@@ -15,7 +15,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
@@ -65,17 +66,24 @@
     </dependency>
   </dependencies>
 
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <forkCount>1</forkCount>
-	  <reuseForks>false</reuseForks>
-          <enableAssertions>false</enableAssertions>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
+  <!-- unit testing requires java 8 -->
+  <profiles>
+    <profile>
+      <id>jdk8-test</id>
+      <activation>
+        <jdk>!1.8</jdk>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
index f7b0c61..cf4a326 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
@@ -21,28 +21,33 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import io.vertx.core.AsyncResult;
+import io.vertx.core.AsyncResultHandler;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
+import io.vertx.core.impl.VertxFactoryImpl;
+import io.vertx.core.spi.VertxFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ComponentConfiguration;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.camel.spi.EndpointCompleter;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.vertx.java.core.AsyncResult;
-import org.vertx.java.core.AsyncResultHandler;
-import org.vertx.java.core.Vertx;
-import org.vertx.java.core.VertxFactory;
 
 /**
  * A Camel Component for <a href="http://vertx.io/">vert.x</a>
  */
 public class VertxComponent extends UriEndpointComponent implements EndpointCompleter {
     private static final Logger LOG = LoggerFactory.getLogger(VertxComponent.class);
+    private VertxFactory vertxFactory;
     private volatile boolean createdVertx;
     private Vertx vertx;
     private String host;
     private int port;
     private int timeout = 60;
+    private VertxOptions vertxOptions;
 
     public VertxComponent() {
         super(VertxEndpoint.class);
@@ -52,6 +57,17 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
         super(context, VertxEndpoint.class);
     }
 
+    public VertxFactory getVertxFactory() {
+        return vertxFactory;
+    }
+
+    /**
+     * To use a custom VertxFactory implementation
+     */
+    public void setVertxFactory(VertxFactory vertxFactory) {
+        this.vertxFactory = vertxFactory;
+    }
+
     public String getHost() {
         return host;
     }
@@ -74,6 +90,17 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
         this.port = port;
     }
 
+    public VertxOptions getVertxOptions() {
+        return vertxOptions;
+    }
+
+    /**
+     * Options to use for creating vertx
+     */
+    public void setVertxOptions(VertxOptions vertxOptions) {
+        this.vertxOptions = vertxOptions;
+    }
+
     public Vertx getVertx() {
         return vertx;
     }
@@ -115,16 +142,32 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
 
         if (vertx == null) {
 
+            if (vertxFactory == null) {
+                vertxFactory = new VertxFactoryImpl();
+            }
+
+            if (vertxOptions == null) {
+                vertxOptions = new VertxOptions();
+                if (ObjectHelper.isNotEmpty(host)) {
+                    vertxOptions.setClusterHost(host);
+                    vertxOptions.setClustered(true);
+                }
+                if (port > 0) {
+                    vertxOptions.setClusterPort(port);
+                    vertxOptions.setClustered(true);
+                }
+            }
+
             // we are creating vertx so we should handle its lifecycle
             createdVertx = true;
 
             final CountDownLatch latch = new CountDownLatch(1);
 
             // lets using a host / port if a host name is specified
-            if (host != null && host.length() > 0) {
-                LOG.info("Creating Clustered Vertx {}:{}", host, port);
+            if (vertxOptions.isClustered()) {
+                LOG.info("Creating Clustered Vertx {}:{}", vertxOptions.getClusterHost(), vertxOptions.getClusterPort());
                 // use the async api as we want to wait for the eventbus to be ready before we are in started state
-                VertxFactory.newVertx(port, host, new AsyncResultHandler<Vertx>() {
+                vertxFactory.clusteredVertx(vertxOptions, new AsyncResultHandler<Vertx>() {
                     @Override
                     public void handle(AsyncResult<Vertx> event) {
                         if (event.cause() != null) {
@@ -137,14 +180,9 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
                         latch.countDown();
                     }
                 });
-            } else if (host != null) {
-                LOG.info("Creating Clustered Vertx {}", host);
-                vertx = VertxFactory.newVertx(host);
-                LOG.info("EventBus is ready: {}", vertx);
-                latch.countDown();
             } else {
                 LOG.info("Creating Non-Clustered Vertx");
-                vertx = VertxFactory.newVertx();
+                vertx = vertxFactory.vertx();
                 LOG.info("EventBus is ready: {}", vertx);
                 latch.countDown();
             }
@@ -162,7 +200,7 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
 
         if (createdVertx && vertx != null) {
             LOG.info("Stopping Vertx {}", vertx);
-            vertx.stop();
+            vertx.close();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
index ae5ec75..18538d6 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.vertx;
 
+import io.vertx.core.Handler;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -23,16 +26,15 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.vertx.java.core.Handler;
-import org.vertx.java.core.eventbus.Message;
 
 import static org.apache.camel.component.vertx.VertxHelper.getVertxBody;
 
 public class VertxConsumer extends DefaultConsumer {
     private static final Logger LOG = LoggerFactory.getLogger(VertxConsumer.class);
     private final VertxEndpoint endpoint;
+    private transient MessageConsumer messageConsumer;
 
-    private Handler<? extends Message> handler = new Handler<Message>() {
+    private Handler<Message<Object>> handler = new Handler<Message<Object>>() {
         public void handle(Message event) {
             onEventBusEvent(event);
         }
@@ -74,7 +76,7 @@ public class VertxConsumer extends DefaultConsumer {
         }
 
         if (endpoint.getEventBus() != null) {
-            endpoint.getEventBus().registerHandler(endpoint.getAddress(), handler);
+            messageConsumer = endpoint.getEventBus().consumer(endpoint.getAddress(), handler);
         }
         super.doStart();
     }
@@ -85,8 +87,9 @@ public class VertxConsumer extends DefaultConsumer {
         }
 
         try {
-            if (endpoint.getEventBus() != null) {
-                endpoint.getEventBus().unregisterHandler(endpoint.getAddress(), handler);
+            if (messageConsumer != null && messageConsumer.isRegistered()) {
+                messageConsumer.unregister();
+                messageConsumer = null;
             }
         } catch (IllegalStateException e) {
             LOG.warn("EventBus already stopped on address {}", endpoint.getAddress());

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
index b116107..e1fef5c 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.vertx;
 
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -24,8 +26,6 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
-import org.vertx.java.core.Vertx;
-import org.vertx.java.core.eventbus.EventBus;
 
 /**
  * A Camel Endpoint for working with <a href="http://vertx.io/">vert.x</a> event bus endpoints

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
index 661c86d..04b42ca 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
@@ -16,10 +16,10 @@
  */
 package org.apache.camel.component.vertx;
 
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.vertx.java.core.json.JsonArray;
-import org.vertx.java.core.json.JsonObject;
 
 public final class VertxHelper {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
index c7b9abc..ee192e4 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.vertx;
 
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.Message;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.InvalidPayloadRuntimeException;
@@ -24,8 +28,6 @@ import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.MessageHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.vertx.java.core.Handler;
-import org.vertx.java.core.eventbus.EventBus;
 
 import static org.apache.camel.component.vertx.VertxHelper.getVertxBody;
 
@@ -80,7 +82,7 @@ public class VertxProducer extends DefaultAsyncProducer {
         return true;
     }
 
-    private static final class CamelReplyHandler implements Handler<org.vertx.java.core.eventbus.Message> {
+    private static final class CamelReplyHandler implements Handler<AsyncResult<Message<Object>>> {
 
         private final Exchange exchange;
         private final AsyncCallback callback;
@@ -91,14 +93,20 @@ public class VertxProducer extends DefaultAsyncProducer {
         }
 
         @Override
-        public void handle(org.vertx.java.core.eventbus.Message event) {
+        public void handle(AsyncResult<Message<Object>> event) {
             try {
                 // preserve headers
                 MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false);
-                exchange.getOut().setBody(event.body());
+                Throwable e = event.cause();
+                if (e != null) {
+                    exchange.setException(e);
+                } else {
+                    exchange.getOut().setBody(event.result().body());
+                }
             } finally {
                 callback.done(false);
             }
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
index 1000d01..c6c5837 100644
--- a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
@@ -50,12 +50,6 @@ public class VertxRequestReplyTest extends VertxBaseTestSupport {
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                // camel-vertx cannot be ran with JDK 1.6
-                org.junit.Assume.assumeTrue(!isJava16());
-
-                VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
-                vertx.setPort(getPort());
-
                 from(startUri).to(middleUri).to(resultUri);
 
                 from(middleUri)

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
index d185f51..cf8e02e 100644
--- a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
@@ -55,12 +55,6 @@ public class VertxRoutePubSubTest extends VertxBaseTestSupport {
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                // camel-vertx cannot be ran with JDK 1.6
-                org.junit.Assume.assumeTrue(!isJava16());
-
-                VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
-                vertx.setPort(getPort());
-
                 from(startUri).to(middleUri);
                 from(middleUri).to(resultUri);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java
index 1d3485a..1a331ab 100644
--- a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRouteTest.java
@@ -55,12 +55,6 @@ public class VertxRouteTest extends VertxBaseTestSupport {
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                // camel-vertx cannot be ran with JDK 1.6
-                org.junit.Assume.assumeTrue(!isJava16());
-
-                VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
-                vertx.setPort(getPort());
-
                 from(startUri).to(middleUri);
                 from(middleUri).to(resultUri);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index daac6fe..58b10be 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -492,7 +492,7 @@
     <velocity-bundle-version>1.7_6</velocity-bundle-version>
     <velocity-tools-version>2.0</velocity-tools-version>
     <velocity-version>1.7</velocity-version>
-    <vertx-version>2.1.6</vertx-version>
+    <vertx-version>3.0.0</vertx-version>
     <vysper-version>0.7</vysper-version>
     <weld2-version>2.2.14.Final</weld2-version>
     <werken-xpath-bundle-version>0.9.4_5</werken-xpath-bundle-version>

http://git-wip-us.apache.org/repos/asf/camel/blob/32eaf21c/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 1990e2e..44040d7 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1549,9 +1549,16 @@
     <bundle>mvn:org.apache.camel/camel-urlrewrite/${project.version}</bundle>
   </feature>
   <feature name='camel-vertx' version='${project.version}' resolver='(obr)' start-level='50'>
+    <details>camel-vertx requires Java 1.8 or better</details>
     <bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-core/${jackson2-version}</bundle>
     <bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-databind/${jackson2-version}</bundle>
     <bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-annotations/${jackson2-version}</bundle>
+    <bundle dependency='true'>mvn:io.netty/netty-common/${netty-version}</bundle>
+    <bundle dependency='true'>mvn:io.netty/netty-transport/${netty-version}</bundle>
+    <bundle dependency='true'>mvn:io.netty/netty-buffer/${netty-version}</bundle>
+    <bundle dependency='true'>mvn:io.netty/netty-handler/${netty-version}</bundle>
+    <bundle dependency='true'>mvn:io.netty/netty-codec/${netty-version}</bundle>
+    <bundle dependency='true'>mvn:io.netty/netty-codec-http/${netty-version}</bundle>
     <bundle dependency='true'>mvn:com.hazelcast/hazelcast/${hazelcast-version}</bundle>
     <bundle dependency='true'>mvn:com.eclipsesource.minimal-json/minimal-json/${minimal-json-version}</bundle>
     <!-- TODO: vertx is not OSGi bundle, see https://github.com/eclipse/vert.x/issues/708 -->