You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/05 14:33:45 UTC

[3/3] camel git commit: [CAMEL-9393] Add ability to send a message to multiple defined connections with guarantee of delivery

[CAMEL-9393] Add ability to send a message to multiple defined connections with guarantee of delivery


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/28831913
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/28831913
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/28831913

Branch: refs/heads/master
Commit: 28831913fe7b3cff6e075dc08ccddc37eeea6c42
Parents: 1d762e5
Author: Pavlo Kletsko <pk...@gmail.com>
Authored: Sat Dec 5 13:44:05 2015 +0100
Committer: Pavlo Kletsko <pk...@gmail.com>
Committed: Sat Dec 5 13:44:05 2015 +0100

----------------------------------------------------------------------
 .../websocket/WebsocketConstants.java           |   4 +
 .../atmosphere/websocket/WebsocketConsumer.java |  19 ++
 .../atmosphere/websocket/WebsocketEndpoint.java |   8 +-
 .../atmosphere/websocket/WebsocketProducer.java |  88 +++++---
 .../WebsocketRouteWithInitParamTest.java        | 206 ++++++++++++++++++-
 5 files changed, 295 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
index b85b039..c95da88 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java
@@ -22,13 +22,17 @@ package org.apache.camel.component.atmosphere.websocket;
 public final class WebsocketConstants {
     
     public static final String CONNECTION_KEY = "websocket.connectionKey";
+    public static final String CONNECTION_KEY_LIST = "websocket.connectionKey.list";
     public static final String SEND_TO_ALL = "websocket.sendToAll";
     public static final String EVENT_TYPE = "websocket.eventType";
+    public static final String ERROR_TYPE = "websocket.errorType";
 
     public static final int ONOPEN_EVENT_TYPE = 1;
     public static final int ONCLOSE_EVENT_TYPE = 0;
     public static final int ONERROR_EVENT_TYPE = -1;
 
+    public static final int MESSAGE_NOT_SENT_ERROR_TYPE = 1;
+
     private WebsocketConstants() {
         //helper class
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
index 86bd016..5bbda28 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.atmosphere.websocket;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -108,6 +109,24 @@ public class WebsocketConsumer extends ServletConsumer {
         });
     }
 
+    public void sendNotDeliveredMessage(List<String> failedConnectionKeys, Object message) {
+        final Exchange exchange = getEndpoint().createExchange();
+
+        // set header and body
+        exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY_LIST, failedConnectionKeys);
+        exchange.getIn().setHeader(WebsocketConstants.ERROR_TYPE, WebsocketConstants.MESSAGE_NOT_SENT_ERROR_TYPE);
+        exchange.getIn().setBody(message);
+
+        // send exchange using the async routing engine
+        getAsyncProcessor().process(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+                }
+            }
+        });
+    }
+
     public boolean isEnableEventsResending() {
         return enableEventsResending;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java
index d8d803c..cdb7ff2 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java
@@ -36,6 +36,7 @@ import org.apache.camel.spi.UriPath;
 public class WebsocketEndpoint extends ServletEndpoint {
 
     private WebSocketStore store;
+    private WebsocketConsumer websocketConsumer;
 
     @UriPath(description = "Name of websocket endpoint") @Metadata(required = "true")
     private String servicePath;
@@ -62,7 +63,8 @@ public class WebsocketEndpoint extends ServletEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new WebsocketConsumer(this, processor);
+        websocketConsumer = new WebsocketConsumer(this, processor);
+        return websocketConsumer;
     }
 
     @Override
@@ -95,4 +97,8 @@ public class WebsocketEndpoint extends ServletEndpoint {
     WebSocketStore getWebSocketStore() {
         return store;
     }
+
+    public WebsocketConsumer getWebsocketConsumer() {
+        return websocketConsumer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
index 7fda043..9501537 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
@@ -18,6 +18,9 @@ package org.apache.camel.component.atmosphere.websocket;
 
 import java.io.InputStream;
 import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -34,6 +37,8 @@ import org.slf4j.LoggerFactory;
 public class WebsocketProducer extends DefaultProducer {
     private static final transient Logger LOG = LoggerFactory.getLogger(WebsocketProducer.class);
 
+    private List<String> notValidConnectionKeys = new ArrayList<>();
+
     private static ExecutorService executor = Executors.newSingleThreadExecutor();
     
     public WebsocketProducer(WebsocketEndpoint endpoint) {
@@ -71,46 +76,73 @@ public class WebsocketProducer extends DefaultProducer {
         } else if (message instanceof InputStream) {
             message = in.getBody(byte[].class);
         }
-        
+
         log.debug("Sending to {}", message);
         if (getEndpoint().isSendToAll()) {
             log.debug("Sending to all -> {}", message);
             //TODO consider using atmosphere's broadcast or a more configurable async send
             for (final WebSocket websocket : getEndpoint().getWebSocketStore().getAllWebSockets()) {
-                final Object msg = message;
-                executor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        sendMessage(websocket, msg);
-                    }
-                });
+                sendMessage(websocket, message);
             }
+        } else if (in.getHeader(WebsocketConstants.CONNECTION_KEY_LIST) != null) {
+            List<String> connectionKeyList = in.getHeader(WebsocketConstants.CONNECTION_KEY_LIST, List.class);
+            messageDistributor(connectionKeyList, message);
         } else {
-            // look for connection key and get Websocket
             String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
-            if (connectionKey != null) {
-                WebSocket websocket = getEndpoint().getWebSocketStore().getWebSocket(connectionKey);
-                log.debug("Sending to connection key {} -> {}", connectionKey, message);
-                sendMessage(websocket, message);
-            } else {
-                throw new IllegalArgumentException("Failed to send message to single connection; connetion key not set.");
-            }
-            
+            messageDistributor(Arrays.asList(connectionKey), message);
+        }
+    }
+
+    private void messageDistributor(final List<String> connectionKeyList, final Object message) {
+        if (connectionKeyList == null) {
+            throw new IllegalArgumentException("Failed to send message to multiple connections; connetion key list is not set.");
+        }
+
+        notValidConnectionKeys = new ArrayList<>();
+
+        for (final String connectionKey : connectionKeyList) {
+            log.debug("Sending to connection key {} -> {}", connectionKey, message);
+            sendMessage(getWebSocket(connectionKey), message);
+        }
+
+        if (!notValidConnectionKeys.isEmpty()) {
+            log.debug("Some connections have not received the message {}",  message);
+            getEndpoint().getWebsocketConsumer().sendNotDeliveredMessage(notValidConnectionKeys, message);
         }
     }
 
-    private void sendMessage(WebSocket websocket, Object message) {
-        try {
-            if (message instanceof String) {
-                websocket.write((String)message);
-            } else if (message instanceof byte[]) {
-                websocket.write((byte[])message, 0, ((byte[])message).length);
-            } else {
-                // this should not happen unless one of the supported types is missing above.
-                LOG.error("unexpected message type {}", message == null ? null : message.getClass());
+    private void sendMessage(final WebSocket websocket, final Object message) {
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (message instanceof String) {
+                        websocket.write((String) message);
+                    } else if (message instanceof byte[]) {
+                        websocket.write((byte[]) message, 0, ((byte[]) message).length);
+                    } else {
+                        // this should not happen unless one of the supported types is missing above.
+                        LOG.error("unexpected message type {}", message == null ? null : message.getClass());
+                    }
+                } catch (Exception e) {
+                    LOG.error("Error when writing to websocket", e);
+                }
+            }
+        });
+    }
+
+    private WebSocket getWebSocket(final String connectionKey) {
+        WebSocket websocket;
+        if (connectionKey == null) {
+            throw new IllegalArgumentException("Failed to send message to single connection; connetion key is not set.");
+        } else {
+            websocket = getEndpoint().getWebSocketStore().getWebSocket(connectionKey);
+            if (websocket == null) {
+                //collect for callback to handle not sent message(s) to guarantee delivery
+                notValidConnectionKeys.add(connectionKey);
+                log.debug("Failed to send message to single connection; connetion key is not valid. {}",  connectionKey);
             }
-        } catch (Exception e) {
-            LOG.error("Error when writing to websocket", e);
         }
+        return websocket;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java
----------------------------------------------------------------------
diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java
index f0f8182..1407cbb 100644
--- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java
+++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java
@@ -21,8 +21,17 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithInitParamTestSupport {
 
+    private static final String[] EXISTED_USERS = {"Kim", "Pavlo", "Peter"};
+    private static String[] BROADCAST_MESSAGE_TO = {};
+    private static Map<String,String> connectionKeyUserMap = new HashMap<>();
+
     @Test
     public void testWebsocketEventsResendingEnabled() throws Exception {
         TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola");
@@ -37,6 +46,99 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni
         wsclient.close();
     }
 
+    @Test
+    public void testWebsocketSingleClientBroadcastMultipleClients() throws Exception {
+        final int AWAIT_TIME = 5;
+        connectionKeyUserMap.clear();
+
+        TestClient wsclient1 = new TestClient("ws://localhost:" + PORT + "/hola2", 2);
+        TestClient wsclient2 = new TestClient("ws://localhost:" + PORT + "/hola2", 2);
+        TestClient wsclient3 = new TestClient("ws://localhost:" + PORT + "/hola2", 2);
+
+        wsclient1.connect();
+        wsclient1.await(AWAIT_TIME);
+
+        wsclient2.connect();
+        wsclient2.await(AWAIT_TIME);
+
+        wsclient3.connect();
+        wsclient3.await(AWAIT_TIME);
+
+        //all connections were registered in external store
+        assertTrue(connectionKeyUserMap.size() == EXISTED_USERS.length);
+
+        BROADCAST_MESSAGE_TO = new String[]{EXISTED_USERS[0], EXISTED_USERS[1]};
+
+        wsclient1.sendTextMessage("Gambas");
+        wsclient1.await(AWAIT_TIME);
+
+        List<String> received1 = wsclient1.getReceived(String.class);
+        assertEquals(1, received1.size());
+
+        for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) {
+            assertTrue(received1.get(0).contains(BROADCAST_MESSAGE_TO[i]));
+        }
+
+        List<String> received2 = wsclient2.getReceived(String.class);
+        assertEquals(1, received2.size());
+        for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) {
+            assertTrue(received2.get(0).contains(BROADCAST_MESSAGE_TO[i]));
+        }
+
+        List<String> received3 = wsclient3.getReceived(String.class);
+        assertEquals(0, received3.size());
+
+        wsclient1.close();
+        wsclient2.close();
+        wsclient3.close();
+    }
+
+    @Test
+    public void testWebsocketSingleClientBroadcastMultipleClientsGuaranteeDelivery() throws Exception {
+        final int AWAIT_TIME = 5;
+        connectionKeyUserMap.clear();
+
+        TestClient wsclient1 = new TestClient("ws://localhost:" + PORT + "/hola3", 2);
+        TestClient wsclient2 = new TestClient("ws://localhost:" + PORT + "/hola3", 2);
+        TestClient wsclient3 = new TestClient("ws://localhost:" + PORT + "/hola3", 2);
+
+        wsclient1.connect();
+        wsclient1.await(AWAIT_TIME);
+
+        wsclient2.connect();
+        wsclient2.await(AWAIT_TIME);
+
+        wsclient3.connect();
+        wsclient3.await(AWAIT_TIME);
+
+        //all connections were registered in external store
+        assertTrue(connectionKeyUserMap.size() == EXISTED_USERS.length);
+
+        wsclient2.close();
+        wsclient2.await(AWAIT_TIME);
+
+        BROADCAST_MESSAGE_TO = new String[]{EXISTED_USERS[0], EXISTED_USERS[1]};
+
+        wsclient1.sendTextMessage("Gambas");
+        wsclient1.await(AWAIT_TIME);
+
+        List<String> received1 = wsclient1.getReceived(String.class);
+        assertEquals(1, received1.size());
+
+        for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) {
+            assertTrue(received1.get(0).contains(BROADCAST_MESSAGE_TO[i]));
+        }
+
+        List<String> received2 = wsclient2.getReceived(String.class);
+        assertEquals(0, received2.size());
+
+        List<String> received3 = wsclient3.getReceived(String.class);
+        assertEquals(0, received3.size());
+
+        wsclient1.close();
+        wsclient3.close();
+    }
+
     // START SNIPPET: payload
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
@@ -48,16 +150,118 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni
                     }
                 });
 
-                // route for events resending enabled with parameters from url
+                // route for events resending enabled
                 from("atmosphere-websocket:///hola1").to("log:info").process(new Processor() {
                     public void process(final Exchange exchange) throws Exception {
                         checkPassedParameters(exchange);
                     }
                 });
+
+                // route for single client broadcast to multiple clients
+                from("atmosphere-websocket:///hola2").to("log:info")
+                        .choice()
+                        .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONOPEN_EVENT_TYPE))
+                        .process(new Processor() {
+                            public void process(final Exchange exchange) throws Exception {
+                                createExternalConnectionRegister(exchange);
+                            }
+                        })
+                        .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONCLOSE_EVENT_TYPE))
+                        .process(new Processor() {
+                            public void process(final Exchange exchange) throws Exception {
+                                removeExternalConnectionRegister(exchange);
+                            }
+                        })
+                        .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONERROR_EVENT_TYPE))
+                        .process(new Processor() {
+                            public void process(final Exchange exchange) throws Exception {
+                                removeExternalConnectionRegister(exchange);
+                            }
+                        })
+                        .otherwise()
+                        .process(new Processor() {
+                            public void process(final Exchange exchange) throws Exception {
+                                createBroadcastMultipleClientsResponse(exchange);
+                            }
+                        }).to("atmosphere-websocket:///hola2");
+
+                // route for single client broadcast to multiple clients guarantee delivery
+                from("atmosphere-websocket:///hola3").to("log:info")
+                        .choice()
+                        .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONOPEN_EVENT_TYPE))
+                        .process(new Processor() {
+                            public void process(final Exchange exchange) throws Exception {
+                                createExternalConnectionRegister(exchange);
+                            }
+                        })
+                        .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONCLOSE_EVENT_TYPE))
+                        .process(new Processor() {
+                            public void process(final Exchange exchange) throws Exception {
+                                removeExternalConnectionRegister(exchange);
+                            }
+                        })
+                        .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONERROR_EVENT_TYPE))
+                        .process(new Processor() {
+                            public void process(final Exchange exchange) throws Exception {
+                                removeExternalConnectionRegister(exchange);
+                            }
+                        })
+                        .when(header(WebsocketConstants.ERROR_TYPE).isEqualTo(WebsocketConstants.MESSAGE_NOT_SENT_ERROR_TYPE))
+                        .process(new Processor() {
+                            public void process(final Exchange exchange) throws Exception {
+                                handleNotDeliveredMessage(exchange);
+                            }
+                        })
+                        .otherwise()
+                        .process(new Processor() {
+                            public void process(final Exchange exchange) throws Exception {
+                                createBroadcastMultipleClientsResponse(exchange);
+                            }
+                        }).to("atmosphere-websocket:///hola3");
             }
         };
     }
 
+    private static void handleNotDeliveredMessage(Exchange exchange) {
+        List<String> connectionKeyList = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY_LIST, List.class);
+        assertEquals(1, connectionKeyList.size());
+        assertEquals(connectionKeyList.get(0), connectionKeyUserMap.get(BROADCAST_MESSAGE_TO[1]));
+    }
+
+    private static void createExternalConnectionRegister(Exchange exchange) {
+        Object connectionKey = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY);
+
+        String userName = EXISTED_USERS[0];
+
+        if (connectionKeyUserMap.size() > 0) {
+            userName = EXISTED_USERS[connectionKeyUserMap.size()];
+        }
+
+        connectionKeyUserMap.put(userName, (String) connectionKey);
+    }
+
+    private static void removeExternalConnectionRegister(Exchange exchange) {
+        // remove connectionKey from external store
+    }
+
+    private static void createBroadcastMultipleClientsResponse(Exchange exchange) {
+        List<String> connectionKeyList = new ArrayList<>();
+        Object msg = exchange.getIn().getBody();
+
+        String additionalMessage = "";
+
+        //send the message only to selected connections
+        for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) {
+            connectionKeyList.add(connectionKeyUserMap.get(BROADCAST_MESSAGE_TO[i]));
+            additionalMessage += BROADCAST_MESSAGE_TO[i] + " ";
+        }
+
+        additionalMessage += " Received the message: ";
+
+        exchange.getIn().setBody(additionalMessage + msg);
+        exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY_LIST, connectionKeyList);
+    }
+
     private static void checkEventsResendingEnabled(Exchange exchange) {
         Object connectionKey = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY);
         Object eventType = exchange.getIn().getHeader(WebsocketConstants.EVENT_TYPE);