You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/04 14:38:12 UTC
[2/2] camel git commit: CAMEL-10949 - websocket clients will get
messages on which uri they are subscribed toif sendToAll set to true
CAMEL-10949 - websocket clients will get messages on which uri they are subscribed toif sendToAll set to true
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/111b12d8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/111b12d8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/111b12d8
Branch: refs/heads/camel-2.18.x
Commit: 111b12d8709f7f59ae0fc2d4d6c9885b0bbcc6a0
Parents: 711b048
Author: onders86 <on...@gmail.com>
Authored: Mon Apr 3 16:49:31 2017 +0300
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 4 16:38:01 2017 +0200
----------------------------------------------------------------------
.../component/websocket/DefaultWebsocket.java | 8 +++++-
.../websocket/DefaultWebsocketFactory.java | 4 +--
.../websocket/MemoryWebsocketStore.java | 19 +++++++++++--
.../component/websocket/WebSocketFactory.java | 2 +-
.../component/websocket/WebsocketComponent.java | 4 +--
.../websocket/WebsocketComponentServlet.java | 10 +++++--
.../component/websocket/WebsocketProducer.java | 30 ++++++++++++++------
.../websocket/DefaultWebsocketTest.java | 4 +--
.../websocket/MemoryWebsocketStoreTest.java | 16 ++---------
.../websocket/NodeSynchronizationImplTest.java | 4 +--
.../WebsocketComponentServletTest.java | 2 +-
11 files changed, 64 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
index 422a2a1..be7eeba 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java
@@ -38,10 +38,12 @@ public class DefaultWebsocket implements Serializable {
private final NodeSynchronization sync;
private Session session;
private String connectionKey;
+ private String pathSpec;
- public DefaultWebsocket(NodeSynchronization sync, WebsocketConsumer consumer) {
+ public DefaultWebsocket(NodeSynchronization sync, String pathSpec, WebsocketConsumer consumer) {
this.sync = sync;
this.consumer = consumer;
+ this.pathSpec = pathSpec;
}
@OnWebSocketClose
@@ -84,6 +86,10 @@ public class DefaultWebsocket implements Serializable {
public Session getSession() {
return session;
}
+
+ public String getPathSpec() {
+ return pathSpec;
+ }
public void setSession(Session session) {
this.session = session;
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
index d37f288..163308f 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocketFactory.java
@@ -28,7 +28,7 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
public class DefaultWebsocketFactory implements WebSocketFactory {
@Override
- public DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, NodeSynchronization sync, WebsocketConsumer consumer) {
- return new DefaultWebsocket(sync, consumer);
+ public DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, String pathSpec, NodeSynchronization sync, WebsocketConsumer consumer) {
+ return new DefaultWebsocket(sync, pathSpec, consumer);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
index cdea5fd..d0a4d8b 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/MemoryWebsocketStore.java
@@ -25,12 +25,12 @@ public class MemoryWebsocketStore extends ConcurrentHashMap<String, DefaultWebso
@Override
public void add(DefaultWebsocket ws) {
- super.put(ws.getConnectionKey(), ws);
+ super.put(getKey(ws), ws);
}
@Override
public void remove(DefaultWebsocket ws) {
- super.remove(ws.getConnectionKey());
+ super.remove(getKey(ws));
}
@Override
@@ -57,4 +57,19 @@ public class MemoryWebsocketStore extends ConcurrentHashMap<String, DefaultWebso
public void stop() throws Exception {
clear();
}
+
+ private String getKey(DefaultWebsocket ws) {
+ StringBuilder sb = new StringBuilder();
+ if (ws.getConnectionKey() == null && ws.getPathSpec() == null) {
+ return null;
+ } else {
+ if (ws.getConnectionKey() != null) {
+ sb.append(ws.getConnectionKey());
+ }
+ if (ws.getPathSpec() != null) {
+ sb.append(ws.getPathSpec());
+ }
+ }
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
index 3e5e641..edf3f47 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebSocketFactory.java
@@ -26,6 +26,6 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
*/
public interface WebSocketFactory {
- DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, NodeSynchronization sync, WebsocketConsumer consumer);
+ DefaultWebsocket newInstance(ServletUpgradeRequest request, String protocol, String pathSpec, NodeSynchronization sync, WebsocketConsumer consumer);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
index 54ff62b..cb2e83c 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
@@ -451,7 +451,7 @@ public class WebsocketComponent extends UriEndpointComponent {
}
protected WebsocketComponentServlet createServlet(NodeSynchronization sync, String pathSpec, Map<String, WebsocketComponentServlet> servlets, ServletContextHandler handler) {
- WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync, socketFactory);
+ WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync, pathSpec, socketFactory);
servlets.put(pathSpec, servlet);
ServletHolder servletHolder = new ServletHolder(servlet);
servletHolder.getInitParameters().putAll(handler.getInitParams());
@@ -545,7 +545,7 @@ public class WebsocketComponent extends UriEndpointComponent {
return false;
}
- private static String createPathSpec(String remaining) {
+ public static String createPathSpec(String remaining) {
// Is not correct as it does not support to add port in the URI
//return String.format("/%s/*", remaining);
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
index fdd4d7b..387fc9f 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
@@ -36,13 +36,15 @@ public class WebsocketComponentServlet extends WebSocketServlet {
private final NodeSynchronization sync;
private WebsocketConsumer consumer;
+ private String pathSpec;
private ConcurrentMap<String, WebsocketConsumer> consumers = new ConcurrentHashMap<String, WebsocketConsumer>();
private Map<String, WebSocketFactory> socketFactory;
- public WebsocketComponentServlet(NodeSynchronization sync, Map<String, WebSocketFactory> socketFactory) {
+ public WebsocketComponentServlet(NodeSynchronization sync, String pathSpec, Map<String, WebSocketFactory> socketFactory) {
this.sync = sync;
this.socketFactory = socketFactory;
+ this.pathSpec = pathSpec;
}
public WebsocketConsumer getConsumer() {
@@ -72,7 +74,9 @@ public class WebsocketComponentServlet extends WebSocketServlet {
}
WebSocketFactory factory = socketFactory.get(protocolKey);
- return factory.newInstance(request, protocolKey, sync, consumer);
+ return factory.newInstance(request, protocolKey,
+ (consumer != null && consumer.getEndpoint() != null) ? WebsocketComponent.createPathSpec(consumer.getEndpoint().getResourceUri()) : null,
+ sync, consumer);
}
public Map<String, WebSocketFactory> getSocketFactory() {
@@ -90,7 +94,7 @@ public class WebsocketComponentServlet extends WebSocketServlet {
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
String protocolKey = "default";
WebSocketFactory factory = socketFactory.get(protocolKey);
- return factory.newInstance(req, protocolKey, sync, consumer);
+ return factory.newInstance(req, protocolKey, pathSpec, sync, consumer);
}
});
}
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
index 80d0618..3392134 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
@@ -54,7 +54,11 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu
// look for connection key and get Websocket
String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
if (connectionKey != null) {
- DefaultWebsocket websocket = store.get(connectionKey);
+ String pathSpec = "";
+ if (endpoint.getResourceUri() != null) {
+ pathSpec = WebsocketComponent.createPathSpec(endpoint.getResourceUri());
+ }
+ DefaultWebsocket websocket = store.get(connectionKey + pathSpec);
log.debug("Sending to connection key {} -> {}", connectionKey, message);
Future<Void> future = sendMessage(websocket, message);
if (future != null) {
@@ -99,14 +103,22 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu
List<Future> futures = new CopyOnWriteArrayList<>();
for (DefaultWebsocket websocket : websockets) {
- try {
- Future<Void> future = sendMessage(websocket, message);
- if (future != null) {
- futures.add(future);
- }
- } catch (Exception e) {
- if (exception == null) {
- exception = new WebsocketSendException("Failed to deliver message to one or more recipients.", exchange, e);
+ boolean isOkToSendMessage = false;
+ if (endpoint.getResourceUri() == null) {
+ isOkToSendMessage = true;
+ } else if (websocket.getPathSpec().equals(WebsocketComponent.createPathSpec(endpoint.getResourceUri()))) {
+ isOkToSendMessage = true;
+ }
+ if (isOkToSendMessage) {
+ try {
+ Future<Void> future = sendMessage(websocket, message);
+ if (future != null) {
+ futures.add(future);
+ }
+ } catch (Exception e) {
+ if (exception == null) {
+ exception = new WebsocketSendException("Failed to deliver message to one or more recipients.", exchange, e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
index f7a29f3..b899c71 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java
@@ -51,7 +51,7 @@ public class DefaultWebsocketTest {
@Before
public void setUp() throws Exception {
- defaultWebsocket = new DefaultWebsocket(sync, consumer);
+ defaultWebsocket = new DefaultWebsocket(sync, null, consumer);
defaultWebsocket.setConnectionKey(CONNECTION_KEY);
}
@@ -88,7 +88,7 @@ public class DefaultWebsocketTest {
@Test
public void testOnMessageWithNullConsumer() {
- defaultWebsocket = new DefaultWebsocket(sync, null);
+ defaultWebsocket = new DefaultWebsocket(sync, null, null);
defaultWebsocket.setConnectionKey(CONNECTION_KEY);
defaultWebsocket.onMessage(MESSAGE);
InOrder inOrder = inOrder(session, consumer, sync);
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
index d403466..4e5eaf3 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/MemoryWebsocketStoreTest.java
@@ -47,9 +47,9 @@ public class MemoryWebsocketStoreTest {
@Mock
private NodeSynchronization sync;
@Mock
- private DefaultWebsocket websocket1 = new DefaultWebsocket(sync, consumer);;
+ private DefaultWebsocket websocket1 = new DefaultWebsocket(sync, null, consumer);
@Mock
- private DefaultWebsocket websocket2 = new DefaultWebsocket(sync, consumer);;
+ private DefaultWebsocket websocket2 = new DefaultWebsocket(sync, null, consumer);
private MemoryWebsocketStore store;
@@ -84,10 +84,6 @@ public class MemoryWebsocketStoreTest {
// second call of websocket1.getConnectionKey()
store.remove(websocket1);
assertNull(store.get(KEY_1));
-
- InOrder inOrder = inOrder(websocket1, websocket2);
- inOrder.verify(websocket1, times(2)).getConnectionKey();
- inOrder.verifyNoMoreInteractions();
}
@Test
@@ -106,10 +102,6 @@ public class MemoryWebsocketStoreTest {
} catch (Exception e) {
assertEquals(NullPointerException.class, e.getClass());
}
-
- InOrder inOrder = inOrder(websocket1, websocket2);
- inOrder.verify(websocket1, times(2)).getConnectionKey();
- inOrder.verifyNoMoreInteractions();
}
@Test
@@ -121,10 +113,6 @@ public class MemoryWebsocketStoreTest {
store.remove(websocket2);
assertEquals(websocket1, store.get(KEY_1));
assertNull(store.get(KEY_2));
-
- InOrder inOrder = inOrder(websocket1, websocket2);
- inOrder.verify(websocket2, times(1)).getConnectionKey();
- inOrder.verifyNoMoreInteractions();
}
@Test
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
index 7469c54..6d885cc 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/NodeSynchronizationImplTest.java
@@ -55,10 +55,10 @@ public class NodeSynchronizationImplTest {
store1 = new MemoryWebsocketStore();
- websocket1 = new DefaultWebsocket(sync, consumer);
+ websocket1 = new DefaultWebsocket(sync, null, consumer);
websocket1.setConnectionKey(KEY_1);
- websocket2 = new DefaultWebsocket(sync, consumer);
+ websocket2 = new DefaultWebsocket(sync, null, consumer);
websocket2.setConnectionKey(KEY_2);
// when(websocket1.getConnectionKey()).thenReturn(KEY_1);
http://git-wip-us.apache.org/repos/asf/camel/blob/111b12d8/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
index 23250f4..db79908 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java
@@ -63,7 +63,7 @@ public class WebsocketComponentServletTest {
socketFactory = new HashMap<String, WebSocketFactory>();
socketFactory.put("default", new DefaultWebsocketFactory());
- websocketComponentServlet = new WebsocketComponentServlet(sync, socketFactory);
+ websocketComponentServlet = new WebsocketComponentServlet(sync, null, socketFactory);
}
@Test