You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/08/02 06:18:25 UTC
[camel] 01/02: Expose WebSocketChannel and WebSocketHttpExchange to
the Camel Exchange
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2329e8ba3f31872a994aba576b00cbe71e7d748e
Author: Nick Houghton <nh...@gmail.com>
AuthorDate: Wed Jul 31 15:20:51 2019 +1000
Expose WebSocketChannel and WebSocketHttpExchange to the Camel Exchange
---
.../camel/component/undertow/UndertowConstants.java | 2 ++
.../camel/component/undertow/UndertowConsumer.java | 19 ++++++++++++++++---
.../undertow/handlers/CamelWebSocketHandler.java | 12 ++++++------
.../undertow/ws/UndertowWsConsumerRouteTest.java | 17 +++++++++++++++--
4 files changed, 39 insertions(+), 11 deletions(-)
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
index 20203cb..c53a039 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
@@ -23,6 +23,8 @@ public final class UndertowConstants {
public static final String SEND_TO_ALL = "websocket.sendToAll";
public static final String EVENT_TYPE = "websocket.eventType";
public static final String EVENT_TYPE_ENUM = "websocket.eventTypeEnum";
+ public static final String CHANNEL = "websocket.channel";
+ public static final String EXCHANGE = "websocket.exchange";
/**
* WebSocket peers related events the {@link UndertowConsumer} sends to the Camel route.
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
index 72b9e49..02b6f07 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
@@ -36,6 +36,8 @@ import io.undertow.util.Methods;
import io.undertow.util.MimeMappings;
import io.undertow.util.StatusCodes;
import io.undertow.websockets.core.WebSocketChannel;
+import io.undertow.websockets.spi.WebSocketHttpExchange;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -198,14 +200,18 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
* {@code connectionKey}.
*
* @param connectionKey an identifier of {@link WebSocketChannel} through which the {@code message} was received
+ * @param channel the {@link WebSocketChannel} through which the {@code message} was received
* @param message the message received via the {@link WebSocketChannel}
*/
- public void sendMessage(final String connectionKey, final Object message) {
+ public void sendMessage(final String connectionKey, WebSocketChannel channel, final Object message) {
final Exchange exchange = getEndpoint().createExchange();
// set header and body
exchange.getIn().setHeader(UndertowConstants.CONNECTION_KEY, connectionKey);
+ if(channel != null) {
+ exchange.getIn().setHeader(UndertowConstants.CHANNEL, channel);
+ }
exchange.getIn().setBody(message);
// send exchange using the async routing engine
@@ -223,16 +229,23 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
* Send a notification related to a WebSocket peer.
*
* @param connectionKey of WebSocket peer
+ * @param transportExchange the exchange for the websocket transport, only available for ON_OPEN events
+ * @param channel the {@link WebSocketChannel} of the WebSocket peer the event relates to
* @param eventType the type of the event
*/
- public void sendEventNotification(String connectionKey, EventType eventType) {
+ public void sendEventNotification(String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) {
final Exchange exchange = getEndpoint().createExchange();
final Message in = exchange.getIn();
in.setHeader(UndertowConstants.CONNECTION_KEY, connectionKey);
in.setHeader(UndertowConstants.EVENT_TYPE, eventType.getCode());
in.setHeader(UndertowConstants.EVENT_TYPE_ENUM, eventType);
-
+ if(channel != null){
+ in.setHeader(UndertowConstants.CHANNEL, channel);
+ }
+ if(transportExchange != null){
+ in.setHeader(UndertowConstants.EXCHANGE, transportExchange);
+ }
// send exchange using the async routing engine
getAsyncProcessor().process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
index f6065d6..5e67b98 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
@@ -85,7 +85,7 @@ public class CamelWebSocketHandler implements HttpHandler {
@Override
public void handleEvent(WebSocketChannel channel) {
sendEventNotificationIfNeeded((String) channel.getAttribute(UndertowConstants.CONNECTION_KEY),
- EventType.ONCLOSE);
+ null, channel, EventType.ONCLOSE);
}
};
this.delegate = Handlers.websocket(callback);
@@ -181,12 +181,12 @@ public class CamelWebSocketHandler implements HttpHandler {
}
}
- void sendEventNotificationIfNeeded(String connectionKey, EventType eventType) {
+ void sendEventNotificationIfNeeded(String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) {
synchronized (consumerLock) {
synchronized (consumerLock) {
if (consumer != null) {
if (consumer.getEndpoint().isFireWebSocketChannelEvents()) {
- consumer.sendEventNotification(connectionKey, eventType);
+ consumer.sendEventNotification(connectionKey, transportExchange, channel, eventType);
}
} else {
LOG.debug("No consumer to handle a peer {} event type {}", connectionKey, eventType);
@@ -315,7 +315,7 @@ public class CamelWebSocketHandler implements HttpHandler {
synchronized (consumerLock) {
if (consumer != null) {
final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new ByteArrayInputStream(bytes) : bytes;
- consumer.sendMessage(connectionKey, outMsg);
+ consumer.sendMessage(connectionKey, channel, outMsg);
} else {
LOG.debug("No consumer to handle message received: {}", message);
}
@@ -337,7 +337,7 @@ public class CamelWebSocketHandler implements HttpHandler {
synchronized (consumerLock) {
if (consumer != null) {
final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new StringReader(text) : text;
- consumer.sendMessage(connectionKey, outMsg);
+ consumer.sendMessage(connectionKey, channel, outMsg);
} else {
LOG.debug("No consumer to handle message received: {}", message);
}
@@ -362,7 +362,7 @@ public class CamelWebSocketHandler implements HttpHandler {
channel.setAttribute(UndertowConstants.CONNECTION_KEY, connectionKey);
channel.getReceiveSetter().set(receiveListener);
channel.addCloseTask(closeListener);
- sendEventNotificationIfNeeded(connectionKey, EventType.ONOPEN);
+ sendEventNotificationIfNeeded(connectionKey, exchange, channel, EventType.ONOPEN);
channel.resumeReceives();
}
diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
index 11b7654..573ebd1 100644
--- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
+++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.undertow.ws;
+import io.undertow.websockets.core.WebSocketChannel;
+import io.undertow.websockets.spi.WebSocketHttpExchange;
+
import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
@@ -125,7 +128,9 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
result.await(60, TimeUnit.SECONDS);
List<Exchange> exchanges = result.getReceivedExchanges();
Assert.assertEquals(1, exchanges.size());
- Object body = result.getReceivedExchanges().get(0).getIn().getBody();
+ Exchange exchange = result.getReceivedExchanges().get(0);
+ assertNotNull(exchange.getIn().getHeader(UndertowConstants.CHANNEL));
+ Object body = exchange.getIn().getBody();
Assert.assertTrue("body is " + body.getClass().getName(), body instanceof Reader);
Reader r = (Reader) body;
Assert.assertEquals("Test", IOConverter.toString(r));
@@ -208,7 +213,9 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
result.await(60, TimeUnit.SECONDS);
List<Exchange> exchanges = result.getReceivedExchanges();
Assert.assertEquals(1, exchanges.size());
- Object body = result.getReceivedExchanges().get(0).getIn().getBody();
+ Exchange exchange = result.getReceivedExchanges().get(0);
+ assertNotNull(exchange.getIn().getHeader(UndertowConstants.CHANNEL));
+ Object body = exchange.getIn().getBody();
Assert.assertTrue("body is " + body.getClass().getName(), body instanceof InputStream);
InputStream in = (InputStream) body;
Assert.assertArrayEquals(testmessage, IOConverter.toBytes(in));
@@ -376,6 +383,12 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
final Message in = exchange.getIn();
final String key = (String) in.getHeader(UndertowConstants.CONNECTION_KEY);
Assert.assertNotNull(key);
+ final WebSocketChannel channel = in.getHeader(UndertowConstants.CHANNEL, WebSocketChannel.class);
+ Assert.assertNotNull(channel);
+ if(in.getHeader(UndertowConstants.EVENT_TYPE_ENUM, EventType.class) == EventType.ONOPEN){
+ final WebSocketHttpExchange transportExchange = in.getHeader(UndertowConstants.EXCHANGE, WebSocketHttpExchange.class);
+ Assert.assertNotNull(transportExchange);
+ }
List<String> messages = connections.get(key);
if (messages == null) {
messages = new ArrayList<>();