You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/07 17:08:47 UTC
git commit: CAMEL-7176: Added support for sending replies with
camel-vertx, so you can do request/reply over vertx.
Updated Branches:
refs/heads/master 86a762032 -> a779d09c9
CAMEL-7176: Added support for sending replies with camel-vertx, so you can do request/reply over vertx.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a779d09c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a779d09c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a779d09c
Branch: refs/heads/master
Commit: a779d09c9d9e4e630086a892888134607f295f58
Parents: 86a7620
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Feb 7 17:09:22 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Feb 7 17:09:22 2014 +0100
----------------------------------------------------------------------
.../camel/component/vertx/VertxConsumer.java | 14 +++-
.../camel/component/vertx/VertxEndpoint.java | 14 ++++
.../camel/component/vertx/VertxHelper.java | 40 +++++++++++
.../camel/component/vertx/VertxProducer.java | 76 +++++++++++++-------
.../component/vertx/VertxRequestReplyTest.java | 66 +++++++++++++++++
.../component/vertx/VertxRoutePubSubTest.java | 69 ++++++++++++++++++
6 files changed, 253 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
index 66f2a95..a805fb6 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.vertx;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
@@ -25,6 +26,8 @@ import org.slf4j.LoggerFactory;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
+import static org.apache.camel.component.vertx.VertxHelper.getVertxBody;
+
public class VertxConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(VertxConsumer.class);
private final VertxEndpoint endpoint;
@@ -43,14 +46,21 @@ public class VertxConsumer extends DefaultConsumer {
protected void onEventBusEvent(final Message event) {
LOG.debug("onEvent {}", event);
- final Exchange exchange = endpoint.createExchange();
+ final boolean reply = event.replyAddress() != null;
+ final Exchange exchange = endpoint.createExchange(reply ? ExchangePattern.InOut : ExchangePattern.InOnly);
exchange.getIn().setBody(event.body());
try {
getAsyncProcessor().process(exchange, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
- // noop
+ if (reply) {
+ Object body = getVertxBody(exchange);
+ if (body != null) {
+ LOG.debug("Sending reply to: {} with body: {}", event.replyAddress(), body);
+ event.reply(body);
+ }
+ }
}
});
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
index 733d337..4cac188 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
@@ -33,6 +33,8 @@ public class VertxEndpoint extends DefaultEndpoint {
@UriParam
private String address;
+ @UriParam
+ private Boolean pubSub;
public VertxEndpoint(String uri, VertxComponent component, String address) {
super(uri, component);
@@ -70,6 +72,18 @@ public class VertxEndpoint extends DefaultEndpoint {
return address;
}
+ public boolean isPubSub() {
+ return pubSub != null && pubSub;
+ }
+
+ public Boolean getPubSub() {
+ return pubSub;
+ }
+
+ public void setPubSub(Boolean pubSub) {
+ this.pubSub = pubSub;
+ }
+
/**
* Sets the event bus address used to communicate
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
new file mode 100644
index 0000000..661c86d
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.vertx.java.core.json.JsonArray;
+import org.vertx.java.core.json.JsonObject;
+
+public final class VertxHelper {
+
+ private VertxHelper() {
+ }
+
+ public static Object getVertxBody(Exchange exchange) {
+ Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+ Object body = msg.getBody(JsonObject.class);
+ if (body == null) {
+ body = msg.getBody(JsonArray.class);
+ }
+ if (body == null) {
+ body = msg.getBody(String.class);
+ }
+ return body;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
index b25a448..7748b90 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
@@ -16,17 +16,20 @@
*/
package org.apache.camel.component.vertx;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadRuntimeException;
-import org.apache.camel.Message;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.MessageHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.EventBus;
-import org.vertx.java.core.json.JsonArray;
-import org.vertx.java.core.json.JsonObject;
-public class VertxProducer extends DefaultProducer {
+import static org.apache.camel.component.vertx.VertxHelper.getVertxBody;
+
+public class VertxProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(VertxProducer.class);
@@ -39,32 +42,57 @@ public class VertxProducer extends DefaultProducer {
return (VertxEndpoint) super.getEndpoint();
}
- public void process(Exchange exchange) throws Exception {
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
EventBus eventBus = getEndpoint().getEventBus();
String address = getEndpoint().getAddress();
- Message in = exchange.getIn();
+ boolean reply = ExchangeHelper.isOutCapable(exchange);
+ boolean pubSub = getEndpoint().isPubSub();
- JsonObject jsonObject = in.getBody(JsonObject.class);
- if (jsonObject != null) {
- LOG.debug("Publishing to: {} with JsonObject: {}", address, jsonObject);
- eventBus.publish(address, jsonObject);
- return;
+ Object body = getVertxBody(exchange);
+ if (body != null) {
+ if (reply) {
+ LOG.debug("Sending to: {} with body: {}", address, body);
+ eventBus.send(address, body, new CamelReplyHandler(exchange, callback));
+ return false;
+ } else {
+ if (pubSub) {
+ LOG.debug("Publishing to: {} with body: {}", address, body);
+ eventBus.publish(address, body);
+ } else {
+ LOG.debug("Sending to: {} with body: {}", address, body);
+ eventBus.send(address, body);
+ }
+ callback.done(true);
+ return true;
+ }
}
- JsonArray jsonArray = in.getBody(JsonArray.class);
- if (jsonArray != null) {
- LOG.debug("Publishing to: {} with JsonArray: {}", address, jsonArray);
- eventBus.publish(address, jsonArray);
- return;
+
+ exchange.setException(new InvalidPayloadRuntimeException(exchange, String.class));
+ callback.done(true);
+ return true;
+ }
+
+ private static final class CamelReplyHandler implements Handler<org.vertx.java.core.eventbus.Message> {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+
+ private CamelReplyHandler(Exchange exchange, AsyncCallback callback) {
+ this.exchange = exchange;
+ this.callback = callback;
}
- // and fallback and use string which almost all can be converted
- String text = in.getBody(String.class);
- if (text != null) {
- LOG.debug("Publishing to: {} with String: {}", address, text);
- eventBus.publish(address, new JsonObject(text));
- return;
+ @Override
+ public void handle(org.vertx.java.core.eventbus.Message event) {
+ try {
+ // preserve headers
+ MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false);
+ exchange.getOut().setBody(event.body());
+ } finally {
+ callback.done(false);
+ }
}
- throw new InvalidPayloadRuntimeException(exchange, String.class);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
new file mode 100644
index 0000000..1000d01
--- /dev/null
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class VertxRequestReplyTest extends VertxBaseTestSupport {
+
+ protected String startUri = "direct:start";
+ protected String middleUri = "vertx:foo.middle";
+ protected String resultUri = "mock:result";
+
+ protected MockEndpoint resultEndpoint;
+ protected String body1 = "Camel";
+ protected String body2 = "World";
+
+ @Test
+ public void testVertxMessages() throws Exception {
+ resultEndpoint = context.getEndpoint(resultUri, MockEndpoint.class);
+ resultEndpoint.expectedBodiesReceivedInAnyOrder("Bye Camel", "Bye World");
+
+ String out = template.requestBody(startUri, body1, String.class);
+ String out2 = template.requestBody(startUri, body2, String.class);
+
+ resultEndpoint.assertIsSatisfied();
+
+ assertEquals("Bye Camel", out);
+ assertEquals("Bye World", out2);
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // camel-vertx cannot be run with JDK 1.6
+ org.junit.Assume.assumeTrue(!isJava16());
+
+ VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
+ vertx.setPort(getPort());
+
+ from(startUri).to(middleUri).to(resultUri);
+
+ from(middleUri)
+ .transform(simple("Bye ${body}"));
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
new file mode 100644
index 0000000..d185f51
--- /dev/null
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class VertxRoutePubSubTest extends VertxBaseTestSupport {
+
+ protected String startUri = "vertx:foo.start?pubSub=true";
+ protected String middleUri = "vertx:foo.middle?pubSub=true";
+ protected String resultUri = "mock:result";
+
+ protected MockEndpoint resultEndpoint;
+ protected String body1 = "{\"id\":1,\"description\":\"Message One\"}";
+ protected String body2 = "{\"id\":2,\"description\":\"Message Two\"}";
+
+ @Test
+ public void testVertxMessages() throws Exception {
+ resultEndpoint = context.getEndpoint(resultUri, MockEndpoint.class);
+ resultEndpoint.expectedBodiesReceivedInAnyOrder(body1, body2);
+
+ template.sendBody(startUri, body1);
+ template.sendBody(startUri, body2);
+
+ resultEndpoint.assertIsSatisfied();
+
+ List<Exchange> list = resultEndpoint.getReceivedExchanges();
+ for (Exchange exchange : list) {
+ log.info("Received exchange: " + exchange + " headers: " + exchange.getIn().getHeaders());
+ }
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // camel-vertx cannot be run with JDK 1.6
+ org.junit.Assume.assumeTrue(!isJava16());
+
+ VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class);
+ vertx.setPort(getPort());
+
+ from(startUri).to(middleUri);
+ from(middleUri).to(resultUri);
+ }
+ };
+ }
+}
\ No newline at end of file