You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/22 11:37:14 UTC
[3/4] camel git commit: Moved WebSocketStore and NodeSynchronization
from the Endpoint to the Connector
Moved WebSocketStore and NodeSynchronization from the Endpoint to the Connector
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2164aba7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2164aba7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2164aba7
Branch: refs/heads/camel-2.16.x
Commit: 2164aba73b68692417d95f8d850cdcbf3a54d00b
Parents: 4e3ff19
Author: Ton Swieb <to...@finalist.nl>
Authored: Sat Nov 21 21:44:30 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Nov 22 11:36:57 2015 +0100
----------------------------------------------------------------------
.../component/websocket/WebsocketComponent.java | 25 ++++-
.../component/websocket/WebsocketEndpoint.java | 27 +----
.../component/websocket/WebsocketProducer.java | 10 +-
.../websocket/WebsocketProducerTest.java | 3 +-
...dividualAndBroadcastEndpointExampleTest.java | 110 +++++++++++++++++++
...ocketTwoRoutesToSameEndpointExampleTest.java | 109 ++++++++++++++++++
6 files changed, 249 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
index 55f7df1..1251a53 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
@@ -91,12 +91,14 @@ public class WebsocketComponent extends UriEndpointComponent {
Server server;
Connector connector;
WebsocketComponentServlet servlet;
+ MemoryWebsocketStore memoryStore;
int refCount;
- public ConnectorRef(Server server, Connector connector, WebsocketComponentServlet servlet) {
+ public ConnectorRef(Server server, Connector connector, WebsocketComponentServlet servlet, MemoryWebsocketStore memoryStore) {
this.server = server;
this.connector = connector;
this.servlet = servlet;
+ this.memoryStore = memoryStore;
increment();
}
@@ -176,14 +178,17 @@ public class WebsocketComponent extends UriEndpointComponent {
server = createStaticResourcesServer(server, context, endpoint.getStaticResources());
}
+ MemoryWebsocketStore memoryStore = new MemoryWebsocketStore();
+
// Don't provide a Servlet object as Producer/Consumer will create them later on
- connectorRef = new ConnectorRef(server, connector, null);
+ connectorRef = new ConnectorRef(server, connector, null, memoryStore);
// must enable session before we start
if (endpoint.isSessionSupport()) {
enableSessionSupport(connectorRef.server, connectorKey);
}
LOG.info("Jetty Server starting on host: {}:{}", connector.getHost(), connector.getPort());
+ connectorRef.memoryStore.start();
connectorRef.server.start();
CONNECTORS.put(connectorKey, connectorRef);
@@ -197,7 +202,8 @@ public class WebsocketComponent extends UriEndpointComponent {
enableSessionSupport(connectorRef.server, connectorKey);
}
- WebsocketComponentServlet servlet = addServlet(endpoint.getNodeSynchronization(), prodcon, endpoint.getResourceUri());
+ NodeSynchronization sync = new DefaultNodeSynchronization(connectorRef.memoryStore);
+ WebsocketComponentServlet servlet = addServlet(sync, prodcon, endpoint.getResourceUri());
if (prodcon instanceof WebsocketConsumer) {
WebsocketConsumer consumer = WebsocketConsumer.class.cast(prodcon);
if (servlet.getConsumer() == null) {
@@ -206,7 +212,10 @@ public class WebsocketComponent extends UriEndpointComponent {
// register the consumer here
servlet.connect(consumer);
}
-
+ if (prodcon instanceof WebsocketProducer) {
+ WebsocketProducer producer = WebsocketProducer.class.cast(prodcon);
+ producer.setStore(connectorRef.memoryStore);
+ }
}
}
@@ -234,6 +243,7 @@ public class WebsocketComponent extends UriEndpointComponent {
connectorRef.connector.stop();
}
connectorRef.server.stop();
+ connectorRef.memoryStore.stop();
CONNECTORS.remove(connectorKey);
// Camel controls the lifecycle of these entities so remove the
// registered MBeans when Camel is done with the managed objects.
@@ -245,6 +255,9 @@ public class WebsocketComponent extends UriEndpointComponent {
if (prodcon instanceof WebsocketConsumer) {
connectorRef.servlet.disconnect((WebsocketConsumer) prodcon);
}
+ if (prodcon instanceof WebsocketProducer) {
+ ((WebsocketProducer) prodcon).setStore(null);
+ }
}
}
}
@@ -791,7 +804,8 @@ public class WebsocketComponent extends UriEndpointComponent {
// must add static resource server to CONNECTORS in case the websocket producers/consumers
// uses the same port number, and therefore we must be part of this
- ConnectorRef ref = new ConnectorRef(staticResourcesServer, connector, null);
+ MemoryWebsocketStore memoryStore = new MemoryWebsocketStore();
+ ConnectorRef ref = new ConnectorRef(staticResourcesServer, connector, null,memoryStore);
String key = "websocket:" + host + ":" + port;
CONNECTORS.put(key, ref);
}
@@ -807,6 +821,7 @@ public class WebsocketComponent extends UriEndpointComponent {
connectorRef.server.removeConnector(connectorRef.connector);
connectorRef.connector.stop();
connectorRef.server.stop();
+ connectorRef.memoryStore.stop();
connectorRef.servlet = null;
}
CONNECTORS.remove(connectorKey);
http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
index ba6f0c5..26100f9 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
@@ -30,15 +30,12 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.jsse.SSLContextParameters;
import org.eclipse.jetty.server.Handler;
@UriEndpoint(scheme = "websocket", title = "Jetty Websocket", syntax = "websocket:host:port/resourceUri", consumerClass = WebsocketConsumer.class, label = "websocket")
public class WebsocketEndpoint extends DefaultEndpoint {
- private NodeSynchronization sync;
- private WebsocketStore memoryStore;
private WebsocketComponent component;
private URI uri;
private List<Handler> handlers;
@@ -80,8 +77,6 @@ public class WebsocketEndpoint extends DefaultEndpoint {
public WebsocketEndpoint(WebsocketComponent component, String uri, String resourceUri, Map<String, Object> parameters) {
super(uri, component);
this.resourceUri = resourceUri;
- this.memoryStore = new MemoryWebsocketStore();
- this.sync = new DefaultNodeSynchronization(memoryStore);
this.component = component;
try {
this.uri = new URI(uri);
@@ -106,7 +101,7 @@ public class WebsocketEndpoint extends DefaultEndpoint {
@Override
public Producer createProducer() throws Exception {
- return new WebsocketProducer(this, memoryStore);
+ return new WebsocketProducer(this);
}
public void connect(WebsocketConsumer consumer) throws Exception {
@@ -335,24 +330,4 @@ public class WebsocketEndpoint extends DefaultEndpoint {
public void setResourceUri(String resourceUri) {
this.resourceUri = resourceUri;
}
-
- /**
- * NodeSynchronization
- * @return NodeSynchronization
- */
- public NodeSynchronization getNodeSynchronization() {
- return this.sync;
- }
-
- @Override
- protected void doStart() throws Exception {
- ServiceHelper.startService(memoryStore);
- super.doStart();
- }
-
- @Override
- protected void doStop() throws Exception {
- ServiceHelper.stopService(memoryStore);
- super.doStop();
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
index 18b0775..89b2932 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
@@ -26,13 +26,12 @@ import org.apache.camel.impl.DefaultProducer;
public class WebsocketProducer extends DefaultProducer implements WebsocketProducerConsumer {
- private final WebsocketStore store;
+ private WebsocketStore store;
private final Boolean sendToAll;
private final WebsocketEndpoint endpoint;
- public WebsocketProducer(WebsocketEndpoint endpoint, WebsocketStore store) {
+ public WebsocketProducer(WebsocketEndpoint endpoint) {
super(endpoint);
- this.store = store;
this.sendToAll = endpoint.getSendToAll();
this.endpoint = endpoint;
}
@@ -111,4 +110,9 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu
}
}
}
+
+ //Store is set/unset upon connect/disconnect of the producer
+ public void setStore(WebsocketStore store) {
+ this.store = store;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
index 7a1bcdf..a05f6d8 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
@@ -71,7 +71,8 @@ public class WebsocketProducerTest {
@Before
public void setUp() throws Exception {
- websocketProducer = new WebsocketProducer(endpoint, store);
+ websocketProducer = new WebsocketProducer(endpoint);
+ websocketProducer.setStore(store);
sockets = Arrays.asList(defaultWebsocket1, defaultWebsocket2);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java
new file mode 100644
index 0000000..b2a3c9d
--- /dev/null
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.ws.WebSocket;
+import com.ning.http.client.ws.WebSocketTextListener;
+import com.ning.http.client.ws.WebSocketUpgradeHandler;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest extends CamelTestSupport {
+
+ private static List<String> received = new ArrayList<String>();
+ private static CountDownLatch latch;
+ private int port;
+
+ @Override
+ public void setUp() throws Exception {
+ port = AvailablePortFinder.getNextAvailable(16310);
+ super.setUp();
+ }
+
+ @Test
+ public void testWSHttpCallEcho() throws Exception {
+
+ // We call the route WebSocket BAR
+ received.clear();
+ latch = new CountDownLatch(2);
+
+ AsyncHttpClient c = new AsyncHttpClient();
+
+ WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/bar").execute(
+ new WebSocketUpgradeHandler.Builder()
+ .addWebSocketListener(new WebSocketTextListener() {
+ @Override
+ public void onMessage(String message) {
+ received.add(message);
+ log.info("received --> " + message);
+ latch.countDown();
+ }
+
+ @Override
+ public void onOpen(WebSocket websocket) {
+ }
+
+ @Override
+ public void onClose(WebSocket websocket) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ t.printStackTrace();
+ }
+ }).build()).get();
+
+ websocket.sendMessage("Beer");
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ assertEquals(2, received.size());
+
+ //Cannot guarantee the order in which messages are received
+ assertTrue(received.contains("The bar has Beer"));
+ assertTrue(received.contains("Broadcasting to Bar"));
+
+ websocket.close();
+ c.close();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+
+ from("websocket://localhost:" + port + "/bar")
+ .log(">>> Message received from BAR WebSocket Client : ${body}")
+ .transform().simple("The bar has ${body}")
+ .to("websocket://localhost:" + port + "/bar");
+
+ from("timer://foo?fixedRate=true&period=12000")
+ //Use a period which is longer than the latch await time
+ .setBody(constant("Broadcasting to Bar"))
+ .log(">>> Broadcasting message to Bar WebSocket Client")
+ .setHeader(WebsocketConstants.SEND_TO_ALL,constant(true))
+ .to("websocket://localhost:" + port + "/bar");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java
new file mode 100644
index 0000000..ec089bf
--- /dev/null
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.ws.WebSocket;
+import com.ning.http.client.ws.WebSocketTextListener;
+import com.ning.http.client.ws.WebSocketUpgradeHandler;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class WebsocketTwoRoutesToSameEndpointExampleTest extends CamelTestSupport {
+
+ private static List<String> received = new ArrayList<String>();
+ private static CountDownLatch latch;
+ private int port;
+
+ @Override
+ public void setUp() throws Exception {
+ port = AvailablePortFinder.getNextAvailable(16310);
+ super.setUp();
+ }
+
+ @Test
+ public void testWSHttpCallEcho() throws Exception {
+
+ // We call the route WebSocket BAR
+ received.clear();
+ latch = new CountDownLatch(2);
+
+ AsyncHttpClient c = new AsyncHttpClient();
+
+ WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/bar").execute(
+ new WebSocketUpgradeHandler.Builder()
+ .addWebSocketListener(new WebSocketTextListener() {
+ @Override
+ public void onMessage(String message) {
+ received.add(message);
+ log.info("received --> " + message);
+ latch.countDown();
+ }
+
+ @Override
+ public void onOpen(WebSocket websocket) {
+ }
+
+ @Override
+ public void onClose(WebSocket websocket) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ t.printStackTrace();
+ }
+ }).build()).get();
+
+ websocket.sendMessage("Beer");
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ assertEquals(2, received.size());
+
+ //Cannot guarantee the order in which messages are received
+ assertTrue(received.contains("The bar has Beer"));
+ assertTrue(received.contains("Broadcasting to Bar"));
+
+ websocket.close();
+ c.close();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+
+ from("websocket://localhost:" + port + "/bar")
+ .log(">>> Message received from BAR WebSocket Client : ${body}")
+ .transform().simple("The bar has ${body}")
+ .to("websocket://localhost:" + port + "/bar");
+
+ from("timer://foo?fixedRate=true&period=12000")
+ //Use a period which is longer than the latch await time
+ .setBody(constant("Broadcasting to Bar"))
+ .log(">>> Broadcasting message to Bar WebSocket Client")
+ .to("websocket://localhost:" + port + "/bar?sendToAll=true");
+ }
+ };
+ }
+}