You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/08/02 06:18:24 UTC

[camel] branch master updated (0b9ad3b -> 3be3f6f)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 0b9ad3b  CAMEL-13811: Remove camel-boon
     new 2329e8b  Expose WebSocketChannel and WebSocketHttpExchange to the Camel Exchange
     new 3be3f6f  Fix checkstyle

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../component/undertow/UndertowConstants.java      |  2 +
 .../camel/component/undertow/UndertowConsumer.java | 33 +++++++++++-----
 .../undertow/handlers/CamelWebSocketHandler.java   | 44 ++++++++++------------
 .../undertow/ws/UndertowWsConsumerRouteTest.java   | 17 ++++++++-
 4 files changed, 60 insertions(+), 36 deletions(-)


[camel] 01/02: Expose WebSocketChannel and WebSocketHttpExchange to the Camel Exchange

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2329e8ba3f31872a994aba576b00cbe71e7d748e
Author: Nick Houghton <nh...@gmail.com>
AuthorDate: Wed Jul 31 15:20:51 2019 +1000

    Expose WebSocketChannel and WebSocketHttpExchange to the Camel Exchange
---
 .../camel/component/undertow/UndertowConstants.java   |  2 ++
 .../camel/component/undertow/UndertowConsumer.java    | 19 ++++++++++++++++---
 .../undertow/handlers/CamelWebSocketHandler.java      | 12 ++++++------
 .../undertow/ws/UndertowWsConsumerRouteTest.java      | 17 +++++++++++++++--
 4 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
index 20203cb..c53a039 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
@@ -23,6 +23,8 @@ public final class UndertowConstants {
     public static final String SEND_TO_ALL = "websocket.sendToAll";
     public static final String EVENT_TYPE = "websocket.eventType";
     public static final String EVENT_TYPE_ENUM = "websocket.eventTypeEnum";
+    public static final String CHANNEL = "websocket.channel";
+    public static final String EXCHANGE = "websocket.exchange";
 
     /**
      * WebSocket peers related events the {@link UndertowConsumer} sends to the Camel route.
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
index 72b9e49..02b6f07 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
@@ -36,6 +36,8 @@ import io.undertow.util.Methods;
 import io.undertow.util.MimeMappings;
 import io.undertow.util.StatusCodes;
 import io.undertow.websockets.core.WebSocketChannel;
+import io.undertow.websockets.spi.WebSocketHttpExchange;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -198,14 +200,18 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
      * {@code connectionKey}.
      *
      * @param connectionKey an identifier of {@link WebSocketChannel} through which the {@code message} was received
+     * @param channel the {@link WebSocketChannel} through which the {@code message} was received
      * @param message the message received via the {@link WebSocketChannel}
      */
-    public void sendMessage(final String connectionKey, final Object message) {
+    public void sendMessage(final String connectionKey, WebSocketChannel channel, final Object message) {
 
         final Exchange exchange = getEndpoint().createExchange();
 
         // set header and body
         exchange.getIn().setHeader(UndertowConstants.CONNECTION_KEY, connectionKey);
+        if(channel != null) {
+            exchange.getIn().setHeader(UndertowConstants.CHANNEL, channel);
+        }
         exchange.getIn().setBody(message);
 
         // send exchange using the async routing engine
@@ -223,16 +229,23 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
      * Send a notification related a WebSocket peer.
      *
      * @param connectionKey of WebSocket peer
+     * @param transportExchange the exchange for the websocket transport, only available for ON_OPEN events
+     * @param channel the {@link WebSocketChannel} through which the {@code message} was received
      * @param eventType the type of the event
      */
-    public void sendEventNotification(String connectionKey, EventType eventType) {
+    public void sendEventNotification(String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) {
         final Exchange exchange = getEndpoint().createExchange();
 
         final Message in = exchange.getIn();
         in.setHeader(UndertowConstants.CONNECTION_KEY, connectionKey);
         in.setHeader(UndertowConstants.EVENT_TYPE, eventType.getCode());
         in.setHeader(UndertowConstants.EVENT_TYPE_ENUM, eventType);
-
+        if(channel != null){
+            in.setHeader(UndertowConstants.CHANNEL, channel);
+        }
+        if(transportExchange != null){
+            in.setHeader(UndertowConstants.EXCHANGE, transportExchange);
+        }
         // send exchange using the async routing engine
         getAsyncProcessor().process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
index f6065d6..5e67b98 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
@@ -85,7 +85,7 @@ public class CamelWebSocketHandler implements HttpHandler {
             @Override
             public void handleEvent(WebSocketChannel channel) {
                 sendEventNotificationIfNeeded((String) channel.getAttribute(UndertowConstants.CONNECTION_KEY),
-                        EventType.ONCLOSE);
+                        null, channel, EventType.ONCLOSE);
             }
         };
         this.delegate = Handlers.websocket(callback);
@@ -181,12 +181,12 @@ public class CamelWebSocketHandler implements HttpHandler {
         }
     }
 
-    void sendEventNotificationIfNeeded(String connectionKey, EventType eventType) {
+    void sendEventNotificationIfNeeded(String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) {
         synchronized (consumerLock) {
             synchronized (consumerLock) {
                 if (consumer != null) {
                     if (consumer.getEndpoint().isFireWebSocketChannelEvents()) {
-                        consumer.sendEventNotification(connectionKey, eventType);
+                        consumer.sendEventNotification(connectionKey, transportExchange, channel, eventType);
                     }
                 } else {
                     LOG.debug("No consumer to handle a peer {} event type {}", connectionKey, eventType);
@@ -315,7 +315,7 @@ public class CamelWebSocketHandler implements HttpHandler {
                 synchronized (consumerLock) {
                     if (consumer != null) {
                         final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new ByteArrayInputStream(bytes) : bytes;
-                        consumer.sendMessage(connectionKey, outMsg);
+                        consumer.sendMessage(connectionKey, channel, outMsg);
                     } else {
                         LOG.debug("No consumer to handle message received: {}", message);
                     }
@@ -337,7 +337,7 @@ public class CamelWebSocketHandler implements HttpHandler {
             synchronized (consumerLock) {
                 if (consumer != null) {
                     final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new StringReader(text) : text;
-                    consumer.sendMessage(connectionKey, outMsg);
+                    consumer.sendMessage(connectionKey, channel, outMsg);
                 } else {
                     LOG.debug("No consumer to handle message received: {}", message);
                 }
@@ -362,7 +362,7 @@ public class CamelWebSocketHandler implements HttpHandler {
             channel.setAttribute(UndertowConstants.CONNECTION_KEY, connectionKey);
             channel.getReceiveSetter().set(receiveListener);
             channel.addCloseTask(closeListener);
-            sendEventNotificationIfNeeded(connectionKey, EventType.ONOPEN);
+            sendEventNotificationIfNeeded(connectionKey, exchange, channel, EventType.ONOPEN);
             channel.resumeReceives();
         }
 
diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
index 11b7654..573ebd1 100644
--- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
+++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.undertow.ws;
 
+import io.undertow.websockets.core.WebSocketChannel;
+import io.undertow.websockets.spi.WebSocketHttpExchange;
+
 import java.io.InputStream;
 import java.io.Reader;
 import java.util.ArrayList;
@@ -125,7 +128,9 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
         result.await(60, TimeUnit.SECONDS);
         List<Exchange> exchanges = result.getReceivedExchanges();
         Assert.assertEquals(1, exchanges.size());
-        Object body = result.getReceivedExchanges().get(0).getIn().getBody();
+        Exchange exchange = result.getReceivedExchanges().get(0);
+        assertNotNull(exchange.getIn().getHeader(UndertowConstants.CHANNEL));
+        Object body = exchange.getIn().getBody();
         Assert.assertTrue("body is " + body.getClass().getName(), body instanceof Reader);
         Reader r = (Reader) body;
         Assert.assertEquals("Test", IOConverter.toString(r));
@@ -208,7 +213,9 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
         result.await(60, TimeUnit.SECONDS);
         List<Exchange> exchanges = result.getReceivedExchanges();
         Assert.assertEquals(1, exchanges.size());
-        Object body = result.getReceivedExchanges().get(0).getIn().getBody();
+        Exchange exchange = result.getReceivedExchanges().get(0);
+        assertNotNull(exchange.getIn().getHeader(UndertowConstants.CHANNEL));
+        Object body = exchange.getIn().getBody();
         Assert.assertTrue("body is " + body.getClass().getName(), body instanceof InputStream);
         InputStream in = (InputStream) body;
         Assert.assertArrayEquals(testmessage, IOConverter.toBytes(in));
@@ -376,6 +383,12 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
             final Message in = exchange.getIn();
             final String key = (String) in.getHeader(UndertowConstants.CONNECTION_KEY);
             Assert.assertNotNull(key);
+            final WebSocketChannel channel = in.getHeader(UndertowConstants.CHANNEL, WebSocketChannel.class);
+            Assert.assertNotNull(channel);
+            if(in.getHeader(UndertowConstants.EVENT_TYPE_ENUM, EventType.class) == EventType.ONOPEN){
+                final WebSocketHttpExchange transportExchange = in.getHeader(UndertowConstants.EXCHANGE, WebSocketHttpExchange.class);
+                Assert.assertNotNull(transportExchange);
+            }
             List<String> messages = connections.get(key);
             if (messages == null) {
                 messages = new ArrayList<>();


[camel] 02/02: Fix checkstyle

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3be3f6f68af0ce9edf9e740896144ba946b81391
Author: Nick Houghton <nh...@gmail.com>
AuthorDate: Thu Aug 1 08:49:38 2019 +1000

    Fix checkstyle
---
 .../camel/component/undertow/UndertowConsumer.java | 24 ++++++++--------
 .../undertow/handlers/CamelWebSocketHandler.java   | 32 ++++++++++------------
 .../undertow/ws/UndertowWsConsumerRouteTest.java   |  8 +++---
 3 files changed, 30 insertions(+), 34 deletions(-)

diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
index 02b6f07..a3e7675 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
@@ -89,9 +89,9 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
                     accessLogReciever = new JBossLoggingAccessLogReceiver();
                 }
                 httpHandler = new AccessLogHandler(httpHandler,
-                                                   accessLogReciever,
-                                                   "common",
-                                                   AccessLogHandler.class.getClassLoader());
+                        accessLogReciever,
+                        "common",
+                        AccessLogHandler.class.getClassLoader());
             }
             endpoint.getComponent().registerEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext(), Handlers.httpContinueRead(
                     // wrap with EagerFormParsingHandler to enable undertow form parsers
@@ -106,7 +106,7 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
             this.webSocketHandler.setConsumer(null);
         }
         UndertowEndpoint endpoint = getEndpoint();
-        endpoint .getComponent().unregisterEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext());
+        endpoint.getComponent().unregisterEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext());
     }
 
     @Override
@@ -200,8 +200,8 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
      * {@code connectionKey}.
      *
      * @param connectionKey an identifier of {@link WebSocketChannel} through which the {@code message} was received
-     * @param channel the {@link WebSocketChannel} through which the {@code message} was received
-     * @param message the message received via the {@link WebSocketChannel}
+     * @param channel       the {@link WebSocketChannel} through which the {@code message} was received
+     * @param message       the message received via the {@link WebSocketChannel}
      */
     public void sendMessage(final String connectionKey, WebSocketChannel channel, final Object message) {
 
@@ -209,7 +209,7 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
 
         // set header and body
         exchange.getIn().setHeader(UndertowConstants.CONNECTION_KEY, connectionKey);
-        if(channel != null) {
+        if (channel != null) {
             exchange.getIn().setHeader(UndertowConstants.CHANNEL, channel);
         }
         exchange.getIn().setBody(message);
@@ -228,10 +228,10 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
     /**
      * Send a notification related a WebSocket peer.
      *
-     * @param connectionKey of WebSocket peer
+     * @param connectionKey     of WebSocket peer
      * @param transportExchange the exchange for the websocket transport, only available for ON_OPEN events
-     * @param channel the {@link WebSocketChannel} through which the {@code message} was received
-     * @param eventType the type of the event
+     * @param channel           the {@link WebSocketChannel} through which the {@code message} was received
+     * @param eventType         the type of the event
      */
     public void sendEventNotification(String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) {
         final Exchange exchange = getEndpoint().createExchange();
@@ -240,10 +240,10 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler {
         in.setHeader(UndertowConstants.CONNECTION_KEY, connectionKey);
         in.setHeader(UndertowConstants.EVENT_TYPE, eventType.getCode());
         in.setHeader(UndertowConstants.EVENT_TYPE_ENUM, eventType);
-        if(channel != null){
+        if (channel != null) {
             in.setHeader(UndertowConstants.CHANNEL, channel);
         }
-        if(transportExchange != null){
+        if (transportExchange != null) {
             in.setHeader(UndertowConstants.EXCHANGE, transportExchange);
         }
         // send exchange using the async routing engine
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
index 5e67b98..744767f 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
@@ -44,6 +44,7 @@ import io.undertow.websockets.core.BufferedTextMessage;
 import io.undertow.websockets.core.WebSocketChannel;
 import io.undertow.websockets.core.WebSockets;
 import io.undertow.websockets.spi.WebSocketHttpExchange;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
@@ -95,18 +96,14 @@ public class CamelWebSocketHandler implements HttpHandler {
      * Send the given {@code message} to the given {@code channel} and report the outcome to the given {@code callback}
      * within the given {@code timeoutMillis}.
      *
-     * @param channel
-     *            the channel to sent the {@code message} to
-     * @param message
-     *            the message to send
-     * @param callback
-     *            where to report the outcome
-     * @param timeoutMillis
-     *            the timeout in milliseconds
+     * @param channel       the channel to sent the {@code message} to
+     * @param message       the message to send
+     * @param callback      where to report the outcome
+     * @param timeoutMillis the timeout in milliseconds
      * @throws IOException
      */
     private static void send(WebSocketChannel channel, Object message, ExtendedWebSocketCallback callback,
-            long timeoutMillis) throws IOException {
+                             long timeoutMillis) throws IOException {
         if (channel.isOpen()) {
             if (message instanceof String) {
                 WebSockets.sendText((String) message, channel, callback);
@@ -130,7 +127,9 @@ public class CamelWebSocketHandler implements HttpHandler {
         }
     }
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public void handleRequest(HttpServerExchange exchange) throws Exception {
         this.delegate.handleRequest(exchange);
@@ -140,20 +139,17 @@ public class CamelWebSocketHandler implements HttpHandler {
      * Send the given {@code message} to one or more channels selected using the given {@code peerFilter} within the
      * given {@code timeout} and report the outcome to the given {@code camelExchange} and {@code camelCallback}.
      *
-     * @param peerFilter
-     *            a {@link Predicate} to apply to the set of peers obtained via {@link #delegate}'s
-     *            {@link WebSocketProtocolHandshakeHandler#getPeerConnections()}
-     * @param message
-     *            the message to send
+     * @param peerFilter    a {@link Predicate} to apply to the set of peers obtained via {@link #delegate}'s
+     *                      {@link WebSocketProtocolHandshakeHandler#getPeerConnections()}
+     * @param message       the message to send
      * @param camelExchange to notify about the outcome
      * @param camelCallback to notify about the outcome
-     * @param timeout
-     *            in milliseconds
+     * @param timeout       in milliseconds
      * @return {@code true} if the execution finished synchronously or {@code false} otherwise
      * @throws IOException
      */
     public boolean send(Predicate<WebSocketChannel> peerFilter, Object message, final int timeout,
-            final Exchange camelExchange, final AsyncCallback camelCallback) throws IOException {
+                        final Exchange camelExchange, final AsyncCallback camelCallback) throws IOException {
         List<WebSocketChannel> targetPeers = delegate.getPeerConnections().stream().filter(peerFilter).collect(Collectors.toList());
         if (targetPeers.isEmpty()) {
             camelCallback.done(true);
diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
index 573ebd1..e81ce77 100644
--- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
+++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.component.undertow.ws;
 
-import io.undertow.websockets.core.WebSocketChannel;
-import io.undertow.websockets.spi.WebSocketHttpExchange;
-
 import java.io.InputStream;
 import java.io.Reader;
 import java.util.ArrayList;
@@ -32,6 +29,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import io.undertow.websockets.core.WebSocketChannel;
+import io.undertow.websockets.spi.WebSocketHttpExchange;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
@@ -385,7 +385,7 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest {
             Assert.assertNotNull(key);
             final WebSocketChannel channel = in.getHeader(UndertowConstants.CHANNEL, WebSocketChannel.class);
             Assert.assertNotNull(channel);
-            if(in.getHeader(UndertowConstants.EVENT_TYPE_ENUM, EventType.class) == EventType.ONOPEN){
+            if (in.getHeader(UndertowConstants.EVENT_TYPE_ENUM, EventType.class) == EventType.ONOPEN) {
                 final WebSocketHttpExchange transportExchange = in.getHeader(UndertowConstants.EXCHANGE, WebSocketHttpExchange.class);
                 Assert.assertNotNull(transportExchange);
             }