You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/04 14:38:11 UTC

[1/2] camel git commit: CAMEL-10949 - websocket clients will get messages on which uri they are subscribed toif sendToAll set to true

Repository: camel
Updated Branches:
  refs/heads/camel-2.18.x 711b04817 -> 111b12d87
  refs/heads/master d82e585d9 -> a8a6b74c8


CAMEL-10949 - websocket clients will get messages on which uri they are subscribed toif sendToAll set to true


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a8a6b74c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a8a6b74c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a8a6b74c

Branch: refs/heads/master
Commit: a8a6b74c8037569170877aa1f66d867b8357bfa5
Parents: d82e585
Author: onders86 <on...@gmail.com>
Authored: Mon Apr 3 16:49:31 2017 +0300
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 4 16:34:50 2017 +0200

----------------------------------------------------------------------
 .../component/websocket/DefaultWebsocket.java   |  8 +++++-
 .../websocket/DefaultWebsocketFactory.java      |  4 +--
 .../websocket/MemoryWebsocketStore.java         | 19 +++++++++++--
 .../component/websocket/WebSocketFactory.java   |  2 +-
 .../component/websocket/WebsocketComponent.java |  4 +--
 .../websocket/WebsocketComponentServlet.java    | 10 +++++--
 .../component/websocket/WebsocketProducer.java  | 30 ++++++++++++++------
 .../websocket/DefaultWebsocketTest.java         |  4 +--
 .../websocket/MemoryWebsocketStoreTest.java     | 16 ++---------
 .../websocket/NodeSynchronizationImplTest.java  |  4 +--
 .../WebsocketComponentServletTest.java          |  2 +-
 11 files changed, 64 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
index 8cf91a1..05c596d 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
@@ -36,10 +36,12 @@ public class DefaultWebsocket implements Serializable {
     private final NodeSynchronization sync;
     private Session session;
     private String connectionKey;
+    private String pathSpec;
 
-    public DefaultWebsocket(NodeSynchronization sync, WebsocketConsumer consumer) {
+    public DefaultWebsocket(NodeSynchronization sync, String pathSpec, WebsocketConsumer consumer) {
         this.sync = sync;
         this.consumer = consumer;
+        this.pathSpec = pathSpec;
     }
 
     @OnWebSocketClose
@@ -82,6 +84,10 @@ public class DefaultWebsocket implements Serializable {
     public Session getSession() {
         return session;
     }
+    
+    public String getPathSpec() {
+        return pathSpec;
+    }
 
     public void setSession(Session session) {
         this.session = session;

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
index f022ac6..119563f 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
@@ -25,7 +25,7 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
 public class DefaultWebsocketFactory implements WebSocketFactory {
 
     @Override
-    public DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, NodeSynchronization sync, WebsocketConsumer consumer) {
-        return new DefaultWebsocket(sync, consumer);
+    public DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, String pathSpec, NodeSynchronization sync, WebsocketConsumer consumer) {
+        return new DefaultWebsocket(sync, pathSpec, consumer);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
index cdea5fd..d0a4d8b 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
@@ -25,12 +25,12 @@ public class MemoryWebsocketStore extends ConcurrentHashMap<String, DefaultWebso
 
     @Override
     public void add(DefaultWebsocket ws) {
-        super.put(ws.getConnectionKey(), ws);
+        super.put(getKey(ws), ws);
     }
 
     @Override
     public void remove(DefaultWebsocket ws) {
-        super.remove(ws.getConnectionKey());
+        super.remove(getKey(ws));
     }
 
     @Override
@@ -57,4 +57,19 @@ public class MemoryWebsocketStore extends ConcurrentHashMap<String, DefaultWebso
     public void stop() throws Exception {
         clear();
     }
+    
+    private String getKey(DefaultWebsocket ws) {
+        StringBuilder sb = new StringBuilder();
+        if (ws.getConnectionKey() == null && ws.getPathSpec() == null) {
+            return null;
+        } else {
+            if (ws.getConnectionKey() != null) {
+                sb.append(ws.getConnectionKey());
+            }
+            if (ws.getPathSpec() != null) {
+                sb.append(ws.getPathSpec());
+            }
+        }
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
index faea552..caa24c6 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
@@ -23,6 +23,6 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
  */
 public interface WebSocketFactory {
 
-    DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, NodeSynchronization sync, WebsocketConsumer consumer);
+    DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, String pathSpec, NodeSynchronization sync, WebsocketConsumer consumer);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
index 07694e3..9b8c878 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
@@ -454,7 +454,7 @@ public class WebsocketComponent extends UriEndpointComponent {
     }
 
     protected WebsocketComponentServlet createServlet(NodeSynchronization sync, String pathSpec, Map<String, WebsocketComponentServlet> servlets, ServletContextHandler handler) {
-        WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync, socketFactory);
+        WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync, pathSpec, socketFactory);
         servlets.put(pathSpec, servlet);
         ServletHolder servletHolder = new ServletHolder(servlet);
         servletHolder.getInitParameters().putAll(handler.getInitParams());
@@ -548,7 +548,7 @@ public class WebsocketComponent extends UriEndpointComponent {
         return false;
     }
 
-    private static String createPathSpec(String remaining) {
+    public static String createPathSpec(String remaining) {
         // Is not correct as it does not support to add port in the URI
         //return String.format("/%s/*", remaining);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
index b49ba62..b5362f0 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
@@ -33,13 +33,15 @@ public class WebsocketComponentServlet extends WebSocketServlet {
 
     private final NodeSynchronization sync;
     private WebsocketConsumer consumer;
+    private String pathSpec;
 
     private ConcurrentMap<String, WebsocketConsumer> consumers = new ConcurrentHashMap<String, WebsocketConsumer>();
     private Map<String, WebSocketFactory> socketFactory;
 
-    public WebsocketComponentServlet(NodeSynchronization sync, Map<String, WebSocketFactory> socketFactory) {
+    public WebsocketComponentServlet(NodeSynchronization sync, String pathSpec, Map<String, WebSocketFactory> socketFactory) {
         this.sync = sync;
         this.socketFactory = socketFactory;
+        this.pathSpec = pathSpec;
     }
 
     public WebsocketConsumer getConsumer() {
@@ -69,7 +71,9 @@ public class WebsocketComponentServlet extends WebSocketServlet {
         }
 
         WebSocketFactory factory = socketFactory.get(protocolKey);
-        return factory.newInstance(request, protocolKey, sync, consumer);
+        return factory.newInstance(request, protocolKey, 
+                (consumer != null && consumer.getEndpoint() != null) ? WebsocketComponent.createPathSpec(consumer.getEndpoint().getResourceUri()) : null,
+                sync, consumer);
     }
 
     public Map<String, WebSocketFactory> getSocketFactory() {
@@ -87,7 +91,7 @@ public class WebsocketComponentServlet extends WebSocketServlet {
             public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
                 String protocolKey = "default";
                 WebSocketFactory factory = socketFactory.get(protocolKey);
-                return factory.newInstance(req, protocolKey, sync, consumer);
+                return factory.newInstance(req, protocolKey, pathSpec, sync, consumer);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
index 80d0618..3392134 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
@@ -54,7 +54,11 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu
             // look for connection key and get Websocket
             String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
             if (connectionKey != null) {
-                DefaultWebsocket websocket = store.get(connectionKey);
+                String pathSpec = "";
+                if (endpoint.getResourceUri() != null) {
+                    pathSpec = WebsocketComponent.createPathSpec(endpoint.getResourceUri());
+                }
+                DefaultWebsocket websocket = store.get(connectionKey + pathSpec);
                 log.debug("Sending to connection key {} -> {}", connectionKey, message);
                 Future<Void> future = sendMessage(websocket, message);
                 if (future != null) {
@@ -99,14 +103,22 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu
 
         List<Future> futures = new CopyOnWriteArrayList<>();
         for (DefaultWebsocket websocket : websockets) {
-            try {
-                Future<Void> future = sendMessage(websocket, message);
-                if (future != null) {
-                    futures.add(future);
-                }
-            } catch (Exception e) {
-                if (exception == null) {
-                    exception = new WebsocketSendException("Failed to deliver message to one or more recipients.", exchange, e);
+            boolean isOkToSendMessage = false;
+            if (endpoint.getResourceUri() == null) {
+                isOkToSendMessage = true;
+            } else if (websocket.getPathSpec().equals(WebsocketComponent.createPathSpec(endpoint.getResourceUri()))) {
+                isOkToSendMessage = true;
+            }
+            if (isOkToSendMessage) {
+                try {
+                    Future<Void> future = sendMessage(websocket, message);
+                    if (future != null) {
+                        futures.add(future);
+                    }
+                } catch (Exception e) {
+                    if (exception == null) {
+                        exception = new WebsocketSendException("Failed to deliver message to one or more recipients.", exchange, e);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
index f7a29f3..b899c71 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
@@ -51,7 +51,7 @@ public class DefaultWebsocketTest {
 
     @Before
     public void setUp() throws Exception {
-        defaultWebsocket = new DefaultWebsocket(sync, consumer);
+        defaultWebsocket = new DefaultWebsocket(sync, null, consumer);
         defaultWebsocket.setConnectionKey(CONNECTION_KEY);
     }
 
@@ -88,7 +88,7 @@ public class DefaultWebsocketTest {
 
     @Test
     public void testOnMessageWithNullConsumer() {
-        defaultWebsocket = new DefaultWebsocket(sync, null);
+        defaultWebsocket = new DefaultWebsocket(sync, null, null);
         defaultWebsocket.setConnectionKey(CONNECTION_KEY);
         defaultWebsocket.onMessage(MESSAGE);
         InOrder inOrder = inOrder(session, consumer, sync);

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
index d403466..4e5eaf3 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
@@ -47,9 +47,9 @@ public class MemoryWebsocketStoreTest {
     @Mock
     private NodeSynchronization sync;
     @Mock
-    private DefaultWebsocket websocket1 = new DefaultWebsocket(sync, consumer);;
+    private DefaultWebsocket websocket1 = new DefaultWebsocket(sync, null, consumer);
     @Mock
-    private DefaultWebsocket websocket2 = new DefaultWebsocket(sync, consumer);;
+    private DefaultWebsocket websocket2 = new DefaultWebsocket(sync, null, consumer);
 
     private MemoryWebsocketStore store;
 
@@ -84,10 +84,6 @@ public class MemoryWebsocketStoreTest {
         // second call of websocket1.getConnectionKey()
         store.remove(websocket1);
         assertNull(store.get(KEY_1));
-
-        InOrder inOrder = inOrder(websocket1, websocket2);
-        inOrder.verify(websocket1, times(2)).getConnectionKey();
-        inOrder.verifyNoMoreInteractions();
     }
 
     @Test
@@ -106,10 +102,6 @@ public class MemoryWebsocketStoreTest {
         } catch (Exception e) {
             assertEquals(NullPointerException.class, e.getClass());
         }
-
-        InOrder inOrder = inOrder(websocket1, websocket2);
-        inOrder.verify(websocket1, times(2)).getConnectionKey();
-        inOrder.verifyNoMoreInteractions();
     }
 
     @Test
@@ -121,10 +113,6 @@ public class MemoryWebsocketStoreTest {
         store.remove(websocket2);
         assertEquals(websocket1, store.get(KEY_1));
         assertNull(store.get(KEY_2));
-
-        InOrder inOrder = inOrder(websocket1, websocket2);
-        inOrder.verify(websocket2, times(1)).getConnectionKey();
-        inOrder.verifyNoMoreInteractions();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
index 7469c54..6d885cc 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
@@ -55,10 +55,10 @@ public class NodeSynchronizationImplTest {
 
         store1 = new MemoryWebsocketStore();
 
-        websocket1 = new DefaultWebsocket(sync, consumer);
+        websocket1 = new DefaultWebsocket(sync, null, consumer);
         websocket1.setConnectionKey(KEY_1);
 
-        websocket2 = new DefaultWebsocket(sync, consumer);
+        websocket2 = new DefaultWebsocket(sync, null, consumer);
         websocket2.setConnectionKey(KEY_2);
 
         // when(websocket1.getConnectionKey()).thenReturn(KEY_1);

http://git-wip-us.apache.org/repos/asf/camel/blob/a8a6b74c/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
index f0e06b2..b1eea55 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
@@ -60,7 +60,7 @@ public class WebsocketComponentServletTest {
         socketFactory = new HashMap<String, WebSocketFactory>();
         socketFactory.put("default", new DefaultWebsocketFactory());
         
-        websocketComponentServlet = new WebsocketComponentServlet(sync, socketFactory);
+        websocketComponentServlet = new WebsocketComponentServlet(sync, null, socketFactory);
     }
 
     @Test


[2/2] camel git commit: CAMEL-10949 - websocket clients will get messages on which uri they are subscribed toif sendToAll set to true

Posted by da...@apache.org.
CAMEL-10949 - websocket clients will get messages on which uri they are subscribed toif sendToAll set to true


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/111b12d8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/111b12d8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/111b12d8

Branch: refs/heads/camel-2.18.x
Commit: 111b12d8709f7f59ae0fc2d4d6c9885b0bbcc6a0
Parents: 711b048
Author: onders86 <on...@gmail.com>
Authored: Mon Apr 3 16:49:31 2017 +0300
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 4 16:38:01 2017 +0200

----------------------------------------------------------------------
 .../component/websocket/DefaultWebsocket.java   |  8 +++++-
 .../websocket/DefaultWebsocketFactory.java      |  4 +--
 .../websocket/MemoryWebsocketStore.java         | 19 +++++++++++--
 .../component/websocket/WebSocketFactory.java   |  2 +-
 .../component/websocket/WebsocketComponent.java |  4 +--
 .../websocket/WebsocketComponentServlet.java    | 10 +++++--
 .../component/websocket/WebsocketProducer.java  | 30 ++++++++++++++------
 .../websocket/DefaultWebsocketTest.java         |  4 +--
 .../websocket/MemoryWebsocketStoreTest.java     | 16 ++---------
 .../websocket/NodeSynchronizationImplTest.java  |  4 +--
 .../WebsocketComponentServletTest.java          |  2 +-
 11 files changed, 64 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
index 422a2a1..be7eeba 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
@@ -38,10 +38,12 @@ public class DefaultWebsocket implements Serializable {
     private final NodeSynchronization sync;
     private Session session;
     private String connectionKey;
+    private String pathSpec;
 
-    public DefaultWebsocket(NodeSynchronization sync, WebsocketConsumer consumer) {
+    public DefaultWebsocket(NodeSynchronization sync, String pathSpec, WebsocketConsumer consumer) {
         this.sync = sync;
         this.consumer = consumer;
+        this.pathSpec = pathSpec;
     }
 
     @OnWebSocketClose
@@ -84,6 +86,10 @@ public class DefaultWebsocket implements Serializable {
     public Session getSession() {
         return session;
     }
+    
+    public String getPathSpec() {
+        return pathSpec;
+    }
 
     public void setSession(Session session) {
         this.session = session;

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
index d37f288..163308f 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
@@ -28,7 +28,7 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
 public class DefaultWebsocketFactory implements WebSocketFactory {
 
     @Override
-    public DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, NodeSynchronization sync, WebsocketConsumer consumer) {
-        return new DefaultWebsocket(sync, consumer);
+    public DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, String pathSpec, NodeSynchronization sync, WebsocketConsumer consumer) {
+        return new DefaultWebsocket(sync, pathSpec, consumer);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
index cdea5fd..d0a4d8b 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
@@ -25,12 +25,12 @@ public class MemoryWebsocketStore extends ConcurrentHashMap<String, DefaultWebso
 
     @Override
     public void add(DefaultWebsocket ws) {
-        super.put(ws.getConnectionKey(), ws);
+        super.put(getKey(ws), ws);
     }
 
     @Override
     public void remove(DefaultWebsocket ws) {
-        super.remove(ws.getConnectionKey());
+        super.remove(getKey(ws));
     }
 
     @Override
@@ -57,4 +57,19 @@ public class MemoryWebsocketStore extends ConcurrentHashMap<String, DefaultWebso
     public void stop() throws Exception {
         clear();
     }
+    
+    private String getKey(DefaultWebsocket ws) {
+        StringBuilder sb = new StringBuilder();
+        if (ws.getConnectionKey() == null && ws.getPathSpec() == null) {
+            return null;
+        } else {
+            if (ws.getConnectionKey() != null) {
+                sb.append(ws.getConnectionKey());
+            }
+            if (ws.getPathSpec() != null) {
+                sb.append(ws.getPathSpec());
+            }
+        }
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
index 3e5e641..edf3f47 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
@@ -26,6 +26,6 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
  */
 public interface WebSocketFactory {
 
-    DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, NodeSynchronization sync, WebsocketConsumer consumer);
+    DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, String pathSpec, NodeSynchronization sync, WebsocketConsumer consumer);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
index 54ff62b..cb2e83c 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
@@ -451,7 +451,7 @@ public class WebsocketComponent extends UriEndpointComponent {
     }
 
     protected WebsocketComponentServlet createServlet(NodeSynchronization sync, String pathSpec, Map<String, WebsocketComponentServlet> servlets, ServletContextHandler handler) {
-        WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync, socketFactory);
+        WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync, pathSpec, socketFactory);
         servlets.put(pathSpec, servlet);
         ServletHolder servletHolder = new ServletHolder(servlet);
         servletHolder.getInitParameters().putAll(handler.getInitParams());
@@ -545,7 +545,7 @@ public class WebsocketComponent extends UriEndpointComponent {
         return false;
     }
 
-    private static String createPathSpec(String remaining) {
+    public static String createPathSpec(String remaining) {
         // Is not correct as it does not support to add port in the URI
         //return String.format("/%s/*", remaining);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
index fdd4d7b..387fc9f 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
@@ -36,13 +36,15 @@ public class WebsocketComponentServlet extends WebSocketServlet {
 
     private final NodeSynchronization sync;
     private WebsocketConsumer consumer;
+    private String pathSpec;
 
     private ConcurrentMap<String, WebsocketConsumer> consumers = new ConcurrentHashMap<String, WebsocketConsumer>();
     private Map<String, WebSocketFactory> socketFactory;
 
-    public WebsocketComponentServlet(NodeSynchronization sync, Map<String, WebSocketFactory> socketFactory) {
+    public WebsocketComponentServlet(NodeSynchronization sync, String pathSpec, Map<String, WebSocketFactory> socketFactory) {
         this.sync = sync;
         this.socketFactory = socketFactory;
+        this.pathSpec = pathSpec;
     }
 
     public WebsocketConsumer getConsumer() {
@@ -72,7 +74,9 @@ public class WebsocketComponentServlet extends WebSocketServlet {
         }
 
         WebSocketFactory factory = socketFactory.get(protocolKey);
-        return factory.newInstance(request, protocolKey, sync, consumer);
+        return factory.newInstance(request, protocolKey, 
+                (consumer != null && consumer.getEndpoint() != null) ? WebsocketComponent.createPathSpec(consumer.getEndpoint().getResourceUri()) : null,
+                sync, consumer);
     }
 
     public Map<String, WebSocketFactory> getSocketFactory() {
@@ -90,7 +94,7 @@ public class WebsocketComponentServlet extends WebSocketServlet {
             public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
                 String protocolKey = "default";
                 WebSocketFactory factory = socketFactory.get(protocolKey);
-                return factory.newInstance(req, protocolKey, sync, consumer);
+                return factory.newInstance(req, protocolKey, pathSpec, sync, consumer);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
index 80d0618..3392134 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
@@ -54,7 +54,11 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu
             // look for connection key and get Websocket
             String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
             if (connectionKey != null) {
-                DefaultWebsocket websocket = store.get(connectionKey);
+                String pathSpec = "";
+                if (endpoint.getResourceUri() != null) {
+                    pathSpec = WebsocketComponent.createPathSpec(endpoint.getResourceUri());
+                }
+                DefaultWebsocket websocket = store.get(connectionKey + pathSpec);
                 log.debug("Sending to connection key {} -> {}", connectionKey, message);
                 Future<Void> future = sendMessage(websocket, message);
                 if (future != null) {
@@ -99,14 +103,22 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu
 
         List<Future> futures = new CopyOnWriteArrayList<>();
         for (DefaultWebsocket websocket : websockets) {
-            try {
-                Future<Void> future = sendMessage(websocket, message);
-                if (future != null) {
-                    futures.add(future);
-                }
-            } catch (Exception e) {
-                if (exception == null) {
-                    exception = new WebsocketSendException("Failed to deliver message to one or more recipients.", exchange, e);
+            boolean isOkToSendMessage = false;
+            if (endpoint.getResourceUri() == null) {
+                isOkToSendMessage = true;
+            } else if (websocket.getPathSpec().equals(WebsocketComponent.createPathSpec(endpoint.getResourceUri()))) {
+                isOkToSendMessage = true;
+            }
+            if (isOkToSendMessage) {
+                try {
+                    Future<Void> future = sendMessage(websocket, message);
+                    if (future != null) {
+                        futures.add(future);
+                    }
+                } catch (Exception e) {
+                    if (exception == null) {
+                        exception = new WebsocketSendException("Failed to deliver message to one or more recipients.", exchange, e);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
index f7a29f3..b899c71 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
@@ -51,7 +51,7 @@ public class DefaultWebsocketTest {
 
     @Before
     public void setUp() throws Exception {
-        defaultWebsocket = new DefaultWebsocket(sync, consumer);
+        defaultWebsocket = new DefaultWebsocket(sync, null, consumer);
         defaultWebsocket.setConnectionKey(CONNECTION_KEY);
     }
 
@@ -88,7 +88,7 @@ public class DefaultWebsocketTest {
 
     @Test
     public void testOnMessageWithNullConsumer() {
-        defaultWebsocket = new DefaultWebsocket(sync, null);
+        defaultWebsocket = new DefaultWebsocket(sync, null, null);
         defaultWebsocket.setConnectionKey(CONNECTION_KEY);
         defaultWebsocket.onMessage(MESSAGE);
         InOrder inOrder = inOrder(session, consumer, sync);

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
index d403466..4e5eaf3 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
@@ -47,9 +47,9 @@ public class MemoryWebsocketStoreTest {
     @Mock
     private NodeSynchronization sync;
     @Mock
-    private DefaultWebsocket websocket1 = new DefaultWebsocket(sync, consumer);;
+    private DefaultWebsocket websocket1 = new DefaultWebsocket(sync, null, consumer);
     @Mock
-    private DefaultWebsocket websocket2 = new DefaultWebsocket(sync, consumer);;
+    private DefaultWebsocket websocket2 = new DefaultWebsocket(sync, null, consumer);
 
     private MemoryWebsocketStore store;
 
@@ -84,10 +84,6 @@ public class MemoryWebsocketStoreTest {
         // second call of websocket1.getConnectionKey()
         store.remove(websocket1);
         assertNull(store.get(KEY_1));
-
-        InOrder inOrder = inOrder(websocket1, websocket2);
-        inOrder.verify(websocket1, times(2)).getConnectionKey();
-        inOrder.verifyNoMoreInteractions();
     }
 
     @Test
@@ -106,10 +102,6 @@ public class MemoryWebsocketStoreTest {
         } catch (Exception e) {
             assertEquals(NullPointerException.class, e.getClass());
         }
-
-        InOrder inOrder = inOrder(websocket1, websocket2);
-        inOrder.verify(websocket1, times(2)).getConnectionKey();
-        inOrder.verifyNoMoreInteractions();
     }
 
     @Test
@@ -121,10 +113,6 @@ public class MemoryWebsocketStoreTest {
         store.remove(websocket2);
         assertEquals(websocket1, store.get(KEY_1));
         assertNull(store.get(KEY_2));
-
-        InOrder inOrder = inOrder(websocket1, websocket2);
-        inOrder.verify(websocket2, times(1)).getConnectionKey();
-        inOrder.verifyNoMoreInteractions();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
index 7469c54..6d885cc 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
@@ -55,10 +55,10 @@ public class NodeSynchronizationImplTest {
 
         store1 = new MemoryWebsocketStore();
 
-        websocket1 = new DefaultWebsocket(sync, consumer);
+        websocket1 = new DefaultWebsocket(sync, null, consumer);
         websocket1.setConnectionKey(KEY_1);
 
-        websocket2 = new DefaultWebsocket(sync, consumer);
+        websocket2 = new DefaultWebsocket(sync, null, consumer);
         websocket2.setConnectionKey(KEY_2);
 
         // when(websocket1.getConnectionKey()).thenReturn(KEY_1);

http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
index 23250f4..db79908 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
@@ -63,7 +63,7 @@ public class WebsocketComponentServletTest {
         socketFactory = new HashMap<String, WebSocketFactory>();
         socketFactory.put("default", new DefaultWebsocketFactory());
         
-        websocketComponentServlet = new WebsocketComponentServlet(sync, socketFactory);
+        websocketComponentServlet = new WebsocketComponentServlet(sync, null, socketFactory);
     }
 
     @Test