You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/22 11:37:14 UTC

[3/4] camel git commit: Moved WebSocketStore and NodeSynchronization from the Endpoint to the Connector

Moved WebSocketStore and NodeSynchronization from the Endpoint to the Connector


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2164aba7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2164aba7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2164aba7

Branch: refs/heads/camel-2.16.x
Commit: 2164aba73b68692417d95f8d850cdcbf3a54d00b
Parents: 4e3ff19
Author: Ton Swieb <to...@finalist.nl>
Authored: Sat Nov 21 21:44:30 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Nov 22 11:36:57 2015 +0100

----------------------------------------------------------------------
 .../component/websocket/WebsocketComponent.java |  25 ++++-
 .../component/websocket/WebsocketEndpoint.java  |  27 +----
 .../component/websocket/WebsocketProducer.java  |  10 +-
 .../websocket/WebsocketProducerTest.java        |   3 +-
 ...dividualAndBroadcastEndpointExampleTest.java | 110 +++++++++++++++++++
 ...ocketTwoRoutesToSameEndpointExampleTest.java | 109 ++++++++++++++++++
 6 files changed, 249 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
index 55f7df1..1251a53 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
@@ -91,12 +91,14 @@ public class WebsocketComponent extends UriEndpointComponent {
         Server server;
         Connector connector;
         WebsocketComponentServlet servlet;
+        MemoryWebsocketStore memoryStore;
         int refCount;
 
-        public ConnectorRef(Server server, Connector connector, WebsocketComponentServlet servlet) {
+        public ConnectorRef(Server server, Connector connector, WebsocketComponentServlet servlet, MemoryWebsocketStore memoryStore) {
             this.server = server;
             this.connector = connector;
             this.servlet = servlet;
+            this.memoryStore = memoryStore;
             increment();
         }
 
@@ -176,14 +178,17 @@ public class WebsocketComponent extends UriEndpointComponent {
                     server = createStaticResourcesServer(server, context, endpoint.getStaticResources());
                 }
 
+                MemoryWebsocketStore memoryStore = new MemoryWebsocketStore();
+                
                 // Don't provide a Servlet object as Producer/Consumer will create them later on
-                connectorRef = new ConnectorRef(server, connector, null);
+                connectorRef = new ConnectorRef(server, connector, null, memoryStore);
 
                 // must enable session before we start
                 if (endpoint.isSessionSupport()) {
                     enableSessionSupport(connectorRef.server, connectorKey);
                 }
                 LOG.info("Jetty Server starting on host: {}:{}", connector.getHost(), connector.getPort());
+                connectorRef.memoryStore.start();
                 connectorRef.server.start();
 
                 CONNECTORS.put(connectorKey, connectorRef);
@@ -197,7 +202,8 @@ public class WebsocketComponent extends UriEndpointComponent {
                 enableSessionSupport(connectorRef.server, connectorKey);
             }
 
-            WebsocketComponentServlet servlet = addServlet(endpoint.getNodeSynchronization(), prodcon, endpoint.getResourceUri());
+            NodeSynchronization sync = new DefaultNodeSynchronization(connectorRef.memoryStore);            
+            WebsocketComponentServlet servlet = addServlet(sync, prodcon, endpoint.getResourceUri());
             if (prodcon instanceof WebsocketConsumer) {
                 WebsocketConsumer consumer = WebsocketConsumer.class.cast(prodcon);
                 if (servlet.getConsumer() == null) {
@@ -206,7 +212,10 @@ public class WebsocketComponent extends UriEndpointComponent {
                 // register the consumer here
                 servlet.connect(consumer);
             }
-
+            if (prodcon instanceof WebsocketProducer) {
+            	WebsocketProducer producer = WebsocketProducer.class.cast(prodcon);
+            	producer.setStore(connectorRef.memoryStore);
+            }
         }
 
     }
@@ -234,6 +243,7 @@ public class WebsocketComponent extends UriEndpointComponent {
                         connectorRef.connector.stop();
                     }
                     connectorRef.server.stop();
+                    connectorRef.memoryStore.stop();
                     CONNECTORS.remove(connectorKey);
                     // Camel controls the lifecycle of these entities so remove the
                     // registered MBeans when Camel is done with the managed objects.
@@ -245,6 +255,9 @@ public class WebsocketComponent extends UriEndpointComponent {
                 if (prodcon instanceof WebsocketConsumer) {
                     connectorRef.servlet.disconnect((WebsocketConsumer) prodcon);
                 }
+                if (prodcon instanceof WebsocketProducer) {
+                	((WebsocketProducer) prodcon).setStore(null);
+                }
             }
         }
     }
@@ -791,7 +804,8 @@ public class WebsocketComponent extends UriEndpointComponent {
 
             // must add static resource server to CONNECTORS in case the websocket producers/consumers
             // uses the same port number, and therefore we must be part of this
-            ConnectorRef ref = new ConnectorRef(staticResourcesServer, connector, null);
+            MemoryWebsocketStore memoryStore = new MemoryWebsocketStore();
+            ConnectorRef ref = new ConnectorRef(staticResourcesServer, connector, null,memoryStore);
             String key = "websocket:" + host + ":" + port;
             CONNECTORS.put(key, ref);
         }
@@ -807,6 +821,7 @@ public class WebsocketComponent extends UriEndpointComponent {
                     connectorRef.server.removeConnector(connectorRef.connector);
                     connectorRef.connector.stop();
                     connectorRef.server.stop();
+                    connectorRef.memoryStore.stop();
                     connectorRef.servlet = null;
                 }
                 CONNECTORS.remove(connectorKey);

http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
index ba6f0c5..26100f9 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
@@ -30,15 +30,12 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.jsse.SSLContextParameters;
 import org.eclipse.jetty.server.Handler;
 
 @UriEndpoint(scheme = "websocket", title = "Jetty Websocket", syntax = "websocket:host:port/resourceUri", consumerClass = WebsocketConsumer.class, label = "websocket")
 public class WebsocketEndpoint extends DefaultEndpoint {
 
-    private NodeSynchronization sync;
-    private WebsocketStore memoryStore;
     private WebsocketComponent component;
     private URI uri;
     private List<Handler> handlers;
@@ -80,8 +77,6 @@ public class WebsocketEndpoint extends DefaultEndpoint {
     public WebsocketEndpoint(WebsocketComponent component, String uri, String resourceUri, Map<String, Object> parameters) {
         super(uri, component);
         this.resourceUri = resourceUri;
-        this.memoryStore = new MemoryWebsocketStore();
-        this.sync = new DefaultNodeSynchronization(memoryStore);
         this.component = component;
         try {
             this.uri = new URI(uri);
@@ -106,7 +101,7 @@ public class WebsocketEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new WebsocketProducer(this, memoryStore);
+        return new WebsocketProducer(this);
     }
 
     public void connect(WebsocketConsumer consumer) throws Exception {
@@ -335,24 +330,4 @@ public class WebsocketEndpoint extends DefaultEndpoint {
     public void setResourceUri(String resourceUri) {
         this.resourceUri = resourceUri;
     }
-
-    /**
-     * NodeSynchronization
-     * @return NodeSynchronization
-     */
-    public NodeSynchronization getNodeSynchronization() {
-        return this.sync;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        ServiceHelper.startService(memoryStore);
-        super.doStart();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        ServiceHelper.stopService(memoryStore);
-        super.doStop();
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
index 18b0775..89b2932 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
@@ -26,13 +26,12 @@ import org.apache.camel.impl.DefaultProducer;
 
 public class WebsocketProducer extends DefaultProducer implements WebsocketProducerConsumer {
 
-    private final WebsocketStore store;
+    private WebsocketStore store;
     private final Boolean sendToAll;
     private final WebsocketEndpoint endpoint;
 
-    public WebsocketProducer(WebsocketEndpoint endpoint, WebsocketStore store) {
+    public WebsocketProducer(WebsocketEndpoint endpoint) {
         super(endpoint);
-        this.store = store;
         this.sendToAll = endpoint.getSendToAll();
         this.endpoint = endpoint;
     }
@@ -111,4 +110,9 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu
             }
         }
     }
+
+    //Store is set/unset upon connect/disconnect of the producer
+	public void setStore(WebsocketStore store) {
+		this.store = store;
+	}
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
index 7a1bcdf..a05f6d8 100644
--- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
@@ -71,7 +71,8 @@ public class WebsocketProducerTest {
 
     @Before
     public void setUp() throws Exception {
-        websocketProducer = new WebsocketProducer(endpoint, store);
+        websocketProducer = new WebsocketProducer(endpoint);
+        websocketProducer.setStore(store);
         sockets = Arrays.asList(defaultWebsocket1, defaultWebsocket2);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java
new file mode 100644
index 0000000..b2a3c9d
--- /dev/null
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.ws.WebSocket;
+import com.ning.http.client.ws.WebSocketTextListener;
+import com.ning.http.client.ws.WebSocketUpgradeHandler;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class WebsocketTwoRoutesToSIndividualAndBroadcastEndpointExampleTest extends CamelTestSupport {
+
+    private static List<String> received = new ArrayList<String>();
+    private static CountDownLatch latch;
+    private int port;
+
+    @Override
+    public void setUp() throws Exception {
+        port = AvailablePortFinder.getNextAvailable(16310);
+        super.setUp();
+    }
+
+    @Test
+    public void testWSHttpCallEcho() throws Exception {
+
+        // We call the route WebSocket BAR
+        received.clear();
+        latch = new CountDownLatch(2);
+
+        AsyncHttpClient c = new AsyncHttpClient();
+
+        WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/bar").execute(
+            new WebSocketUpgradeHandler.Builder()
+                .addWebSocketListener(new WebSocketTextListener() {
+                    @Override
+                    public void onMessage(String message) {
+                        received.add(message);
+                        log.info("received --> " + message);
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void onOpen(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onClose(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        t.printStackTrace();
+                    }
+                }).build()).get();
+
+        websocket.sendMessage("Beer");
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        assertEquals(2, received.size());
+
+        //Cannot guarantee the order in which messages are received
+        assertTrue(received.contains("The bar has Beer"));
+        assertTrue(received.contains("Broadcasting to Bar"));
+
+        websocket.close();
+        c.close();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+
+                from("websocket://localhost:" + port + "/bar")
+                    .log(">>> Message received from BAR WebSocket Client : ${body}")
+                    .transform().simple("The bar has ${body}")
+                    .to("websocket://localhost:" + port + "/bar");
+
+                from("timer://foo?fixedRate=true&period=12000")
+	        		//Use a period which is longer then the latch await time
+	        		.setBody(constant("Broadcasting to Bar"))
+	                .log(">>> Broadcasting message to Bar WebSocket Client")
+	                .setHeader(WebsocketConstants.SEND_TO_ALL,constant(true))
+	                .to("websocket://localhost:" + port + "/bar");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2164aba7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java
new file mode 100644
index 0000000..ec089bf
--- /dev/null
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesToSameEndpointExampleTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.ws.WebSocket;
+import com.ning.http.client.ws.WebSocketTextListener;
+import com.ning.http.client.ws.WebSocketUpgradeHandler;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class WebsocketTwoRoutesToSameEndpointExampleTest extends CamelTestSupport {
+
+    private static List<String> received = new ArrayList<String>();
+    private static CountDownLatch latch;
+    private int port;
+
+    @Override
+    public void setUp() throws Exception {
+        port = AvailablePortFinder.getNextAvailable(16310);
+        super.setUp();
+    }
+
+    @Test
+    public void testWSHttpCallEcho() throws Exception {
+
+        // We call the route WebSocket BAR
+        received.clear();
+        latch = new CountDownLatch(2);
+
+        AsyncHttpClient c = new AsyncHttpClient();
+
+        WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/bar").execute(
+            new WebSocketUpgradeHandler.Builder()
+                .addWebSocketListener(new WebSocketTextListener() {
+                    @Override
+                    public void onMessage(String message) {
+                        received.add(message);
+                        log.info("received --> " + message);
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void onOpen(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onClose(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        t.printStackTrace();
+                    }
+                }).build()).get();
+
+        websocket.sendMessage("Beer");
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        assertEquals(2, received.size());
+
+        //Cannot guarantee the order in which messages are received
+        assertTrue(received.contains("The bar has Beer"));
+        assertTrue(received.contains("Broadcasting to Bar"));
+
+        websocket.close();
+        c.close();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+
+                from("websocket://localhost:" + port + "/bar")
+                    .log(">>> Message received from BAR WebSocket Client : ${body}")
+                    .transform().simple("The bar has ${body}")
+                    .to("websocket://localhost:" + port + "/bar");
+
+                from("timer://foo?fixedRate=true&period=12000")
+	        		//Use a period which is longer then the latch await time
+	        		.setBody(constant("Broadcasting to Bar"))
+	                .log(">>> Broadcasting message to Bar WebSocket Client")
+	                .to("websocket://localhost:" + port + "/bar?sendToAll=true");
+            }
+        };
+    }
+}