You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/05 14:33:45 UTC
[3/3] camel git commit: [CAMEL-9393] Add ability to send a message to
multiple defined connections with guaranty of delivery
[CAMEL-9393] Add ability to send a message to multiple defined connections with guaranty of delivery
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/28831913
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/28831913
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/28831913
Branch: refs/heads/master
Commit: 28831913fe7b3cff6e075dc08ccddc37eeea6c42
Parents: 1d762e5
Author: Pavlo Kletsko <pk...@gmail.com>
Authored: Sat Dec 5 13:44:05 2015 +0100
Committer: Pavlo Kletsko <pk...@gmail.com>
Committed: Sat Dec 5 13:44:05 2015 +0100
----------------------------------------------------------------------
.../websocket/WebsocketConstants.java | 4 +
.../atmosphere/websocket/WebsocketConsumer.java | 19 ++
.../atmosphere/websocket/WebsocketEndpoint.java | 8 +-
.../atmosphere/websocket/WebsocketProducer.java | 88 +++++---
.../WebsocketRouteWithInitParamTest.java | 206 ++++++++++++++++++-
5 files changed, 295 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
index b85b039..c95da88 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
@@ -22,13 +22,17 @@ package org.apache.camel.component.atmosphere.websocket;
public final class WebsocketConstants {
public static final String CONNECTION_KEY = "websocket.connectionKey";
+ public static final String CONNECTION_KEY_LIST = "websocket.connectionKey.list";
public static final String SEND_TO_ALL = "websocket.sendToAll";
public static final String EVENT_TYPE = "websocket.eventType";
+ public static final String ERROR_TYPE = "websocket.errorType";
public static final int ONOPEN_EVENT_TYPE = 1;
public static final int ONCLOSE_EVENT_TYPE = 0;
public static final int ONERROR_EVENT_TYPE = -1;
+ public static final int MESSAGE_NOT_SENT_ERROR_TYPE = 1;
+
private WebsocketConstants() {
//helper class
}
http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
index 86bd016..5bbda28 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.atmosphere.websocket;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -108,6 +109,24 @@ public class WebsocketConsumer extends ServletConsumer {
});
}
+ public void sendNotDeliveredMessage(List<String> failedConnectionKeys, Object message) {
+ final Exchange exchange = getEndpoint().createExchange();
+
+ // set header and body
+ exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY_LIST, failedConnectionKeys);
+ exchange.getIn().setHeader(WebsocketConstants.ERROR_TYPE, WebsocketConstants.MESSAGE_NOT_SENT_ERROR_TYPE);
+ exchange.getIn().setBody(message);
+
+ // send exchange using the async routing engine
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+ }
+ }
+ });
+ }
+
public boolean isEnableEventsResending() {
return enableEventsResending;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java
index d8d803c..cdb7ff2 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java
@@ -36,6 +36,7 @@ import org.apache.camel.spi.UriPath;
public class WebsocketEndpoint extends ServletEndpoint {
private WebSocketStore store;
+ private WebsocketConsumer websocketConsumer;
@UriPath(description = "Name of websocket endpoint") @Metadata(required = "true")
private String servicePath;
@@ -62,7 +63,8 @@ public class WebsocketEndpoint extends ServletEndpoint {
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new WebsocketConsumer(this, processor);
+ websocketConsumer = new WebsocketConsumer(this, processor);
+ return websocketConsumer;
}
@Override
@@ -95,4 +97,8 @@ public class WebsocketEndpoint extends ServletEndpoint {
WebSocketStore getWebSocketStore() {
return store;
}
+
+ public WebsocketConsumer getWebsocketConsumer() {
+ return websocketConsumer;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
index 7fda043..9501537 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
@@ -18,6 +18,9 @@ package org.apache.camel.component.atmosphere.websocket;
import java.io.InputStream;
import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -34,6 +37,8 @@ import org.slf4j.LoggerFactory;
public class WebsocketProducer extends DefaultProducer {
private static final transient Logger LOG = LoggerFactory.getLogger(WebsocketProducer.class);
+ private List<String> notValidConnectionKeys = new ArrayList<>();
+
private static ExecutorService executor = Executors.newSingleThreadExecutor();
public WebsocketProducer(WebsocketEndpoint endpoint) {
@@ -71,46 +76,73 @@ public class WebsocketProducer extends DefaultProducer {
} else if (message instanceof InputStream) {
message = in.getBody(byte[].class);
}
-
+
log.debug("Sending to {}", message);
if (getEndpoint().isSendToAll()) {
log.debug("Sending to all -> {}", message);
//TODO consider using atmosphere's broadcast or a more configurable async send
for (final WebSocket websocket : getEndpoint().getWebSocketStore().getAllWebSockets()) {
- final Object msg = message;
- executor.execute(new Runnable() {
- @Override
- public void run() {
- sendMessage(websocket, msg);
- }
- });
+ sendMessage(websocket, message);
}
+ } else if (in.getHeader(WebsocketConstants.CONNECTION_KEY_LIST) != null) {
+ List<String> connectionKeyList = in.getHeader(WebsocketConstants.CONNECTION_KEY_LIST, List.class);
+ messageDistributor(connectionKeyList, message);
} else {
- // look for connection key and get Websocket
String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
- if (connectionKey != null) {
- WebSocket websocket = getEndpoint().getWebSocketStore().getWebSocket(connectionKey);
- log.debug("Sending to connection key {} -> {}", connectionKey, message);
- sendMessage(websocket, message);
- } else {
- throw new IllegalArgumentException("Failed to send message to single connection; connetion key not set.");
- }
-
+ messageDistributor(Arrays.asList(connectionKey), message);
+ }
+ }
+
+ private void messageDistributor(final List<String> connectionKeyList, final Object message) {
+ if (connectionKeyList == null) {
+ throw new IllegalArgumentException("Failed to send message to multiple connections; connetion key list is not set.");
+ }
+
+ notValidConnectionKeys = new ArrayList<>();
+
+ for (final String connectionKey : connectionKeyList) {
+ log.debug("Sending to connection key {} -> {}", connectionKey, message);
+ sendMessage(getWebSocket(connectionKey), message);
+ }
+
+ if (!notValidConnectionKeys.isEmpty()) {
+ log.debug("Some connections have not received the message {}", message);
+ getEndpoint().getWebsocketConsumer().sendNotDeliveredMessage(notValidConnectionKeys, message);
}
}
- private void sendMessage(WebSocket websocket, Object message) {
- try {
- if (message instanceof String) {
- websocket.write((String)message);
- } else if (message instanceof byte[]) {
- websocket.write((byte[])message, 0, ((byte[])message).length);
- } else {
- // this should not happen unless one of the supported types is missing above.
- LOG.error("unexpected message type {}", message == null ? null : message.getClass());
+ private void sendMessage(final WebSocket websocket, final Object message) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (message instanceof String) {
+ websocket.write((String) message);
+ } else if (message instanceof byte[]) {
+ websocket.write((byte[]) message, 0, ((byte[]) message).length);
+ } else {
+ // this should not happen unless one of the supported types is missing above.
+ LOG.error("unexpected message type {}", message == null ? null : message.getClass());
+ }
+ } catch (Exception e) {
+ LOG.error("Error when writing to websocket", e);
+ }
+ }
+ });
+ }
+
+ private WebSocket getWebSocket(final String connectionKey) {
+ WebSocket websocket;
+ if (connectionKey == null) {
+ throw new IllegalArgumentException("Failed to send message to single connection; connetion key is not set.");
+ } else {
+ websocket = getEndpoint().getWebSocketStore().getWebSocket(connectionKey);
+ if (websocket == null) {
+ //collect for call back to handle not sent message(s) to guaranty delivery
+ notValidConnectionKeys.add(connectionKey);
+ log.debug("Failed to send message to single connection; connetion key is not valid. {}", connectionKey);
}
- } catch (Exception e) {
- LOG.error("Error when writing to websocket", e);
}
+ return websocket;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java
index f0f8182..1407cbb 100644
--- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java
+++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java
@@ -21,8 +21,17 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithInitParamTestSupport {
+ private static final String[] EXISTED_USERS = {"Kim", "Pavlo", "Peter"};
+ private static String[] BROADCAST_MESSAGE_TO = {};
+ private static Map<String,String> connectionKeyUserMap = new HashMap<>();
+
@Test
public void testWebsocketEventsResendingEnabled() throws Exception {
TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola");
@@ -37,6 +46,99 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni
wsclient.close();
}
+ @Test
+ public void testWebsocketSingleClientBroadcastMultipleClients() throws Exception {
+ final int AWAIT_TIME = 5;
+ connectionKeyUserMap.clear();
+
+ TestClient wsclient1 = new TestClient("ws://localhost:" + PORT + "/hola2", 2);
+ TestClient wsclient2 = new TestClient("ws://localhost:" + PORT + "/hola2", 2);
+ TestClient wsclient3 = new TestClient("ws://localhost:" + PORT + "/hola2", 2);
+
+ wsclient1.connect();
+ wsclient1.await(AWAIT_TIME);
+
+ wsclient2.connect();
+ wsclient2.await(AWAIT_TIME);
+
+ wsclient3.connect();
+ wsclient3.await(AWAIT_TIME);
+
+ //all connections were registered in external store
+ assertTrue(connectionKeyUserMap.size() == EXISTED_USERS.length);
+
+ BROADCAST_MESSAGE_TO = new String[]{EXISTED_USERS[0], EXISTED_USERS[1]};
+
+ wsclient1.sendTextMessage("Gambas");
+ wsclient1.await(AWAIT_TIME);
+
+ List<String> received1 = wsclient1.getReceived(String.class);
+ assertEquals(1, received1.size());
+
+ for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) {
+ assertTrue(received1.get(0).contains(BROADCAST_MESSAGE_TO[i]));
+ }
+
+ List<String> received2 = wsclient2.getReceived(String.class);
+ assertEquals(1, received2.size());
+ for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) {
+ assertTrue(received2.get(0).contains(BROADCAST_MESSAGE_TO[i]));
+ }
+
+ List<String> received3 = wsclient3.getReceived(String.class);
+ assertEquals(0, received3.size());
+
+ wsclient1.close();
+ wsclient2.close();
+ wsclient3.close();
+ }
+
+ @Test
+ public void testWebsocketSingleClientBroadcastMultipleClientsGuaranteeDelivery() throws Exception {
+ final int AWAIT_TIME = 5;
+ connectionKeyUserMap.clear();
+
+ TestClient wsclient1 = new TestClient("ws://localhost:" + PORT + "/hola3", 2);
+ TestClient wsclient2 = new TestClient("ws://localhost:" + PORT + "/hola3", 2);
+ TestClient wsclient3 = new TestClient("ws://localhost:" + PORT + "/hola3", 2);
+
+ wsclient1.connect();
+ wsclient1.await(AWAIT_TIME);
+
+ wsclient2.connect();
+ wsclient2.await(AWAIT_TIME);
+
+ wsclient3.connect();
+ wsclient3.await(AWAIT_TIME);
+
+ //all connections were registered in external store
+ assertTrue(connectionKeyUserMap.size() == EXISTED_USERS.length);
+
+ wsclient2.close();
+ wsclient2.await(AWAIT_TIME);
+
+ BROADCAST_MESSAGE_TO = new String[]{EXISTED_USERS[0], EXISTED_USERS[1]};
+
+ wsclient1.sendTextMessage("Gambas");
+ wsclient1.await(AWAIT_TIME);
+
+ List<String> received1 = wsclient1.getReceived(String.class);
+ assertEquals(1, received1.size());
+
+ for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) {
+ assertTrue(received1.get(0).contains(BROADCAST_MESSAGE_TO[i]));
+ }
+
+ List<String> received2 = wsclient2.getReceived(String.class);
+ assertEquals(0, received2.size());
+
+ List<String> received3 = wsclient3.getReceived(String.class);
+ assertEquals(0, received3.size());
+
+ wsclient1.close();
+ wsclient3.close();
+ }
+
// START SNIPPET: payload
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@@ -48,16 +150,118 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni
}
});
- // route for events resending enabled with parameters from url
+ // route for events resending enabled
from("atmosphere-websocket:///hola1").to("log:info").process(new Processor() {
public void process(final Exchange exchange) throws Exception {
checkPassedParameters(exchange);
}
});
+
+ // route for single client broadcast to multiple clients
+ from("atmosphere-websocket:///hola2").to("log:info")
+ .choice()
+ .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONOPEN_EVENT_TYPE))
+ .process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ createExternalConnectionRegister(exchange);
+ }
+ })
+ .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONCLOSE_EVENT_TYPE))
+ .process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ removeExternalConnectionRegister(exchange);
+ }
+ })
+ .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONERROR_EVENT_TYPE))
+ .process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ removeExternalConnectionRegister(exchange);
+ }
+ })
+ .otherwise()
+ .process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ createBroadcastMultipleClientsResponse(exchange);
+ }
+ }).to("atmosphere-websocket:///hola2");
+
+ // route for single client broadcast to multiple clients guarantee delivery
+ from("atmosphere-websocket:///hola3").to("log:info")
+ .choice()
+ .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONOPEN_EVENT_TYPE))
+ .process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ createExternalConnectionRegister(exchange);
+ }
+ })
+ .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONCLOSE_EVENT_TYPE))
+ .process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ removeExternalConnectionRegister(exchange);
+ }
+ })
+ .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONERROR_EVENT_TYPE))
+ .process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ removeExternalConnectionRegister(exchange);
+ }
+ })
+ .when(header(WebsocketConstants.ERROR_TYPE).isEqualTo(WebsocketConstants.MESSAGE_NOT_SENT_ERROR_TYPE))
+ .process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ handleNotDeliveredMessage(exchange);
+ }
+ })
+ .otherwise()
+ .process(new Processor() {
+ public void process(final Exchange exchange) throws Exception {
+ createBroadcastMultipleClientsResponse(exchange);
+ }
+ }).to("atmosphere-websocket:///hola3");
}
};
}
+ private static void handleNotDeliveredMessage(Exchange exchange) {
+ List<String> connectionKeyList = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY_LIST, List.class);
+ assertEquals(1, connectionKeyList.size());
+ assertEquals(connectionKeyList.get(0), connectionKeyUserMap.get(BROADCAST_MESSAGE_TO[1]));
+ }
+
+ private static void createExternalConnectionRegister(Exchange exchange) {
+ Object connectionKey = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY);
+
+ String userName = EXISTED_USERS[0];
+
+ if (connectionKeyUserMap.size() > 0) {
+ userName = EXISTED_USERS[connectionKeyUserMap.size()];
+ }
+
+ connectionKeyUserMap.put(userName, (String) connectionKey);
+ }
+
+ private static void removeExternalConnectionRegister(Exchange exchange) {
+ // remove connectionKey from external store
+ }
+
+ private static void createBroadcastMultipleClientsResponse(Exchange exchange) {
+ List<String> connectionKeyList = new ArrayList<>();
+ Object msg = exchange.getIn().getBody();
+
+ String additionalMessage = "";
+
+ //send the message only to selected connections
+ for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) {
+ connectionKeyList.add(connectionKeyUserMap.get(BROADCAST_MESSAGE_TO[i]));
+ additionalMessage += BROADCAST_MESSAGE_TO[i] + " ";
+ }
+
+ additionalMessage += " Received the message: ";
+
+ exchange.getIn().setBody(additionalMessage + msg);
+ exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY_LIST, connectionKeyList);
+ }
+
private static void checkEventsResendingEnabled(Exchange exchange) {
Object connectionKey = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY);
Object eventType = exchange.getIn().getHeader(WebsocketConstants.EVENT_TYPE);