You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/28 11:31:25 UTC

[1/2] camel git commit: CAMEL-9257 route stop/start doesn't work for camel-websocket producer

Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x 3f84e4a20 -> 136823a7d
  refs/heads/master 6fb6be7f6 -> ee3deceb9


CAMEL-9257 route stop/start doesn't work for camel-websocket producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee3deceb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee3deceb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee3deceb

Branch: refs/heads/master
Commit: ee3deceb902e944c54a1a87d4ea47e50ad21e47d
Parents: 6fb6be7
Author: Tomohisa Igarashi <tm...@gmail.com>
Authored: Wed Oct 28 10:15:10 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Oct 28 11:32:37 2015 +0100

----------------------------------------------------------------------
 .../component/websocket/WebsocketComponent.java |  52 +++-----
 .../component/websocket/WebsocketEndpoint.java  |  12 +-
 .../WebsocketProducerRouteRestartTest.java      | 130 +++++++++++++++++++
 3 files changed, 155 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ee3deceb/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
index cf8af8a..55f7df1 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
@@ -142,8 +142,6 @@ public class WebsocketComponent extends UriEndpointComponent {
                     connector = new SelectChannelConnector();
                 }
 
-                LOG.trace("Jetty Connector added: {}", connector.getName());
-
                 if (endpoint.getPort() != null) {
                     connector.setPort(endpoint.getPort());
                 } else {
@@ -162,6 +160,7 @@ public class WebsocketComponent extends UriEndpointComponent {
                     enableJmx(server);
                 }
                 server.addConnector(connector);
+                LOG.trace("Jetty Connector added: {}", connector.getName());
 
                 // Create ServletContextHandler
                 ServletContextHandler context = createContext(server, connector, endpoint.getHandlers());
@@ -198,6 +197,16 @@ public class WebsocketComponent extends UriEndpointComponent {
                 enableSessionSupport(connectorRef.server, connectorKey);
             }
 
+            WebsocketComponentServlet servlet = addServlet(endpoint.getNodeSynchronization(), prodcon, endpoint.getResourceUri());
+            if (prodcon instanceof WebsocketConsumer) {
+                WebsocketConsumer consumer = WebsocketConsumer.class.cast(prodcon);
+                if (servlet.getConsumer() == null) {
+                    servlet.setConsumer(consumer);
+                }
+                // register the consumer here
+                servlet.connect(consumer);
+            }
+
         }
 
     }
@@ -215,6 +224,10 @@ public class WebsocketComponent extends UriEndpointComponent {
             ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
             if (connectorRef != null) {
                 if (connectorRef.decrement() == 0) {
+                    LOG.info("Stopping Jetty Server as the last connector is disconnecting: {}:{}"
+                            , connectorRef.connector.getHost()
+                            , connectorRef.connector.getPort());
+                    servlets.remove(createPathSpec(endpoint.getResourceUri()));
                     connectorRef.server.removeConnector(connectorRef.connector);
                     if (connectorRef.connector != null) {
                         // static server may not have set a connector
@@ -408,35 +421,10 @@ public class WebsocketComponent extends UriEndpointComponent {
         return createStaticResourcesServer(server, context, home);
     }
 
-    protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducer producer, String remaining) throws Exception {
+    protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducerConsumer prodcon, String resourceUri) throws Exception {
 
         // Get Connector from one of the Jetty Instances to add WebSocket Servlet
-        WebsocketEndpoint endpoint = producer.getEndpoint();
-        String key = getConnectorKey(endpoint);
-        ConnectorRef connectorRef = getConnectors().get(key);
-
-        WebsocketComponentServlet servlet;
-
-        if (connectorRef != null) {
-            String pathSpec = createPathSpec(remaining);
-            servlet = servlets.get(pathSpec);
-            if (servlet == null) {
-                // Retrieve Context
-                ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler();
-                servlet = createServlet(sync, pathSpec, servlets, context);
-                connectorRef.servlet = servlet;
-                LOG.debug("WebSocket Producer Servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key);
-            }
-            return servlet;
-        } else {
-            throw new Exception("Jetty instance has not been retrieved for : " + key);
-        }
-    }
-
-    protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketConsumer consumer, String resourceUri) throws Exception {
-
-        // Get Connector from one of the Jetty Instances to add WebSocket Servlet
-        WebsocketEndpoint endpoint = consumer.getEndpoint();
+        WebsocketEndpoint endpoint = prodcon.getEndpoint();
         String key = getConnectorKey(endpoint);
         ConnectorRef connectorRef = getConnectors().get(key);
 
@@ -450,15 +438,9 @@ public class WebsocketComponent extends UriEndpointComponent {
                 ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler();
                 servlet = createServlet(sync, pathSpec, servlets, context);
                 connectorRef.servlet = servlet;
-                servlets.put(pathSpec, servlet);
                 LOG.debug("WebSocket servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key);
             }
 
-            if (servlet.getConsumer() == null) {
-                servlet.setConsumer(consumer);
-            }
-            // register the consumer here
-            servlet.connect(consumer);
             return servlet;
         } else {
             throw new Exception("Jetty instance has not been retrieved for : " + key);

http://git-wip-us.apache.org/repos/asf/camel/blob/ee3deceb/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
index ffafdaf..ba6f0c5 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
@@ -111,22 +111,18 @@ public class WebsocketEndpoint extends DefaultEndpoint {
 
     public void connect(WebsocketConsumer consumer) throws Exception {
         component.connect(consumer);
-        component.addServlet(sync, consumer, resourceUri);
     }
 
     public void disconnect(WebsocketConsumer consumer) throws Exception {
         component.disconnect(consumer);
-        // Servlet should be removed
     }
 
     public void connect(WebsocketProducer producer) throws Exception {
         component.connect(producer);
-        component.addServlet(sync, producer, resourceUri);
     }
 
     public void disconnect(WebsocketProducer producer) throws Exception {
         component.disconnect(producer);
-        // Servlet should be removed
     }
 
     @Override
@@ -340,6 +336,14 @@ public class WebsocketEndpoint extends DefaultEndpoint {
         this.resourceUri = resourceUri;
     }
 
+    /**
+     * NodeSynchronization
+     * @return NodeSynchronization
+     */
+    public NodeSynchronization getNodeSynchronization() {
+        return this.sync;
+    }
+
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(memoryStore);

http://git-wip-us.apache.org/repos/asf/camel/blob/ee3deceb/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
new file mode 100644
index 0000000..b1b2ce2
--- /dev/null
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.ws.WebSocket;
+import com.ning.http.client.ws.WebSocketTextListener;
+import com.ning.http.client.ws.WebSocketUpgradeHandler;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+public class WebsocketProducerRouteRestartTest extends CamelTestSupport {
+
+    private static final String ROUTE_ID = WebsocketProducerRouteRestartTest.class.getSimpleName();
+    private static List<Object> received = new ArrayList<Object>();
+    private static CountDownLatch latch;
+    protected int port;
+
+    @Produce(uri = "direct:shop")
+    private ProducerTemplate producer;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        port = AvailablePortFinder.getNextAvailable(16200);
+        super.setUp();
+        received.clear();
+        latch =  new CountDownLatch(1);
+    }
+
+    @Test
+    public void testWSSuspendResumeRoute() throws Exception {
+        context.suspendRoute(ROUTE_ID);
+        context.resumeRoute(ROUTE_ID);
+        doTestWSHttpCall();
+    }
+
+    @Test
+    public void testWSStopStartRoute() throws Exception {
+        context.stopRoute(ROUTE_ID);
+        context.startRoute(ROUTE_ID);
+        doTestWSHttpCall();
+    }
+
+    @Test
+    public void testWSRemoveAddRoute() throws Exception {
+        context.removeRoute(ROUTE_ID);
+        context.addRoutes(createRouteBuilder());
+        context.startRoute(ROUTE_ID);
+        doTestWSHttpCall();
+    }
+
+    private void doTestWSHttpCall() throws Exception {
+        AsyncHttpClient c = new AsyncHttpClient();
+
+        WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/shop").execute(
+            new WebSocketUpgradeHandler.Builder()
+                .addWebSocketListener(new WebSocketTextListener() {
+                    @Override
+                    public void onMessage(String message) {
+                        received.add(message);
+                        log.info("received --> " + message);
+                        latch.countDown();
+                    }
+                    
+                    @Override
+                    public void onOpen(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onClose(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        t.printStackTrace();
+                    }
+                }).build()).get();
+
+        // Send message to the direct endpoint
+        producer.sendBodyAndHeader("Beer on stock at Apache Mall", WebsocketConstants.SEND_TO_ALL, "true");
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        assertEquals(1, received.size());
+        Object r = received.get(0);
+        assertTrue(r instanceof String);
+        assertEquals("Beer on stock at Apache Mall", r);
+
+        websocket.close();
+        c.close();
+        
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:shop")
+                    .id(ROUTE_ID)
+                    .log(">>> Message received from Shopping center : ${body}")
+                    .to("websocket://localhost:" + port + "/shop");
+            }
+        };
+    }
+}


[2/2] camel git commit: CAMEL-9257 route stop/start doesn't work for camel-websocket producer

Posted by da...@apache.org.
CAMEL-9257 route stop/start doesn't work for camel-websocket producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/136823a7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/136823a7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/136823a7

Branch: refs/heads/camel-2.16.x
Commit: 136823a7dc7dac505cb428d6aea528056d98d3da
Parents: 3f84e4a
Author: Tomohisa Igarashi <tm...@gmail.com>
Authored: Wed Oct 28 10:15:10 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Oct 28 11:34:05 2015 +0100

----------------------------------------------------------------------
 .../component/websocket/WebsocketComponent.java |  52 +++-----
 .../component/websocket/WebsocketEndpoint.java  |  12 +-
 .../WebsocketProducerRouteRestartTest.java      | 130 +++++++++++++++++++
 3 files changed, 155 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/136823a7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
index cf8af8a..55f7df1 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
@@ -142,8 +142,6 @@ public class WebsocketComponent extends UriEndpointComponent {
                     connector = new SelectChannelConnector();
                 }
 
-                LOG.trace("Jetty Connector added: {}", connector.getName());
-
                 if (endpoint.getPort() != null) {
                     connector.setPort(endpoint.getPort());
                 } else {
@@ -162,6 +160,7 @@ public class WebsocketComponent extends UriEndpointComponent {
                     enableJmx(server);
                 }
                 server.addConnector(connector);
+                LOG.trace("Jetty Connector added: {}", connector.getName());
 
                 // Create ServletContextHandler
                 ServletContextHandler context = createContext(server, connector, endpoint.getHandlers());
@@ -198,6 +197,16 @@ public class WebsocketComponent extends UriEndpointComponent {
                 enableSessionSupport(connectorRef.server, connectorKey);
             }
 
+            WebsocketComponentServlet servlet = addServlet(endpoint.getNodeSynchronization(), prodcon, endpoint.getResourceUri());
+            if (prodcon instanceof WebsocketConsumer) {
+                WebsocketConsumer consumer = WebsocketConsumer.class.cast(prodcon);
+                if (servlet.getConsumer() == null) {
+                    servlet.setConsumer(consumer);
+                }
+                // register the consumer here
+                servlet.connect(consumer);
+            }
+
         }
 
     }
@@ -215,6 +224,10 @@ public class WebsocketComponent extends UriEndpointComponent {
             ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
             if (connectorRef != null) {
                 if (connectorRef.decrement() == 0) {
+                    LOG.info("Stopping Jetty Server as the last connector is disconnecting: {}:{}"
+                            , connectorRef.connector.getHost()
+                            , connectorRef.connector.getPort());
+                    servlets.remove(createPathSpec(endpoint.getResourceUri()));
                     connectorRef.server.removeConnector(connectorRef.connector);
                     if (connectorRef.connector != null) {
                         // static server may not have set a connector
@@ -408,35 +421,10 @@ public class WebsocketComponent extends UriEndpointComponent {
         return createStaticResourcesServer(server, context, home);
     }
 
-    protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducer producer, String remaining) throws Exception {
+    protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducerConsumer prodcon, String resourceUri) throws Exception {
 
         // Get Connector from one of the Jetty Instances to add WebSocket Servlet
-        WebsocketEndpoint endpoint = producer.getEndpoint();
-        String key = getConnectorKey(endpoint);
-        ConnectorRef connectorRef = getConnectors().get(key);
-
-        WebsocketComponentServlet servlet;
-
-        if (connectorRef != null) {
-            String pathSpec = createPathSpec(remaining);
-            servlet = servlets.get(pathSpec);
-            if (servlet == null) {
-                // Retrieve Context
-                ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler();
-                servlet = createServlet(sync, pathSpec, servlets, context);
-                connectorRef.servlet = servlet;
-                LOG.debug("WebSocket Producer Servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key);
-            }
-            return servlet;
-        } else {
-            throw new Exception("Jetty instance has not been retrieved for : " + key);
-        }
-    }
-
-    protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketConsumer consumer, String resourceUri) throws Exception {
-
-        // Get Connector from one of the Jetty Instances to add WebSocket Servlet
-        WebsocketEndpoint endpoint = consumer.getEndpoint();
+        WebsocketEndpoint endpoint = prodcon.getEndpoint();
         String key = getConnectorKey(endpoint);
         ConnectorRef connectorRef = getConnectors().get(key);
 
@@ -450,15 +438,9 @@ public class WebsocketComponent extends UriEndpointComponent {
                 ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler();
                 servlet = createServlet(sync, pathSpec, servlets, context);
                 connectorRef.servlet = servlet;
-                servlets.put(pathSpec, servlet);
                 LOG.debug("WebSocket servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key);
             }
 
-            if (servlet.getConsumer() == null) {
-                servlet.setConsumer(consumer);
-            }
-            // register the consumer here
-            servlet.connect(consumer);
             return servlet;
         } else {
             throw new Exception("Jetty instance has not been retrieved for : " + key);

http://git-wip-us.apache.org/repos/asf/camel/blob/136823a7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
index ffafdaf..ba6f0c5 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
@@ -111,22 +111,18 @@ public class WebsocketEndpoint extends DefaultEndpoint {
 
     public void connect(WebsocketConsumer consumer) throws Exception {
         component.connect(consumer);
-        component.addServlet(sync, consumer, resourceUri);
     }
 
     public void disconnect(WebsocketConsumer consumer) throws Exception {
         component.disconnect(consumer);
-        // Servlet should be removed
     }
 
     public void connect(WebsocketProducer producer) throws Exception {
         component.connect(producer);
-        component.addServlet(sync, producer, resourceUri);
     }
 
     public void disconnect(WebsocketProducer producer) throws Exception {
         component.disconnect(producer);
-        // Servlet should be removed
     }
 
     @Override
@@ -340,6 +336,14 @@ public class WebsocketEndpoint extends DefaultEndpoint {
         this.resourceUri = resourceUri;
     }
 
+    /**
+     * NodeSynchronization
+     * @return NodeSynchronization
+     */
+    public NodeSynchronization getNodeSynchronization() {
+        return this.sync;
+    }
+
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(memoryStore);

http://git-wip-us.apache.org/repos/asf/camel/blob/136823a7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
new file mode 100644
index 0000000..b1b2ce2
--- /dev/null
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.ws.WebSocket;
+import com.ning.http.client.ws.WebSocketTextListener;
+import com.ning.http.client.ws.WebSocketUpgradeHandler;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+public class WebsocketProducerRouteRestartTest extends CamelTestSupport {
+
+    private static final String ROUTE_ID = WebsocketProducerRouteRestartTest.class.getSimpleName();
+    private static List<Object> received = new ArrayList<Object>();
+    private static CountDownLatch latch;
+    protected int port;
+
+    @Produce(uri = "direct:shop")
+    private ProducerTemplate producer;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        port = AvailablePortFinder.getNextAvailable(16200);
+        super.setUp();
+        received.clear();
+        latch =  new CountDownLatch(1);
+    }
+
+    @Test
+    public void testWSSuspendResumeRoute() throws Exception {
+        context.suspendRoute(ROUTE_ID);
+        context.resumeRoute(ROUTE_ID);
+        doTestWSHttpCall();
+    }
+
+    @Test
+    public void testWSStopStartRoute() throws Exception {
+        context.stopRoute(ROUTE_ID);
+        context.startRoute(ROUTE_ID);
+        doTestWSHttpCall();
+    }
+
+    @Test
+    public void testWSRemoveAddRoute() throws Exception {
+        context.removeRoute(ROUTE_ID);
+        context.addRoutes(createRouteBuilder());
+        context.startRoute(ROUTE_ID);
+        doTestWSHttpCall();
+    }
+
+    private void doTestWSHttpCall() throws Exception {
+        AsyncHttpClient c = new AsyncHttpClient();
+
+        WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/shop").execute(
+            new WebSocketUpgradeHandler.Builder()
+                .addWebSocketListener(new WebSocketTextListener() {
+                    @Override
+                    public void onMessage(String message) {
+                        received.add(message);
+                        log.info("received --> " + message);
+                        latch.countDown();
+                    }
+                    
+                    @Override
+                    public void onOpen(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onClose(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        t.printStackTrace();
+                    }
+                }).build()).get();
+
+        // Send message to the direct endpoint
+        producer.sendBodyAndHeader("Beer on stock at Apache Mall", WebsocketConstants.SEND_TO_ALL, "true");
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        assertEquals(1, received.size());
+        Object r = received.get(0);
+        assertTrue(r instanceof String);
+        assertEquals("Beer on stock at Apache Mall", r);
+
+        websocket.close();
+        c.close();
+        
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:shop")
+                    .id(ROUTE_ID)
+                    .log(">>> Message received from Shopping center : ${body}")
+                    .to("websocket://localhost:" + port + "/shop");
+            }
+        };
+    }
+}