You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/07 17:08:47 UTC

git commit: CAMEL-7176: Added support for sending replies with camel-vertx, so you can do request/reply over vertx.

Updated Branches:
  refs/heads/master 86a762032 -> a779d09c9


CAMEL-7176: Added support for sending replies with camel-vertx, so you can do request/reply over vertx.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a779d09c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a779d09c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a779d09c

Branch: refs/heads/master
Commit: a779d09c9d9e4e630086a892888134607f295f58
Parents: 86a7620
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Feb 7 17:09:22 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Feb 7 17:09:22 2014 +0100

----------------------------------------------------------------------
 .../camel/component/vertx/VertxConsumer.java    | 14 +++-
 .../camel/component/vertx/VertxEndpoint.java    | 14 ++++
 .../camel/component/vertx/VertxHelper.java      | 40 +++++++++++
 .../camel/component/vertx/VertxProducer.java    | 76 +++++++++++++-------
 .../component/vertx/VertxRequestReplyTest.java  | 66 +++++++++++++++++
 .../component/vertx/VertxRoutePubSubTest.java   | 69 ++++++++++++++++++
 6 files changed, 253 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
index 66f2a95..a805fb6 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.vertx;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.slf4j.Logger;
@@ -25,6 +26,8 @@ import org.slf4j.LoggerFactory;
 import org.vertx.java.core.Handler;
 import org.vertx.java.core.eventbus.Message;
 
+import static org.apache.camel.component.vertx.VertxHelper.getVertxBody;
+
 public class VertxConsumer extends DefaultConsumer {
     private static final Logger LOG = LoggerFactory.getLogger(VertxConsumer.class);
     private final VertxEndpoint endpoint;
@@ -43,14 +46,21 @@ public class VertxConsumer extends DefaultConsumer {
     protected void onEventBusEvent(final Message event) {
         LOG.debug("onEvent {}", event);
 
-        final Exchange exchange = endpoint.createExchange();
+        final boolean reply = event.replyAddress() != null;
+        final Exchange exchange = endpoint.createExchange(reply ? ExchangePattern.InOut : ExchangePattern.InOnly);
         exchange.getIn().setBody(event.body());
 
         try {
             getAsyncProcessor().process(exchange, new AsyncCallback() {
                 @Override
                 public void done(boolean doneSync) {
-                    // noop
+                    if (reply) {
+                        Object body = getVertxBody(exchange);
+                        if (body != null) {
+                            LOG.debug("Sending reply to: {} with body: {}", event.replyAddress(), body);
+                            event.reply(body);
+                        }
+                    }
                 }
             });
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
index 733d337..4cac188 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
@@ -33,6 +33,8 @@ public class VertxEndpoint extends DefaultEndpoint {
 
     @UriParam
     private String address;
+    @UriParam
+    private Boolean pubSub;
 
     public VertxEndpoint(String uri, VertxComponent component, String address) {
         super(uri, component);
@@ -70,6 +72,18 @@ public class VertxEndpoint extends DefaultEndpoint {
         return address;
     }
 
+    public boolean isPubSub() {
+        return pubSub != null && pubSub;
+    }
+
+    public Boolean getPubSub() {
+        return pubSub;
+    }
+
+    public void setPubSub(Boolean pubSub) {
+        this.pubSub = pubSub;
+    }
+
     /**
      * Sets the event bus address used to communicate
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
new file mode 100644
index 0000000..661c86d
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.vertx.java.core.json.JsonArray;
+import org.vertx.java.core.json.JsonObject;
+
+public final class VertxHelper {
+
+    private VertxHelper() {
+    }
+
+    public static Object getVertxBody(Exchange exchange) {
+        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        Object body = msg.getBody(JsonObject.class);
+        if (body == null) {
+            body = msg.getBody(JsonArray.class);
+        }
+        if (body == null) {
+            body = msg.getBody(String.class);
+        }
+        return body;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
index b25a448..7748b90 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
@@ -16,17 +16,20 @@
  */
 package org.apache.camel.component.vertx;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.InvalidPayloadRuntimeException;
-import org.apache.camel.Message;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.MessageHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.vertx.java.core.Handler;
 import org.vertx.java.core.eventbus.EventBus;
-import org.vertx.java.core.json.JsonArray;
-import org.vertx.java.core.json.JsonObject;
 
-public class VertxProducer extends DefaultProducer {
+import static org.apache.camel.component.vertx.VertxHelper.getVertxBody;
+
+public class VertxProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(VertxProducer.class);
 
@@ -39,32 +42,57 @@ public class VertxProducer extends DefaultProducer {
         return (VertxEndpoint) super.getEndpoint();
     }
 
-    public void process(Exchange exchange) throws Exception {
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         EventBus eventBus = getEndpoint().getEventBus();
         String address = getEndpoint().getAddress();
 
-        Message in = exchange.getIn();
+        boolean reply = ExchangeHelper.isOutCapable(exchange);
+        boolean pubSub = getEndpoint().isPubSub();
 
-        JsonObject jsonObject = in.getBody(JsonObject.class);
-        if (jsonObject != null) {
-            LOG.debug("Publishing to: {} with JsonObject: {}", address, jsonObject);
-            eventBus.publish(address, jsonObject);
-            return;
+        Object body = getVertxBody(exchange);
+        if (body != null) {
+            if (reply) {
+                LOG.debug("Sending to: {} with body: {}", address, body);
+                eventBus.send(address, body, new CamelReplyHandler(exchange, callback));
+                return false;
+            } else {
+                if (pubSub) {
+                    LOG.debug("Publishing to: {} with body: {}", address, body);
+                    eventBus.publish(address, body);
+                } else {
+                    LOG.debug("Sending to: {} with body: {}", address, body);
+                    eventBus.send(address, body);
+                }
+                callback.done(true);
+                return true;
+            }
         }
-        JsonArray jsonArray = in.getBody(JsonArray.class);
-        if (jsonArray != null) {
-            LOG.debug("Publishing to: {} with JsonArray: {}", address, jsonArray);
-            eventBus.publish(address, jsonArray);
-            return;
+
+        exchange.setException(new InvalidPayloadRuntimeException(exchange, String.class));
+        callback.done(true);
+        return true;
+    }
+
+    private static final class CamelReplyHandler implements Handler<org.vertx.java.core.eventbus.Message> {
+
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+
+        private CamelReplyHandler(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.callback = callback;
         }
 
-        // and fallback and use string which almost all can be converted
-        String text = in.getBody(String.class);
-        if (text != null) {
-            LOG.debug("Publishing to: {} with String: {}", address, text);
-            eventBus.publish(address, new JsonObject(text));
-            return;
+        @Override
+        public void handle(org.vertx.java.core.eventbus.Message event) {
+            try {
+                // preserve headers
+                MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false);
+                exchange.getOut().setBody(event.body());
+            } finally {
+                callback.done(false);
+            }
         }
-        throw new InvalidPayloadRuntimeException(exchange, String.class);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
new file mode 100644
index 0000000..1000d01
--- /dev/null
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class VertxRequestReplyTest extends VertxBaseTestSupport {
+
+    protected String startUri = "direct:start";
+    protected String middleUri = "vertx:foo.middle";
+    protected String resultUri = "mock:result";
+
+    protected MockEndpoint resultEndpoint;
+    protected String body1 = "Camel";
+    protected String body2 = "World";
+
+    @Test
+    public void testVertxMessages() throws Exception {
+        resultEndpoint = context.getEndpoint(resultUri, MockEndpoint.class);
+        resultEndpoint.expectedBodiesReceivedInAnyOrder("Bye Camel", "Bye World");
+
+        String out = template.requestBody(startUri, body1, String.class);
+        String out2 = template.requestBody(startUri, body2, String.class);
+
+        resultEndpoint.assertIsSatisfied();
+
+        assertEquals("Bye Camel", out);
+        assertEquals("Bye World", out2);
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // camel-vertx cannot be run with JDK 1.6
+                org.junit.Assume.assumeTrue(!isJava16());
+
+                VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
+                vertx.setPort(getPort());
+
+                from(startUri).to(middleUri).to(resultUri);
+
+                from(middleUri)
+                    .transform(simple("Bye ${body}"));
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
new file mode 100644
index 0000000..d185f51
--- /dev/null
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class VertxRoutePubSubTest extends VertxBaseTestSupport {
+
+    protected String startUri = "vertx:foo.start?pubSub=true";
+    protected String middleUri = "vertx:foo.middle?pubSub=true";
+    protected String resultUri = "mock:result";
+
+    protected MockEndpoint resultEndpoint;
+    protected String body1 = "{\"id\":1,\"description\":\"Message One\"}";
+    protected String body2 = "{\"id\":2,\"description\":\"Message Two\"}";
+
+    @Test
+    public void testVertxMessages() throws Exception {
+        resultEndpoint = context.getEndpoint(resultUri, MockEndpoint.class);
+        resultEndpoint.expectedBodiesReceivedInAnyOrder(body1, body2);
+
+        template.sendBody(startUri, body1);
+        template.sendBody(startUri, body2);
+
+        resultEndpoint.assertIsSatisfied();
+
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        for (Exchange exchange : list) {
+            log.info("Received exchange: " + exchange + " headers: " + exchange.getIn().getHeaders());
+        }
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // camel-vertx cannot be run with JDK 1.6
+                org.junit.Assume.assumeTrue(!isJava16());
+
+                VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
+                vertx.setPort(getPort());
+
+                from(startUri).to(middleUri);
+                from(middleUri).to(resultUri);
+            }
+        };
+    }
+}
\ No newline at end of file