You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/07/13 12:46:50 UTC

[1/7] camel git commit: Camel experiment with using vertx as eventbus for routing engine.

Repository: camel
Updated Branches:
  refs/heads/eventbus [created] 9aafe35f3


Camel experiment with using vertx as eventbus for routing engine.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/48219df4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/48219df4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/48219df4

Branch: refs/heads/eventbus
Commit: 48219df43b5c81181d687456083ede593c4a6cec
Parents: dcdbf48
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Apr 8 10:00:13 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Apr 8 10:00:13 2016 +0200

----------------------------------------------------------------------
 .../vertx/eventbus/VertxCamelProducer.java      | 65 +++++++++++++++++++
 .../vertx/eventbus/VertxExchangeCodec.java      | 51 +++++++++++++++
 .../vertx/eventbus/VertxProcessorFactory.java   | 50 +++++++++++++++
 .../vertx/eventbus/VertxSendToProcessor.java    | 45 +++++++++++++
 .../vertx/eventbus/VertxSendToTest.java         | 67 ++++++++++++++++++++
 .../src/test/resources/log4j.properties         |  2 +-
 6 files changed, 279 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
new file mode 100644
index 0000000..923ca38
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
+
+public class VertxCamelProducer extends ServiceSupport implements Handler<Message<Exchange>> {
+
+    private final CamelContext camelContext;
+    private final ProducerTemplate template;
+    private final Vertx vertx;
+    private final String id;
+    private MessageConsumer<Exchange> consumer;
+
+    public VertxCamelProducer(CamelContext camelContext, Vertx vertx, String id) {
+        this.camelContext = camelContext;
+        this.template = camelContext.createProducerTemplate();
+        this.vertx = vertx;
+        this.id = id;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        consumer = vertx.eventBus().localConsumer(id, this);
+        ServiceHelper.startService(template);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (consumer != null) {
+            consumer.unregister();
+        }
+        ServiceHelper.stopService(template);
+    }
+
+    @Override
+    public void handle(Message<Exchange> event) {
+        Exchange exchange = event.body();
+        String url = (String) exchange.removeProperty("CamelVertxUrl");
+        // TODO: execute blocking
+        template.send(url, exchange);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
new file mode 100644
index 0000000..ac7a33f
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.eventbus.MessageCodec;
+import org.apache.camel.Exchange;
+
+public class VertxExchangeCodec implements MessageCodec<Exchange, Exchange> {
+
+    @Override
+    public void encodeToWire(Buffer buffer, Exchange exchange) {
+        // noop
+        System.out.println("xxx");
+    }
+
+    @Override
+    public Exchange decodeFromWire(int pos, Buffer buffer) {
+        System.out.println("yyy");
+        return null;
+    }
+
+    @Override
+    public Exchange transform(Exchange exchange) {
+        return exchange;
+    }
+
+    @Override
+    public String name() {
+        return "camel";
+    }
+
+    @Override
+    public byte systemCodecID() {
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
new file mode 100644
index 0000000..d335725
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ToDefinition;
+import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.RouteContext;
+
+public class VertxProcessorFactory implements ProcessorFactory {
+
+    private final Vertx vertx;
+
+    public VertxProcessorFactory(Vertx vertx) {
+        this.vertx = vertx;
+    }
+
+    @Override
+    public Processor createChildProcessor(RouteContext routeContext, ProcessorDefinition<?> def, boolean mandatory) throws Exception {
+        return null;
+    }
+
+    @Override
+    public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> def) throws Exception {
+        String id = def.idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
+
+        if (def instanceof ToDefinition) {
+            String uri = ((ToDefinition) def).getEndpointUri();
+            return new VertxSendToProcessor(vertx, id, uri);
+        }
+
+        throw new UnsupportedOperationException("EIP not supported yet");
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
new file mode 100644
index 0000000..139a289
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.DeliveryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+public class VertxSendToProcessor implements Processor {
+
+    private final Vertx vertx;
+    private final String id;
+    private final String uri;
+    private final DeliveryOptions options;
+
+    public VertxSendToProcessor(Vertx vertx, String id, String uri) {
+        this.vertx = vertx;
+        this.id = id;
+        this.uri = uri;
+        this.options = new DeliveryOptions();
+        this.options.setCodecName("camel");
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        // if OUT then use reply handler to update exchange with result
+        exchange.setProperty("CamelVertxUrl", uri);
+        vertx.eventBus().send(VertxCamelProducer.class.getName(), exchange, options);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java
new file mode 100644
index 0000000..b6c7ded
--- /dev/null
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vertx.VertxBaseTestSupport;
+import org.junit.Test;
+
+public class VertxSendToTest extends VertxBaseTestSupport {
+
+    private Vertx vertx;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        vertx = Vertx.vertx();
+        vertx.eventBus().registerCodec(new VertxExchangeCodec());
+
+        VertxProcessorFactory pf = new VertxProcessorFactory(vertx);
+        context.setProcessorFactory(pf);
+
+        VertxCamelProducer vcp = new VertxCamelProducer(context, vertx, VertxCamelProducer.class.getName());
+        context.addService(vcp);
+
+        return context;
+    }
+
+    @Test
+    public void testVertxSendTo() throws Exception {
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("mock:foo")
+                    .to("mock:bar");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/resources/log4j.properties b/components/camel-vertx/src/test/resources/log4j.properties
index e169468..57a694f 100644
--- a/components/camel-vertx/src/test/resources/log4j.properties
+++ b/components/camel-vertx/src/test/resources/log4j.properties
@@ -18,7 +18,7 @@
 #
 # The logging properties used for testing
 #
-log4j.rootLogger=INFO, file
+log4j.rootLogger=INFO, out
 
 log4j.logger.org.apache.camel.component.vertx=DEBUG
 #log4j.logger.org.apache.camel=DEBUG


[3/7] camel git commit: Camel experiment with using vertx as eventbus for routing engine.

Posted by da...@apache.org.
Camel experiment with using vertx as eventbus for routing engine.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/45050b11
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/45050b11
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/45050b11

Branch: refs/heads/eventbus
Commit: 45050b110b1c45e6aa374e865d3233b12d8e36a7
Parents: 3e3e180
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Apr 8 13:28:39 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Apr 8 13:28:39 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/component/vertx/eventbus/VertxCamelProducer.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/45050b11/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
index 7b411c9..e77b8b6 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
@@ -63,7 +63,8 @@ public class VertxCamelProducer extends ServiceSupport implements Handler<Messag
     public void handle(Message<Exchange> event) {
         Exchange exchange = event.body();
         String url = (String) exchange.removeProperty("CamelVertxUrl");
-        // TODO: execute blocking
+        // ideally we should know if a endpoint support async procesing by nature or not
+        // then we can call the blocking vs non-blocking here in vert.x
         template.send(url, exchange);
         // signal we are done
         event.reply(exchange, options);


[5/7] camel git commit: Fixed doc

Posted by da...@apache.org.
Fixed doc


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ad690ad
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ad690ad
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ad690ad

Branch: refs/heads/eventbus
Commit: 2ad690ad0f8e27abb80307daaf0be6af043d5520
Parents: 0479f3b
Author: Claus Ibsen <da...@apache.org>
Authored: Thu May 5 14:51:03 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu May 5 14:51:03 2016 +0200

----------------------------------------------------------------------
 components/camel-aws/src/main/docs/aws-s3.adoc | 40 +++------------------
 1 file changed, 4 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2ad690ad/components/camel-aws/src/main/docs/aws-s3.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-s3.adoc b/components/camel-aws/src/main/docs/aws-s3.adoc
index 59c10e9..d1be3af 100644
--- a/components/camel-aws/src/main/docs/aws-s3.adoc
+++ b/components/camel-aws/src/main/docs/aws-s3.adoc
@@ -86,8 +86,6 @@ The AWS S3 Storage Service component supports 38 endpoint options which are list
 
 
 
-|=======================================================================
-
 Required S3 component options
 
 You have to provide the amazonS3Client in the
@@ -115,36 +113,19 @@ Message headers evaluated by the S3 producer
 [width="100%",cols="10%,10%,80%",options="header",]
 |=======================================================================
 |Header |Type |Description
-
 |`CamelAwsS3Key` |`String` |The key under which this object will be stored.
-
 |`CamelAwsS3ContentLength` |`Long` |The content length of this object.
-
 |`CamelAwsS3ContentType` |`String` |The content type of this object.
-
 |`CamelAwsS3ContentControl` |`String` |*Camel 2.8.2:* The content control of this object.
-
 |`CamelAwsS3ContentDisposition` |`String` |*Camel 2.8.2:* The content disposition of this object.
-
 |`CamelAwsS3ContentEncoding` |`String` |*Camel 2.8.2:* The content encoding of this object.
-
 |`CamelAwsS3ContentMD5` |`String` |*Camel 2.8.2:* The md5 checksum of this object.
-
 |`CamelAwsS3LastModified` |`java.util.Date` |*Camel 2.8.2:* The last modified timestamp of this object.
-
 |`CamelAwsS3StorageClass` |`String` |*Camel 2.8.4:* The storage class of this object.
-
-|`CamelAwsS3CannedAcl` |`String` |*Camel 2.11.0:* The canned acl that will be applied to the object. see
-`com.amazonaws.services.s3.model.CannedAccessControlList` for allowed
-values.
-
-|`CamelAwsS3Acl` |`com.amazonaws.services.s3.model.AccessControlList` |*Camel 2.11.0:* a well constructed Amazon S3 Access Control List object.
-see `com.amazonaws.services.s3.model.AccessControlList` for more details
-
+|`CamelAwsS3CannedAcl` |`String` |*Camel 2.11.0:* The canned acl that will be applied to the object. see `com.amazonaws.services.s3.model.CannedAccessControlList` for allowed values.
+|`CamelAwsS3Acl` |`com.amazonaws.services.s3.model.AccessControlList` |*Camel 2.11.0:* a well constructed Amazon S3 Access Control List object. see `com.amazonaws.services.s3.model.AccessControlList` for more details
 |`CamelAwsS3Headers` |`Map<String,String>` |*Camel 2.15.0*: support to get or set custom objectMetadata headers.
-
-|CamelAwsS3ServerSideEncryption |String |*Camel 2.16:* Sets the server-side encryption algorithm when encrypting
-the object using AWS-managed keys. For example use AES256.
+|`CamelAwsS3ServerSideEncryption` |String |*Camel 2.16:* Sets the server-side encryption algorithm when encrypting the object using AWS-managed keys. For example use AES256.
 |=======================================================================
 
 [[AWS-S3-MessageheaderssetbytheS3producer]]
@@ -155,9 +136,7 @@ Message headers set by the S3 producer
 |=======================================================================
 |Header |Type |Description
 |`CamelAwsS3ETag` |`String` |The ETag value for the newly uploaded object.
-
 |`CamelAwsS3VersionId` |`String` |The *optional* version ID of the newly uploaded object.
-
 |=======================================================================
 
 [[AWS-S3-MessageheaderssetbytheS3consumer]]
@@ -169,45 +148,34 @@ Message headers set by the S3 consumer
 |Header |Type |Description
 
 |`CamelAwsS3Key` |`String` |The key under which this object is stored.
-
 |`CamelAwsS3BucketName` |`String` |The name of the bucket in which this object is contained.
-
 |`CamelAwsS3ETag` |`String` |The hex encoded 128-bit MD5 digest of the associated object according to
 RFC 1864. This data is used as an integrity check to verify that the
 data received by the caller is the same data that was sent by Amazon S3.
-
 |`CamelAwsS3LastModified` |`Date` |The value of the Last-Modified header, indicating the date and time at
 which Amazon S3 last recorded a modification to the associated object.
-
 |`CamelAwsS3VersionId` |`String` |The version ID of the associated Amazon S3 object if available. Version
 IDs are only assigned to objects when an object is uploaded to an Amazon
 S3 bucket that has object versioning enabled.
-
 |`CamelAwsS3ContentType` |`String` |The Content-Type HTTP header, which indicates the type of content stored
 in the associated object. The value of this header is a standard MIME
 type.
-
 |`CamelAwsS3ContentMD5` |`String` |The base64 encoded 128-bit MD5 digest of the associated object (content
 - not including headers) according to RFC 1864. This data is used as a
 message integrity check to verify that the data received by Amazon S3 is
 the same data that the caller sent.
-
 |`CamelAwsS3ContentLength` |`Long` |The Content-Length HTTP header indicating the size of the associated
 object in bytes.
-
 |`CamelAwsS3ContentEncoding` |`String` |The *optional* Content-Encoding HTTP header specifying what content
 encodings have been applied to the object and what decoding mechanisms
 must be applied in order to obtain the media-type referenced by the
 Content-Type field.
-
 |`CamelAwsS3ContentDisposition` |`String` |The *optional* Content-Disposition HTTP header, which specifies
 presentational information such as the recommended filename for the
 object to be saved as.
-
 |`CamelAwsS3ContentControl` |`String` |The *optional* Cache-Control HTTP header which allows the user to
 specify caching behavior along the HTTP request/reply chain.
-
-|CamelAwsS3ServerSideEncryption |String |*Camel 2.16:* The server-side encryption algorithm when encrypting the
+|`CamelAwsS3ServerSideEncryption` |String |*Camel 2.16:* The server-side encryption algorithm when encrypting the
 object using AWS-managed keys.
 |=======================================================================
 


[4/7] camel git commit: Camel experiment with using vertx as eventbus for routing engine.

Posted by da...@apache.org.
Camel experiment with using vertx as eventbus for routing engine.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0479f3b5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0479f3b5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0479f3b5

Branch: refs/heads/eventbus
Commit: 0479f3b5cb987b98c788fb4be376db71f1d7f0b3
Parents: 45050b1
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Apr 8 14:36:08 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Apr 8 14:36:08 2016 +0200

----------------------------------------------------------------------
 .../vertx/eventbus/VertxCamelFilter.java        | 99 ++++++++++++++++++++
 .../vertx/eventbus/VertxFilterProcessor.java    | 66 +++++++++++++
 .../vertx/eventbus/VertxProcessorFactory.java   | 16 ++++
 .../vertx/eventbus/VertxFilterTest.java         | 91 ++++++++++++++++++
 4 files changed, 272 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0479f3b5/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelFilter.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelFilter.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelFilter.java
new file mode 100644
index 0000000..938c660
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelFilter.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import java.util.List;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.DeliveryOptions;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
+
+public class VertxCamelFilter extends ServiceSupport implements Handler<Message<Exchange>> {
+
+    private final CamelContext camelContext;
+    private final Vertx vertx;
+    private final String id;
+    private MessageConsumer<Exchange> consumer;
+    private final DeliveryOptions options;
+
+    public VertxCamelFilter(CamelContext camelContext, Vertx vertx, String id) {
+        this.camelContext = camelContext;
+        this.vertx = vertx;
+        this.id = id;
+        this.options = new DeliveryOptions();
+        this.options.setCodecName("camel");
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        consumer = vertx.eventBus().localConsumer(id, this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (consumer != null) {
+            consumer.unregister();
+        }
+    }
+
+    @Override
+    public void handle(Message<Exchange> event) {
+        Exchange exchange = event.body();
+        Predicate predicate = (Predicate) exchange.removeProperty("CamelVertxPredicate");
+        List<Processor> children = (List<Processor>) exchange.removeProperty("CamelVertxChildren");
+
+        boolean matches = false;
+
+        try {
+            matches = matches(exchange, predicate);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+
+        if (matches) {
+            exchange.setProperty("CamelVerxReplyAddress", event.replyAddress());
+            Processor child = children.get(0);
+            try {
+                child.process(exchange);
+            } catch (Exception e) {
+                // ignore
+            }
+        } else {
+            // signal we are done
+            event.reply(exchange, options);
+        }
+    }
+
+    private boolean matches(Exchange exchange, Predicate predicate) {
+        boolean matches = predicate.matches(exchange);
+
+        // set property whether the filter matches or not
+        exchange.setProperty(Exchange.FILTER_MATCHED, matches);
+
+        return matches;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0479f3b5/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxFilterProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxFilterProcessor.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxFilterProcessor.java
new file mode 100644
index 0000000..4b4833d
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxFilterProcessor.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import java.util.List;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.DeliveryOptions;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.util.AsyncProcessorHelper;
+
+public class VertxFilterProcessor implements AsyncProcessor {
+
+    private final Vertx vertx;
+    private final String id;
+    private final Predicate predicate;
+    private final DeliveryOptions options;
+    private final List<Processor> children;
+
+    public VertxFilterProcessor(Vertx vertx, String id, Predicate predicate, List<Processor> children) {
+        this.vertx = vertx;
+        this.id = id;
+        this.predicate = predicate;
+        this.children = children;
+        this.options = new DeliveryOptions();
+        this.options.setCodecName("camel");
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        exchange.setProperty("CamelVertxPredicate", predicate);
+        exchange.setProperty("CamelVertxChildren", children);
+        vertx.eventBus().send(VertxCamelFilter.class.getName(), exchange, options, (handler) -> {
+            if (handler.failed()) {
+                Throwable t = handler.cause();
+                exchange.setException(t);
+            }
+            callback.done(false);
+        });
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0479f3b5/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
index 6131d34..5c762cf 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
@@ -16,9 +16,14 @@
  */
 package org.apache.camel.component.vertx.eventbus;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import io.vertx.core.Vertx;
 import org.apache.camel.Expression;
+import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.model.FilterDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.ToDefinition;
 import org.apache.camel.model.TransformDefinition;
@@ -35,6 +40,9 @@ public class VertxProcessorFactory implements ProcessorFactory {
 
     @Override
     public Processor createChildProcessor(RouteContext routeContext, ProcessorDefinition<?> def, boolean mandatory) throws Exception {
+        // let the classic camel-core create the child processor, which end up calling the createProcessor below
+        String id = def.idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
+        System.out.println("Child id" + id);
         return null;
     }
 
@@ -48,6 +56,14 @@ public class VertxProcessorFactory implements ProcessorFactory {
         } else if (def instanceof TransformDefinition) {
             Expression expression = ((TransformDefinition) def).getExpression().createExpression(routeContext);
             return new VertxTransformProcessor(vertx, id, expression);
+        } else if (def instanceof FilterDefinition) {
+            Predicate predicate = ((FilterDefinition) def).getExpression().createPredicate(routeContext);
+            List<Processor> children = new ArrayList<>();
+            for (ProcessorDefinition childDef : def.getOutputs()) {
+                Processor child = createProcessor(routeContext, childDef);
+                children.add(child);
+            }
+            return new VertxFilterProcessor(vertx, id, predicate, children);
         }
 
         throw new UnsupportedOperationException("EIP not supported yet");

http://git-wip-us.apache.org/repos/asf/camel/blob/0479f3b5/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java
new file mode 100644
index 0000000..51cd7d4
--- /dev/null
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vertx.VertxBaseTestSupport;
+import org.junit.Test;
+
+public class VertxFilterTest extends VertxBaseTestSupport {
+
+    private Vertx vertx;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        vertx = Vertx.vertx();
+        vertx.eventBus().registerCodec(new VertxExchangeCodec());
+
+        VertxProcessorFactory pf = new VertxProcessorFactory(vertx);
+        context.setProcessorFactory(pf);
+
+        VertxCamelProducer vcp = new VertxCamelProducer(context, vertx, VertxCamelProducer.class.getName());
+        context.addService(vcp);
+
+        VertxCamelTransform vct = new VertxCamelTransform(context, vertx, VertxCamelTransform.class.getName());
+        context.addService(vct);
+
+        VertxCamelFilter vcf = new VertxCamelFilter(context, vertx, VertxCamelFilter.class.getName());
+        context.addService(vcf);
+
+        return context;
+    }
+
+    @Test
+    public void testVertxFilter() throws Exception {
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+        getMockEndpoint("mock:baz").expectedMessageCount(1);
+        getMockEndpoint("mock:end").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testVertxFilterFalse() throws Exception {
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(0);
+        getMockEndpoint("mock:baz").expectedMessageCount(0);
+        getMockEndpoint("mock:end").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("mock:foo")
+                    .filter(body().contains("Hello"))
+                        .to("mock:bar")
+                        .to("mock:baz")
+                    .end()
+                    .to("mock:end");
+            }
+        };
+    }
+}


[6/7] camel git commit: Camel experiment with using vertx as eventbus for routing engine.

Posted by da...@apache.org.
Camel experiment with using vertx as eventbus for routing engine.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9d751caa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9d751caa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9d751caa

Branch: refs/heads/eventbus
Commit: 9d751caa15289992d322bb301469a67a8a2a4655
Parents: 2ad690a
Author: Claus Ibsen <da...@apache.org>
Authored: Sun May 8 16:57:36 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun May 8 16:57:36 2016 +0200

----------------------------------------------------------------------
 .../camel/component/vertx/eventbus/VertxProcessorFactory.java      | 2 --
 .../org/apache/camel/component/vertx/eventbus/VertxFilterTest.java | 2 ++
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9d751caa/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
index 5c762cf..8d577ca 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
@@ -41,8 +41,6 @@ public class VertxProcessorFactory implements ProcessorFactory {
     @Override
     public Processor createChildProcessor(RouteContext routeContext, ProcessorDefinition<?> def, boolean mandatory) throws Exception {
         // let the classic camel-core create the child processor, which end up calling the createProcessor below
-        String id = def.idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
-        System.out.println("Child id" + id);
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9d751caa/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java
index 51cd7d4..d362b36 100644
--- a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java
@@ -21,8 +21,10 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.vertx.VertxBaseTestSupport;
+import org.junit.Ignore;
 import org.junit.Test;
 
+@Ignore("TODO: Does not yet work")
 public class VertxFilterTest extends VertxBaseTestSupport {
 
     private Vertx vertx;


[7/7] camel git commit: Polished

Posted by da...@apache.org.
Polished


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9aafe35f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9aafe35f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9aafe35f

Branch: refs/heads/eventbus
Commit: 9aafe35f3c31494afa61d37a2b4d6522fee30928
Parents: 9d751ca
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jul 12 10:58:32 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jul 12 10:58:32 2016 +0100

----------------------------------------------------------------------
 .../apache/camel/component/vertx/eventbus/VertxExchangeCodec.java  | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9aafe35f/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
index ac7a33f..2afda73 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
@@ -25,12 +25,10 @@ public class VertxExchangeCodec implements MessageCodec<Exchange, Exchange> {
     @Override
     public void encodeToWire(Buffer buffer, Exchange exchange) {
         // noop
-        System.out.println("xxx");
     }
 
     @Override
     public Exchange decodeFromWire(int pos, Buffer buffer) {
-        System.out.println("yyy");
         return null;
     }
 


[2/7] camel git commit: Camel experiment with using vertx as eventbus for routing engine.

Posted by da...@apache.org.
Camel experiment with using vertx as eventbus for routing engine.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3e3e1803
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3e3e1803
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3e3e1803

Branch: refs/heads/eventbus
Commit: 3e3e1803316e45035d9a51c101a1eb38d19c8508
Parents: 48219df
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Apr 8 10:20:09 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Apr 8 10:20:09 2016 +0200

----------------------------------------------------------------------
 .../vertx/eventbus/VertxCamelProducer.java      |   6 ++
 .../vertx/eventbus/VertxCamelTransform.java     | 102 +++++++++++++++++++
 .../vertx/eventbus/VertxProcessorFactory.java   |   5 +
 .../vertx/eventbus/VertxSendToProcessor.java    |  22 +++-
 .../vertx/eventbus/VertxTransformProcessor.java |  60 +++++++++++
 .../vertx/eventbus/VertxTransformTest.java      |  71 +++++++++++++
 6 files changed, 262 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
index 923ca38..7b411c9 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.vertx.eventbus;
 
 import io.vertx.core.Handler;
 import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.DeliveryOptions;
 import io.vertx.core.eventbus.Message;
 import io.vertx.core.eventbus.MessageConsumer;
 import org.apache.camel.CamelContext;
@@ -33,12 +34,15 @@ public class VertxCamelProducer extends ServiceSupport implements Handler<Messag
     private final Vertx vertx;
     private final String id;
     private MessageConsumer<Exchange> consumer;
+    private final DeliveryOptions options;
 
     public VertxCamelProducer(CamelContext camelContext, Vertx vertx, String id) {
         this.camelContext = camelContext;
         this.template = camelContext.createProducerTemplate();
         this.vertx = vertx;
         this.id = id;
+        this.options = new DeliveryOptions();
+        this.options.setCodecName("camel");
     }
 
     @Override
@@ -61,5 +65,7 @@ public class VertxCamelProducer extends ServiceSupport implements Handler<Messag
         String url = (String) exchange.removeProperty("CamelVertxUrl");
         // TODO: execute blocking
         template.send(url, exchange);
+        // signal we are done
+        event.reply(exchange, options);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelTransform.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelTransform.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelTransform.java
new file mode 100644
index 0000000..923fcaf
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelTransform.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.DeliveryOptions;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ExchangeHelper;
+
+public class VertxCamelTransform extends ServiceSupport implements Handler<Message<Exchange>> {
+
+    private final CamelContext camelContext;
+    private final Vertx vertx;
+    private final String id;
+    private MessageConsumer<Exchange> consumer;
+    private final DeliveryOptions options;
+
+    public VertxCamelTransform(CamelContext camelContext, Vertx vertx, String id) {
+        this.camelContext = camelContext;
+        this.vertx = vertx;
+        this.id = id;
+        this.options = new DeliveryOptions();
+        this.options.setCodecName("camel");
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        consumer = vertx.eventBus().localConsumer(id, this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (consumer != null) {
+            consumer.unregister();
+        }
+    }
+
+    @Override
+    public void handle(Message<Exchange> event) {
+        Exchange exchange = event.body();
+        Expression expression = (Expression) exchange.removeProperty("CamelVertxExpression");
+        // TODO: execute blocking
+        transform(exchange, expression);
+        // signal we are done
+        event.reply(exchange, options);
+    }
+
+    private void transform(Exchange exchange, Expression expression) {
+        Object newBody = expression.evaluate(exchange, Object.class);
+
+        if (exchange.getException() != null) {
+            // the expression threw an exception so we should break-out
+            return;
+        }
+
+        boolean out = exchange.hasOut();
+        org.apache.camel.Message old = out ? exchange.getOut() : exchange.getIn();
+
+        // create a new message container so we do not drag specialized message objects along
+        // but that is only needed if the old message is a specialized message
+        boolean copyNeeded = !(old.getClass().equals(DefaultMessage.class));
+
+        if (copyNeeded) {
+            org.apache.camel.Message msg = new DefaultMessage();
+            msg.copyFrom(old);
+            msg.setBody(newBody);
+
+            // replace message on exchange (must set as OUT)
+            ExchangeHelper.replaceMessage(exchange, msg, true);
+        } else {
+            // no copy needed so set replace value directly
+            old.setBody(newBody);
+
+            // but the message must be on OUT
+            if (!exchange.hasOut()) {
+                exchange.setOut(exchange.getIn());
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
index d335725..6131d34 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
@@ -17,9 +17,11 @@
 package org.apache.camel.component.vertx.eventbus;
 
 import io.vertx.core.Vertx;
+import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.ToDefinition;
+import org.apache.camel.model.TransformDefinition;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.RouteContext;
 
@@ -43,6 +45,9 @@ public class VertxProcessorFactory implements ProcessorFactory {
         if (def instanceof ToDefinition) {
             String uri = ((ToDefinition) def).getEndpointUri();
             return new VertxSendToProcessor(vertx, id, uri);
+        } else if (def instanceof TransformDefinition) {
+            Expression expression = ((TransformDefinition) def).getExpression().createExpression(routeContext);
+            return new VertxTransformProcessor(vertx, id, expression);
         }
 
         throw new UnsupportedOperationException("EIP not supported yet");

http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
index 139a289..c99dd6a 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
@@ -18,10 +18,12 @@ package org.apache.camel.component.vertx.eventbus;
 
 import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.DeliveryOptions;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
+import org.apache.camel.util.AsyncProcessorHelper;
 
-public class VertxSendToProcessor implements Processor {
+public class VertxSendToProcessor implements AsyncProcessor {
 
     private final Vertx vertx;
     private final String id;
@@ -37,9 +39,21 @@ public class VertxSendToProcessor implements Processor {
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
         // if OUT then use reply handler to update exchange with result
         exchange.setProperty("CamelVertxUrl", uri);
-        vertx.eventBus().send(VertxCamelProducer.class.getName(), exchange, options);
+        vertx.eventBus().send(VertxCamelProducer.class.getName(), exchange, options, (handler) -> {
+            if (handler.failed()) {
+                Throwable t = handler.cause();
+                exchange.setException(t);
+            }
+            callback.done(false);
+        });
+        return false;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxTransformProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxTransformProcessor.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxTransformProcessor.java
new file mode 100644
index 0000000..368b12c
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxTransformProcessor.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.DeliveryOptions;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.util.AsyncProcessorHelper;
+
+public class VertxTransformProcessor implements AsyncProcessor {
+
+    private final Vertx vertx;
+    private final String id;
+    private final Expression expression;
+    private final DeliveryOptions options;
+
+    public VertxTransformProcessor(Vertx vertx, String id, Expression expression) {
+        this.vertx = vertx;
+        this.id = id;
+        this.expression = expression;
+        this.options = new DeliveryOptions();
+        this.options.setCodecName("camel");
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        exchange.setProperty("CamelVertxExpression", expression);
+        vertx.eventBus().send(VertxCamelTransform.class.getName(), exchange, options, (handler) -> {
+            if (handler.failed()) {
+                Throwable t = handler.cause();
+                exchange.setException(t);
+            }
+            callback.done(false);
+        });
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxTransformTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxTransformTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxTransformTest.java
new file mode 100644
index 0000000..c29fd48
--- /dev/null
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxTransformTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vertx.VertxBaseTestSupport;
+import org.junit.Test;
+
+public class VertxTransformTest extends VertxBaseTestSupport {
+
+    private Vertx vertx;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        vertx = Vertx.vertx();
+        vertx.eventBus().registerCodec(new VertxExchangeCodec());
+
+        VertxProcessorFactory pf = new VertxProcessorFactory(vertx);
+        context.setProcessorFactory(pf);
+
+        VertxCamelProducer vcp = new VertxCamelProducer(context, vertx, VertxCamelProducer.class.getName());
+        context.addService(vcp);
+
+        VertxCamelTransform vct = new VertxCamelTransform(context, vertx, VertxCamelTransform.class.getName());
+        context.addService(vct);
+
+        return context;
+    }
+
+    @Test
+    public void testVertxTransform() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("World");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("mock:foo")
+                    .transform(simple("Hello ${body}"))
+                    .to("mock:bar");
+            }
+        };
+    }
+}