You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/26 11:30:29 UTC
camel git commit: [CAMEL-9364] Add ability to receive
onOpen/onClose/onError websocket events through camel route.
Repository: camel
Updated Branches:
refs/heads/master e5c130b3e -> 36297987b
[CAMEL-9364] Add ability to receive onOpen/onClose/onError websocket events through camel route.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36297987
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36297987
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36297987
Branch: refs/heads/master
Commit: 36297987bb152beacffcfc5e9ae1ee4a050752a3
Parents: e5c130b
Author: Pavlo Kletsko <pk...@gmail.com>
Authored: Thu Nov 26 10:30:04 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Nov 26 11:29:16 2015 +0100
----------------------------------------------------------------------
.../websocket/CamelWebSocketServlet.java | 16 +++++-
.../websocket/WebsocketConstants.java | 7 ++-
.../atmosphere/websocket/WebsocketConsumer.java | 37 +++++++++----
.../atmosphere/websocket/WebsocketHandler.java | 11 +++-
.../WebsocketCamelRouterTestSupport.java | 4 +-
.../websocket/WebsocketRouteTest.java | 57 +++++++++++++++++++-
6 files changed, 115 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java
index 07c9779..6c9be21 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/CamelWebSocketServlet.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.atmosphere.websocket;
import java.io.IOException;
+import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -34,6 +35,19 @@ import org.apache.camel.http.common.HttpConsumer;
*/
public class CamelWebSocketServlet extends CamelHttpTransportServlet {
private static final long serialVersionUID = 1764707448550670635L;
+ private static final String RESEND_ALL_WEBSOCKET_EVENTS_PARAM_KEY = "events";
+ private boolean enableEventsResending;
+
+ @Override
+ public void init(ServletConfig config) throws ServletException {
+ super.init(config);
+
+ String eventsResendingParameter = config.getInitParameter(RESEND_ALL_WEBSOCKET_EVENTS_PARAM_KEY);
+ if ("true".equals(eventsResendingParameter)) {
+ log.debug("Events resending enabled");
+ enableEventsResending = true;
+ }
+ }
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
@@ -72,7 +86,7 @@ public class CamelWebSocketServlet extends CamelHttpTransportServlet {
}
log.debug("Dispatching to Websocket Consumer at {}", consumer.getPath());
- ((WebsocketConsumer)consumer).service(request, response);
+ ((WebsocketConsumer)consumer).service(request, response, enableEventsResending);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
index 9b21354..b85b039 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
@@ -23,7 +23,12 @@ public final class WebsocketConstants {
public static final String CONNECTION_KEY = "websocket.connectionKey";
public static final String SEND_TO_ALL = "websocket.sendToAll";
-
+ public static final String EVENT_TYPE = "websocket.eventType";
+
+ public static final int ONOPEN_EVENT_TYPE = 1;
+ public static final int ONCLOSE_EVENT_TYPE = 0;
+ public static final int ONERROR_EVENT_TYPE = -1;
+
private WebsocketConstants() {
//helper class
}
http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
index 22beae5..934fa5f 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.atmosphere.websocket;
import java.io.IOException;
-
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -28,22 +27,17 @@ import org.apache.camel.Processor;
import org.apache.camel.component.servlet.ServletConsumer;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereFramework;
-import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereRequestImpl;
-import org.atmosphere.cpr.AtmosphereResponse;
import org.atmosphere.cpr.AtmosphereResponseImpl;
import org.atmosphere.websocket.WebSocketProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
*
*/
public class WebsocketConsumer extends ServletConsumer {
- private static final transient Logger LOG = LoggerFactory.getLogger(WebsocketConsumer.class);
-
private AtmosphereFramework framework;
-
+ private boolean enableEventsResending;
+
public WebsocketConsumer(WebsocketEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.framework = new AtmosphereFramework(false, true);
@@ -58,8 +52,7 @@ public class WebsocketConsumer extends ServletConsumer {
if (wsp instanceof WebsocketHandler) {
((WebsocketHandler)wsp).setConsumer(this);
} else {
- // this should not normally happen
- LOG.error("unexpected WebSocketHandler: {}", wsp);
+ throw new IllegalArgumentException("Unexpected WebSocketHandler: " + wsp);
}
}
@@ -68,8 +61,9 @@ public class WebsocketConsumer extends ServletConsumer {
return (WebsocketEndpoint)super.getEndpoint();
}
- void service(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ void service(HttpServletRequest request, HttpServletResponse response, boolean enableEventsResending) throws IOException, ServletException {
framework.doCometSupport(AtmosphereRequestImpl.wrap(request), AtmosphereResponseImpl.wrap(response));
+ this.enableEventsResending = enableEventsResending;
}
public void sendMessage(final String connectionKey, Object message) {
@@ -88,4 +82,25 @@ public class WebsocketConsumer extends ServletConsumer {
}
});
}
+
+ public void sendEventNotification(String connectionKey, int eventType) {
+ final Exchange exchange = getEndpoint().createExchange();
+
+ // set header
+ exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey);
+ exchange.getIn().setHeader(WebsocketConstants.EVENT_TYPE, eventType);
+
+ // send exchange using the async routing engine
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+ }
+ }
+ });
+ }
+
+ public boolean isEnableEventsResending() {
+ return enableEventsResending;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java
index 09d803d..fd5f836 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketHandler.java
@@ -41,14 +41,17 @@ public class WebsocketHandler implements WebSocketProtocol {
@Override
public void onClose(WebSocket webSocket) {
LOG.debug("closing websocket");
+ String connectionKey = store.getConnectionKey(webSocket);
+ sendEventNotification(connectionKey, WebsocketConstants.ONCLOSE_EVENT_TYPE);
store.removeWebSocket(webSocket);
-
LOG.debug("websocket closed");
}
@Override
public void onError(WebSocket webSocket, WebSocketException t) {
LOG.error("websocket on error", t);
+ String connectionKey = store.getConnectionKey(webSocket);
+ sendEventNotification(connectionKey, WebsocketConstants.ONERROR_EVENT_TYPE);
}
@Override
@@ -56,6 +59,7 @@ public class WebsocketHandler implements WebSocketProtocol {
LOG.debug("opening websocket");
String connectionKey = UUID.randomUUID().toString();
store.addWebSocket(connectionKey, webSocket);
+ sendEventNotification(connectionKey, WebsocketConstants.ONOPEN_EVENT_TYPE);
LOG.debug("websocket opened");
}
@@ -89,4 +93,9 @@ public class WebsocketHandler implements WebSocketProtocol {
this.store = consumer.getEndpoint().getWebSocketStore();
}
+ private void sendEventNotification(final String connectionKey, final int eventType) {
+ if (consumer.isEnableEventsResending()) {
+ consumer.sendEventNotification(connectionKey, eventType);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java
index 569354f..95a23bc 100644
--- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java
+++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketCamelRouterTestSupport.java
@@ -34,6 +34,8 @@ public class WebsocketCamelRouterTestSupport extends CamelTestSupport {
protected Server server;
+ protected ServletHolder servletHolder;
+
@Before
public void setUp() throws Exception {
server = new Server();
@@ -46,7 +48,7 @@ public class WebsocketCamelRouterTestSupport extends CamelTestSupport {
context.setContextPath("/");
server.setHandler(context);
- ServletHolder servletHolder = new ServletHolder(new CamelWebSocketServlet());
+ servletHolder = new ServletHolder(new CamelWebSocketServlet());
servletHolder.setName("CamelWsServlet");
context.addServlet(servletHolder, "/*");
http://git-wip-us.apache.org/repos/asf/camel/blob/36297987/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java
index f48c651..dd4063b 100644
--- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java
+++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteTest.java
@@ -116,6 +116,23 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport {
wsclient2.close();
}
+ @Test
+ public void testWebsocketEventsResendingEnabled() throws Exception {
+ servletHolder.setInitParameter("events", "true");
+
+ TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola4");
+ wsclient.connect();
+ wsclient.close();
+ }
+
+ @Test
+ public void testWebsocketEventsResendingDisabled() throws Exception {
+ TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola5");
+ wsclient.connect();
+ assertFalse(wsclient.await(10));
+ wsclient.close();
+ }
+
// START SNIPPET: payload
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@@ -140,7 +157,20 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport {
createResponse(exchange, true);
}
}).to("atmosphere-websocket:///hola3");
-
+
+ // route for events resending enabled
+ from("atmosphere-websocket:///hola4").to("log:info").process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ checkEventsResendingEnabled(exchange);
+ }
+ });
+
+ // route for events resending disabled
+ from("atmosphere-websocket:///hola5").to("log:info").process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ checkEventsResendingDisabled(exchange);
+ }
+ }).to("atmosphere-websocket:///hola5");
}
};
}
@@ -163,7 +193,30 @@ public class WebsocketRouteTest extends WebsocketCamelRouterTestSupport {
exchange.getIn().setBody(createByteResponse(readAll((InputStream)msg)));
}
}
-
+
+ private static void checkEventsResendingEnabled(Exchange exchange) {
+ Object connectionKey = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY);
+ Object eventType = exchange.getIn().getHeader(WebsocketConstants.EVENT_TYPE);
+ Object msg = exchange.getIn().getBody();
+
+ assertEquals(null, msg);
+ assertTrue(connectionKey != null);
+
+ if (eventType instanceof Integer) {
+ assertTrue(eventType.equals(1) || eventType.equals(0) || eventType.equals(-1));
+ }
+ }
+
+ private static void checkEventsResendingDisabled(Exchange exchange) {
+ Object eventType = exchange.getIn().getHeader(WebsocketConstants.EVENT_TYPE);
+
+ if (eventType instanceof Integer) {
+ if (eventType.equals(1) || eventType.equals(0) || eventType.equals(-1)) {
+ exchange.getIn().setBody("Error. This place should never be reached.");
+ }
+ }
+ }
+
private static byte[] createByteResponse(byte[] req) {
byte[] resp = new byte[req.length + RESPONSE_GREETING_BYTES.length];
System.arraycopy(RESPONSE_GREETING_BYTES, 0, resp, 0, RESPONSE_GREETING_BYTES.length);