You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/08/02 06:18:25 UTC

[camel] 01/02: Expose WebSocketChannel and WebSocketHttpExchange to the Camel Exchange

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2329e8ba3f31872a994aba576b00cbe71e7d748e
Author: Nick Houghton <nh...@gmail.com>
AuthorDate: Wed Jul 31 15:20:51 2019 +1000

    Expose WebSocketChannel and WebSocketHttpExchange to the Camel Exchange
---
 .../camel/component/undertow/UndertowConstants.java   |  2 ++
 .../camel/component/undertow/UndertowConsumer.java    | 19 ++++++++++++++++---
 .../undertow/handlers/CamelWebSocketHandler.java      | 12 ++++++------
 .../undertow/ws/UndertowWsConsumerRouteTest.java      | 17 +++++++++++++++--
 4 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
index 20203cb..c53a039 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
@@ -23,6 +23,8 @@ public final class UndertowConstants {
     public static final String SEND_TO_ALL = "websocket.sendToAll";
     public static final String EVENT_TYPE = "websocket.eventType";
     public static final String EVENT_TYPE_ENUM = "websocket.eventTypeEnum";
+    public static final String CHANNEL = "websocket.channel";
+    public static final String EXCHANGE = "websocket.exchange";
 
     /**
      * WebSocket peers related events the {@link UndertowConsumer} sends to the Camel route.
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
index 72b9e49..02b6f07 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
@@ -36,6 +36,8 @@ import io.undertow.util.Methods;
 import io.undertow.util.MimeMappings;
 import io.undertow.util.StatusCodes;
 import io.undertow.websockets.core.WebSocketChannel;
+import io.undertow.websockets.spi.WebSocketHttpExchange;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -198,14 +200,18 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
      * {@code connectionKey}.
      *
      * @param connectionKey an identifier of {@link WebSocketChannel} through which the {@code message} was received
+     * @param channel the {@link WebSocketChannel} through which the {@code message} was received
      * @param message the message received via the {@link WebSocketChannel}
      */
-    public void sendMessage(final String connectionKey, final Object message) {
+    public void sendMessage(final String connectionKey, WebSocketChannel channel, final Object message) {
 
         final Exchange exchange = getEndpoint().createExchange();
 
         // set header and body
         exchange.getIn().setHeader(UndertowConstants.CONNECTION_KEY, connectionKey);
+        if(channel != null) {
+            exchange.getIn().setHeader(UndertowConstants.CHANNEL, channel);
+        }
         exchange.getIn().setBody(message);
 
         // send exchange using the async routing engine
@@ -223,16 +229,23 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
      * Send a notification related a WebSocket peer.
      *
      * @param connectionKey of WebSocket peer
+     * @param transportExchange the exchange for the websocket transport, only available for {@link EventType#ONOPEN} events, {@code null} otherwise
+     * @param channel the {@link WebSocketChannel} of the WebSocket peer the event relates to, may be {@code null}
      * @param eventType the type of the event
      */
-    public void sendEventNotification(String connectionKey, EventType eventType) {
+    public void sendEventNotification(String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) {
         final Exchange exchange = getEndpoint().createExchange();
 
         final Message in = exchange.getIn();
         in.setHeader(UndertowConstants.CONNECTION_KEY, connectionKey);
         in.setHeader(UndertowConstants.EVENT_TYPE, eventType.getCode());
         in.setHeader(UndertowConstants.EVENT_TYPE_ENUM, eventType);
-
+        if(channel != null){
+            in.setHeader(UndertowConstants.CHANNEL, channel);
+        }
+        if(transportExchange != null){
+            in.setHeader(UndertowConstants.EXCHANGE, transportExchange);
+        }
         // send exchange using the async routing engine
         getAsyncProcessor().process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
index f6065d6..5e67b98 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
@@ -85,7 +85,7 @@ public class CamelWebSocketHandler implements HttpHandler {
             @Override
             public void handleEvent(WebSocketChannel channel) {
                 sendEventNotificationIfNeeded((String) channel.getAttribute(UndertowConstants.CONNECTION_KEY),
-                        EventType.ONCLOSE);
+                        null, channel, EventType.ONCLOSE);
             }
         };
         this.delegate = Handlers.websocket(callback);
@@ -181,12 +181,12 @@ public class CamelWebSocketHandler implements HttpHandler {
         }
     }
 
-    void sendEventNotificationIfNeeded(String connectionKey, EventType eventType) {
+    void sendEventNotificationIfNeeded(String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) {
         synchronized (consumerLock) {
             synchronized (consumerLock) {
                 if (consumer != null) {
                     if (consumer.getEndpoint().isFireWebSocketChannelEvents()) {
-                        consumer.sendEventNotification(connectionKey, eventType);
+                        consumer.sendEventNotification(connectionKey, transportExchange, channel, eventType);
                     }
                 } else {
                     LOG.debug("No consumer to handle a peer {} event type {}", connectionKey, eventType);
@@ -315,7 +315,7 @@ public class CamelWebSocketHandler implements HttpHandler {
                 synchronized (consumerLock) {
                     if (consumer != null) {
                         final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new ByteArrayInputStream(bytes) : bytes;
-                        consumer.sendMessage(connectionKey, outMsg);
+                        consumer.sendMessage(connectionKey, channel, outMsg);
                     } else {
                         LOG.debug("No consumer to handle message received: {}", message);
                     }
@@ -337,7 +337,7 @@ public class CamelWebSocketHandler implements HttpHandler {
             synchronized (consumerLock) {
                 if (consumer != null) {
                     final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new StringReader(text) : text;
-                    consumer.sendMessage(connectionKey, outMsg);
+                    consumer.sendMessage(connectionKey, channel, outMsg);
                 } else {
                     LOG.debug("No consumer to handle message received: {}", message);
                 }
@@ -362,7 +362,7 @@ public class CamelWebSocketHandler implements HttpHandler {
             channel.setAttribute(UndertowConstants.CONNECTION_KEY, connectionKey);
             channel.getReceiveSetter().set(receiveListener);
             channel.addCloseTask(closeListener);
-            sendEventNotificationIfNeeded(connectionKey, EventType.ONOPEN);
+            sendEventNotificationIfNeeded(connectionKey, exchange, channel, EventType.ONOPEN);
             channel.resumeReceives();
         }
 
diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
index 11b7654..573ebd1 100644
--- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
+++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.undertow.ws;
 
+import io.undertow.websockets.core.WebSocketChannel;
+import io.undertow.websockets.spi.WebSocketHttpExchange;
+
 import java.io.InputStream;
 import java.io.Reader;
 import java.util.ArrayList;
@@ -125,7 +128,9 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
         result.await(60, TimeUnit.SECONDS);
         List<Exchange> exchanges = result.getReceivedExchanges();
         Assert.assertEquals(1, exchanges.size());
-        Object body = result.getReceivedExchanges().get(0).getIn().getBody();
+        Exchange exchange = result.getReceivedExchanges().get(0);
+        assertNotNull(exchange.getIn().getHeader(UndertowConstants.CHANNEL));
+        Object body = exchange.getIn().getBody();
         Assert.assertTrue("body is " + body.getClass().getName(), body instanceof Reader);
         Reader r = (Reader) body;
         Assert.assertEquals("Test", IOConverter.toString(r));
@@ -208,7 +213,9 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
         result.await(60, TimeUnit.SECONDS);
         List<Exchange> exchanges = result.getReceivedExchanges();
         Assert.assertEquals(1, exchanges.size());
-        Object body = result.getReceivedExchanges().get(0).getIn().getBody();
+        Exchange exchange = result.getReceivedExchanges().get(0);
+        assertNotNull(exchange.getIn().getHeader(UndertowConstants.CHANNEL));
+        Object body = exchange.getIn().getBody();
         Assert.assertTrue("body is " + body.getClass().getName(), body instanceof InputStream);
         InputStream in = (InputStream) body;
         Assert.assertArrayEquals(testmessage, IOConverter.toBytes(in));
@@ -376,6 +383,12 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
             final Message in = exchange.getIn();
             final String key = (String) in.getHeader(UndertowConstants.CONNECTION_KEY);
             Assert.assertNotNull(key);
+            final WebSocketChannel channel = in.getHeader(UndertowConstants.CHANNEL, WebSocketChannel.class);
+            Assert.assertNotNull(channel);
+            if(in.getHeader(UndertowConstants.EVENT_TYPE_ENUM, EventType.class) == EventType.ONOPEN){
+                final WebSocketHttpExchange transportExchange = in.getHeader(UndertowConstants.EXCHANGE, WebSocketHttpExchange.class);
+                Assert.assertNotNull(transportExchange);
+            }
             List<String> messages = connections.get(key);
             if (messages == null) {
                 messages = new ArrayList<>();