You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/26 11:30:29 UTC

camel git commit: [CAMEL-9364] Add ability to receive onOpen/onClose/onError websocket events through camel route.

Repository: camel
Updated Branches:
  refs/heads/master e5c130b3e -> 36297987b


[CAMEL-9364] Add ability to receive onOpen/onClose/onError websocket events through camel route.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36297987
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36297987
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36297987

Branch: refs/heads/master
Commit: 36297987bb152beacffcfc5e9ae1ee4a050752a3
Parents: e5c130b
Author: Pavlo Kletsko <pk...@gmail.com>
Authored: Thu Nov 26 10:30:04 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Nov 26 11:29:16 2015 +0100

----------------------------------------------------------------------
 .../websocket/CamelWebSocketServlet.java        | 16 +++++-
 .../websocket/WebsocketConstants.java           |  7 ++-
 .../atmosphere/websocket/WebsocketConsumer.java | 37 +++++++++----
 .../atmosphere/websocket/WebsocketHandler.java  | 11 +++-
 .../WebsocketCamelRouterTestSupport.java        |  4 +-
 .../websocket/WebsocketRouteTest.java           | 57 +++++++++++++++++++-
 6 files changed, 115 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java
index 07c9779..6c9be21 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.atmosphere.websocket;
 
 import java.io.IOException;
+import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -34,6 +35,19 @@ import org.apache.camel.http.common.HttpConsumer;
  */
 public class CamelWebSocketServlet extends CamelHttpTransportServlet {
     private static final long serialVersionUID = 1764707448550670635L;
+    private static final String RESEND_ALL_WEBSOCKET_EVENTS_PARAM_KEY = "events";
+    private boolean enableEventsResending;
+
+    @Override
+    public void init(ServletConfig config) throws ServletException {
+        super.init(config);
+
+        String eventsResendingParameter = config.getInitParameter(RESEND_ALL_WEBSOCKET_EVENTS_PARAM_KEY);
+        if ("true".equals(eventsResendingParameter)) {
+            log.debug("Events resending enabled");
+            enableEventsResending = true;
+        }
+    }
 
     @Override
     protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
@@ -72,7 +86,7 @@ public class CamelWebSocketServlet extends CamelHttpTransportServlet {
         }
         
         log.debug("Dispatching to Websocket Consumer at {}", consumer.getPath());
-        ((WebsocketConsumer)consumer).service(request, response);
+        ((WebsocketConsumer)consumer).service(request, response, enableEventsResending);
     }
     
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
index 9b21354..b85b039 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
@@ -23,7 +23,12 @@ public final class WebsocketConstants {
     
     public static final String CONNECTION_KEY = "websocket.connectionKey";
     public static final String SEND_TO_ALL = "websocket.sendToAll";
-    
+    public static final String EVENT_TYPE = "websocket.eventType";
+
+    public static final int ONOPEN_EVENT_TYPE = 1;
+    public static final int ONCLOSE_EVENT_TYPE = 0;
+    public static final int ONERROR_EVENT_TYPE = -1;
+
     private WebsocketConstants() {
         //helper class
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
index 22beae5..934fa5f 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.atmosphere.websocket;
 
 import java.io.IOException;
-
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -28,22 +27,17 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.servlet.ServletConsumer;
 import org.atmosphere.cpr.ApplicationConfig;
 import org.atmosphere.cpr.AtmosphereFramework;
-import org.atmosphere.cpr.AtmosphereRequest;
 import org.atmosphere.cpr.AtmosphereRequestImpl;
-import org.atmosphere.cpr.AtmosphereResponse;
 import org.atmosphere.cpr.AtmosphereResponseImpl;
 import org.atmosphere.websocket.WebSocketProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  *
  */
 public class WebsocketConsumer extends ServletConsumer {
-    private static final transient Logger LOG = LoggerFactory.getLogger(WebsocketConsumer.class);
-    
     private AtmosphereFramework framework;
-    
+    private boolean enableEventsResending;
+
     public WebsocketConsumer(WebsocketEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.framework = new AtmosphereFramework(false, true);
@@ -58,8 +52,7 @@ public class WebsocketConsumer extends ServletConsumer {
         if (wsp instanceof WebsocketHandler) {
             ((WebsocketHandler)wsp).setConsumer(this);            
         } else {
-            // this should not normally happen
-            LOG.error("unexpected WebSocketHandler: {}", wsp);
+            throw new IllegalArgumentException("Unexpected WebSocketHandler: " + wsp);
         }
     }
 
@@ -68,8 +61,9 @@ public class WebsocketConsumer extends ServletConsumer {
         return (WebsocketEndpoint)super.getEndpoint();
     }
     
-    void service(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+    void service(HttpServletRequest request, HttpServletResponse response, boolean enableEventsResending) throws IOException, ServletException {
         framework.doCometSupport(AtmosphereRequestImpl.wrap(request), AtmosphereResponseImpl.wrap(response));
+        this.enableEventsResending = enableEventsResending;
     }
 
     public void sendMessage(final String connectionKey, Object message) {
@@ -88,4 +82,25 @@ public class WebsocketConsumer extends ServletConsumer {
             }
         });
     }
+
+    public void sendEventNotification(String connectionKey, int eventType) {
+        final Exchange exchange = getEndpoint().createExchange();
+
+        // set header
+        exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey);
+        exchange.getIn().setHeader(WebsocketConstants.EVENT_TYPE, eventType);
+
+        // send exchange using the async routing engine
+        getAsyncProcessor().process(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+                }
+            }
+        });
+    }
+
+    public boolean isEnableEventsResending() {
+        return enableEventsResending;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java
index 09d803d..fd5f836 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java
@@ -41,14 +41,17 @@ public class WebsocketHandler implements WebSocketProtocol {
     @Override
     public void onClose(WebSocket webSocket) {
         LOG.debug("closing websocket");
+        String connectionKey = store.getConnectionKey(webSocket);
+        sendEventNotification(connectionKey, WebsocketConstants.ONCLOSE_EVENT_TYPE);
         store.removeWebSocket(webSocket);
-        
         LOG.debug("websocket closed");
     }
 
     @Override
     public void onError(WebSocket webSocket, WebSocketException t) {
         LOG.error("websocket on error", t);
+        String connectionKey = store.getConnectionKey(webSocket);
+        sendEventNotification(connectionKey, WebsocketConstants.ONERROR_EVENT_TYPE);
     }
 
     @Override
@@ -56,6 +59,7 @@ public class WebsocketHandler implements WebSocketProtocol {
         LOG.debug("opening websocket");
         String connectionKey = UUID.randomUUID().toString();
         store.addWebSocket(connectionKey, webSocket);
+        sendEventNotification(connectionKey, WebsocketConstants.ONOPEN_EVENT_TYPE);
         LOG.debug("websocket opened");
     }
 
@@ -89,4 +93,9 @@ public class WebsocketHandler implements WebSocketProtocol {
         this.store = consumer.getEndpoint().getWebSocketStore();
     }
 
+    private void sendEventNotification(final String connectionKey, final int eventType) {
+        if (consumer.isEnableEventsResending()) {
+            consumer.sendEventNotification(connectionKey, eventType);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java
index 569354f..95a23bc 100644
--- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java
+++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java
@@ -34,6 +34,8 @@ public class WebsocketCamelRouterTestSupport extends CamelTestSupport {
     
     protected Server server;
 
+    protected ServletHolder servletHolder;
+
     @Before
     public void setUp() throws Exception {
         server = new Server();
@@ -46,7 +48,7 @@ public class WebsocketCamelRouterTestSupport extends CamelTestSupport {
         context.setContextPath("/");
         server.setHandler(context);
 
-        ServletHolder servletHolder = new ServletHolder(new CamelWebSocketServlet());
+        servletHolder = new ServletHolder(new CamelWebSocketServlet());
         servletHolder.setName("CamelWsServlet");
         context.addServlet(servletHolder, "/*");
         

http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java
index f48c651..dd4063b 100644
--- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java
+++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java
@@ -116,6 +116,23 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport {
         wsclient2.close();
     }
 
+    @Test
+    public void testWebsocketEventsResendingEnabled() throws Exception {
+        servletHolder.setInitParameter("events", "true");
+
+        TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola4");
+        wsclient.connect();
+        wsclient.close();
+    }
+
+    @Test
+    public void testWebsocketEventsResendingDisabled() throws Exception {
+        TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola5");
+        wsclient.connect();
+        assertFalse(wsclient.await(10));
+        wsclient.close();
+    }
+
     // START SNIPPET: payload
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
@@ -140,7 +157,20 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport {
                         createResponse(exchange, true);
                     }
                 }).to("atmosphere-websocket:///hola3");
-                
+
+                // route for events resending enabled
+                from("atmosphere-websocket:///hola4").to("log:info").process(new Processor() {
+                    public void process(final Exchange exchange) throws Exception {
+                        checkEventsResendingEnabled(exchange);
+                    }
+                });
+
+                // route for events resending disabled
+                from("atmosphere-websocket:///hola5").to("log:info").process(new Processor() {
+                    public void process(final Exchange exchange) throws Exception {
+                        checkEventsResendingDisabled(exchange);
+                    }
+                }).to("atmosphere-websocket:///hola5");
             }
         };
     }
@@ -163,7 +193,30 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport {
             exchange.getIn().setBody(createByteResponse(readAll((InputStream)msg)));
         }
     }
-    
+
+    private static void checkEventsResendingEnabled(Exchange exchange) {
+        Object connectionKey = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY);
+        Object eventType = exchange.getIn().getHeader(WebsocketConstants.EVENT_TYPE);
+        Object msg = exchange.getIn().getBody();
+
+        assertEquals(null, msg);
+        assertTrue(connectionKey != null);
+
+        if (eventType instanceof Integer) {
+            assertTrue(eventType.equals(1) || eventType.equals(0) || eventType.equals(-1));
+        }
+    }
+
+    private static void checkEventsResendingDisabled(Exchange exchange) {
+        Object eventType = exchange.getIn().getHeader(WebsocketConstants.EVENT_TYPE);
+
+        if (eventType instanceof Integer) {
+            if (eventType.equals(1) || eventType.equals(0) || eventType.equals(-1)) {
+                exchange.getIn().setBody("Error. This place should never be reached.");
+            }
+        }
+    }
+
     private static byte[] createByteResponse(byte[] req) {
         byte[] resp = new byte[req.length + RESPONSE_GREETING_BYTES.length];
         System.arraycopy(RESPONSE_GREETING_BYTES, 0, resp, 0, RESPONSE_GREETING_BYTES.length);