You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/28 11:31:25 UTC
[1/2] camel git commit: CAMEL-9257 route stop/start doesn't work for
camel-websocket producer
Repository: camel
Updated Branches:
refs/heads/camel-2.16.x 3f84e4a20 -> 136823a7d
refs/heads/master 6fb6be7f6 -> ee3deceb9
CAMEL-9257 route stop/start doesn't work for camel-websocket producer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee3deceb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee3deceb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee3deceb
Branch: refs/heads/master
Commit: ee3deceb902e944c54a1a87d4ea47e50ad21e47d
Parents: 6fb6be7
Author: Tomohisa Igarashi <tm...@gmail.com>
Authored: Wed Oct 28 10:15:10 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Oct 28 11:32:37 2015 +0100
----------------------------------------------------------------------
.../component/websocket/WebsocketComponent.java | 52 +++-----
.../component/websocket/WebsocketEndpoint.java | 12 +-
.../WebsocketProducerRouteRestartTest.java | 130 +++++++++++++++++++
3 files changed, 155 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ee3deceb/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
index cf8af8a..55f7df1 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
@@ -142,8 +142,6 @@ public class WebsocketComponent extends UriEndpointComponent {
connector = new SelectChannelConnector();
}
- LOG.trace("Jetty Connector added: {}", connector.getName());
-
if (endpoint.getPort() != null) {
connector.setPort(endpoint.getPort());
} else {
@@ -162,6 +160,7 @@ public class WebsocketComponent extends UriEndpointComponent {
enableJmx(server);
}
server.addConnector(connector);
+ LOG.trace("Jetty Connector added: {}", connector.getName());
// Create ServletContextHandler
ServletContextHandler context = createContext(server, connector, endpoint.getHandlers());
@@ -198,6 +197,16 @@ public class WebsocketComponent extends UriEndpointComponent {
enableSessionSupport(connectorRef.server, connectorKey);
}
+ WebsocketComponentServlet servlet = addServlet(endpoint.getNodeSynchronization(), prodcon, endpoint.getResourceUri());
+ if (prodcon instanceof WebsocketConsumer) {
+ WebsocketConsumer consumer = WebsocketConsumer.class.cast(prodcon);
+ if (servlet.getConsumer() == null) {
+ servlet.setConsumer(consumer);
+ }
+ // register the consumer here
+ servlet.connect(consumer);
+ }
+
}
}
@@ -215,6 +224,10 @@ public class WebsocketComponent extends UriEndpointComponent {
ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
if (connectorRef != null) {
if (connectorRef.decrement() == 0) {
+ LOG.info("Stopping Jetty Server as the last connector is disconnecting: {}:{}"
+ , connectorRef.connector.getHost()
+ , connectorRef.connector.getPort());
+ servlets.remove(createPathSpec(endpoint.getResourceUri()));
connectorRef.server.removeConnector(connectorRef.connector);
if (connectorRef.connector != null) {
// static server may not have set a connector
@@ -408,35 +421,10 @@ public class WebsocketComponent extends UriEndpointComponent {
return createStaticResourcesServer(server, context, home);
}
- protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducer producer, String remaining) throws Exception {
+ protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducerConsumer prodcon, String resourceUri) throws Exception {
// Get Connector from one of the Jetty Instances to add WebSocket Servlet
- WebsocketEndpoint endpoint = producer.getEndpoint();
- String key = getConnectorKey(endpoint);
- ConnectorRef connectorRef = getConnectors().get(key);
-
- WebsocketComponentServlet servlet;
-
- if (connectorRef != null) {
- String pathSpec = createPathSpec(remaining);
- servlet = servlets.get(pathSpec);
- if (servlet == null) {
- // Retrieve Context
- ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler();
- servlet = createServlet(sync, pathSpec, servlets, context);
- connectorRef.servlet = servlet;
- LOG.debug("WebSocket Producer Servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key);
- }
- return servlet;
- } else {
- throw new Exception("Jetty instance has not been retrieved for : " + key);
- }
- }
-
- protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketConsumer consumer, String resourceUri) throws Exception {
-
- // Get Connector from one of the Jetty Instances to add WebSocket Servlet
- WebsocketEndpoint endpoint = consumer.getEndpoint();
+ WebsocketEndpoint endpoint = prodcon.getEndpoint();
String key = getConnectorKey(endpoint);
ConnectorRef connectorRef = getConnectors().get(key);
@@ -450,15 +438,9 @@ public class WebsocketComponent extends UriEndpointComponent {
ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler();
servlet = createServlet(sync, pathSpec, servlets, context);
connectorRef.servlet = servlet;
- servlets.put(pathSpec, servlet);
LOG.debug("WebSocket servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key);
}
- if (servlet.getConsumer() == null) {
- servlet.setConsumer(consumer);
- }
- // register the consumer here
- servlet.connect(consumer);
return servlet;
} else {
throw new Exception("Jetty instance has not been retrieved for : " + key);
http://git-wip-us.apache.org/repos/asf/camel/blob/ee3deceb/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
index ffafdaf..ba6f0c5 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
@@ -111,22 +111,18 @@ public class WebsocketEndpoint extends DefaultEndpoint {
public void connect(WebsocketConsumer consumer) throws Exception {
component.connect(consumer);
- component.addServlet(sync, consumer, resourceUri);
}
public void disconnect(WebsocketConsumer consumer) throws Exception {
component.disconnect(consumer);
- // Servlet should be removed
}
public void connect(WebsocketProducer producer) throws Exception {
component.connect(producer);
- component.addServlet(sync, producer, resourceUri);
}
public void disconnect(WebsocketProducer producer) throws Exception {
component.disconnect(producer);
- // Servlet should be removed
}
@Override
@@ -340,6 +336,14 @@ public class WebsocketEndpoint extends DefaultEndpoint {
this.resourceUri = resourceUri;
}
+ /**
+ * NodeSynchronization
+ * @return NodeSynchronization
+ */
+ public NodeSynchronization getNodeSynchronization() {
+ return this.sync;
+ }
+
@Override
protected void doStart() throws Exception {
ServiceHelper.startService(memoryStore);
http://git-wip-us.apache.org/repos/asf/camel/blob/ee3deceb/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
new file mode 100644
index 0000000..b1b2ce2
--- /dev/null
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.ws.WebSocket;
+import com.ning.http.client.ws.WebSocketTextListener;
+import com.ning.http.client.ws.WebSocketUpgradeHandler;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+public class WebsocketProducerRouteRestartTest extends CamelTestSupport {
+
+ private static final String ROUTE_ID = WebsocketProducerRouteRestartTest.class.getSimpleName();
+ private static List<Object> received = new ArrayList<Object>();
+ private static CountDownLatch latch;
+ protected int port;
+
+ @Produce(uri = "direct:shop")
+ private ProducerTemplate producer;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ port = AvailablePortFinder.getNextAvailable(16200);
+ super.setUp();
+ received.clear();
+ latch = new CountDownLatch(1);
+ }
+
+ @Test
+ public void testWSSuspendResumeRoute() throws Exception {
+ context.suspendRoute(ROUTE_ID);
+ context.resumeRoute(ROUTE_ID);
+ doTestWSHttpCall();
+ }
+
+ @Test
+ public void testWSStopStartRoute() throws Exception {
+ context.stopRoute(ROUTE_ID);
+ context.startRoute(ROUTE_ID);
+ doTestWSHttpCall();
+ }
+
+ @Test
+ public void testWSRemoveAddRoute() throws Exception {
+ context.removeRoute(ROUTE_ID);
+ context.addRoutes(createRouteBuilder());
+ context.startRoute(ROUTE_ID);
+ doTestWSHttpCall();
+ }
+
+ private void doTestWSHttpCall() throws Exception {
+ AsyncHttpClient c = new AsyncHttpClient();
+
+ WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/shop").execute(
+ new WebSocketUpgradeHandler.Builder()
+ .addWebSocketListener(new WebSocketTextListener() {
+ @Override
+ public void onMessage(String message) {
+ received.add(message);
+ log.info("received --> " + message);
+ latch.countDown();
+ }
+
+ @Override
+ public void onOpen(WebSocket websocket) {
+ }
+
+ @Override
+ public void onClose(WebSocket websocket) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ t.printStackTrace();
+ }
+ }).build()).get();
+
+ // Send message to the direct endpoint
+ producer.sendBodyAndHeader("Beer on stock at Apache Mall", WebsocketConstants.SEND_TO_ALL, "true");
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ assertEquals(1, received.size());
+ Object r = received.get(0);
+ assertTrue(r instanceof String);
+ assertEquals("Beer on stock at Apache Mall", r);
+
+ websocket.close();
+ c.close();
+
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:shop")
+ .id(ROUTE_ID)
+ .log(">>> Message received from Shopping center : ${body}")
+ .to("websocket://localhost:" + port + "/shop");
+ }
+ };
+ }
+}
[2/2] camel git commit: CAMEL-9257 route stop/start doesn't work for
camel-websocket producer
Posted by da...@apache.org.
CAMEL-9257 route stop/start doesn't work for camel-websocket producer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/136823a7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/136823a7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/136823a7
Branch: refs/heads/camel-2.16.x
Commit: 136823a7dc7dac505cb428d6aea528056d98d3da
Parents: 3f84e4a
Author: Tomohisa Igarashi <tm...@gmail.com>
Authored: Wed Oct 28 10:15:10 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Oct 28 11:34:05 2015 +0100
----------------------------------------------------------------------
.../component/websocket/WebsocketComponent.java | 52 +++-----
.../component/websocket/WebsocketEndpoint.java | 12 +-
.../WebsocketProducerRouteRestartTest.java | 130 +++++++++++++++++++
3 files changed, 155 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/136823a7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
index cf8af8a..55f7df1 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
@@ -142,8 +142,6 @@ public class WebsocketComponent extends UriEndpointComponent {
connector = new SelectChannelConnector();
}
- LOG.trace("Jetty Connector added: {}", connector.getName());
-
if (endpoint.getPort() != null) {
connector.setPort(endpoint.getPort());
} else {
@@ -162,6 +160,7 @@ public class WebsocketComponent extends UriEndpointComponent {
enableJmx(server);
}
server.addConnector(connector);
+ LOG.trace("Jetty Connector added: {}", connector.getName());
// Create ServletContextHandler
ServletContextHandler context = createContext(server, connector, endpoint.getHandlers());
@@ -198,6 +197,16 @@ public class WebsocketComponent extends UriEndpointComponent {
enableSessionSupport(connectorRef.server, connectorKey);
}
+ WebsocketComponentServlet servlet = addServlet(endpoint.getNodeSynchronization(), prodcon, endpoint.getResourceUri());
+ if (prodcon instanceof WebsocketConsumer) {
+ WebsocketConsumer consumer = WebsocketConsumer.class.cast(prodcon);
+ if (servlet.getConsumer() == null) {
+ servlet.setConsumer(consumer);
+ }
+ // register the consumer here
+ servlet.connect(consumer);
+ }
+
}
}
@@ -215,6 +224,10 @@ public class WebsocketComponent extends UriEndpointComponent {
ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
if (connectorRef != null) {
if (connectorRef.decrement() == 0) {
+ LOG.info("Stopping Jetty Server as the last connector is disconnecting: {}:{}"
+ , connectorRef.connector.getHost()
+ , connectorRef.connector.getPort());
+ servlets.remove(createPathSpec(endpoint.getResourceUri()));
connectorRef.server.removeConnector(connectorRef.connector);
if (connectorRef.connector != null) {
// static server may not have set a connector
@@ -408,35 +421,10 @@ public class WebsocketComponent extends UriEndpointComponent {
return createStaticResourcesServer(server, context, home);
}
- protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducer producer, String remaining) throws Exception {
+ protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketProducerConsumer prodcon, String resourceUri) throws Exception {
// Get Connector from one of the Jetty Instances to add WebSocket Servlet
- WebsocketEndpoint endpoint = producer.getEndpoint();
- String key = getConnectorKey(endpoint);
- ConnectorRef connectorRef = getConnectors().get(key);
-
- WebsocketComponentServlet servlet;
-
- if (connectorRef != null) {
- String pathSpec = createPathSpec(remaining);
- servlet = servlets.get(pathSpec);
- if (servlet == null) {
- // Retrieve Context
- ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler();
- servlet = createServlet(sync, pathSpec, servlets, context);
- connectorRef.servlet = servlet;
- LOG.debug("WebSocket Producer Servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key);
- }
- return servlet;
- } else {
- throw new Exception("Jetty instance has not been retrieved for : " + key);
- }
- }
-
- protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketConsumer consumer, String resourceUri) throws Exception {
-
- // Get Connector from one of the Jetty Instances to add WebSocket Servlet
- WebsocketEndpoint endpoint = consumer.getEndpoint();
+ WebsocketEndpoint endpoint = prodcon.getEndpoint();
String key = getConnectorKey(endpoint);
ConnectorRef connectorRef = getConnectors().get(key);
@@ -450,15 +438,9 @@ public class WebsocketComponent extends UriEndpointComponent {
ServletContextHandler context = (ServletContextHandler) connectorRef.server.getHandler();
servlet = createServlet(sync, pathSpec, servlets, context);
connectorRef.servlet = servlet;
- servlets.put(pathSpec, servlet);
LOG.debug("WebSocket servlet added for the following path : " + pathSpec + ", to the Jetty Server : " + key);
}
- if (servlet.getConsumer() == null) {
- servlet.setConsumer(consumer);
- }
- // register the consumer here
- servlet.connect(consumer);
return servlet;
} else {
throw new Exception("Jetty instance has not been retrieved for : " + key);
http://git-wip-us.apache.org/repos/asf/camel/blob/136823a7/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
index ffafdaf..ba6f0c5 100644
--- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
+++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
@@ -111,22 +111,18 @@ public class WebsocketEndpoint extends DefaultEndpoint {
public void connect(WebsocketConsumer consumer) throws Exception {
component.connect(consumer);
- component.addServlet(sync, consumer, resourceUri);
}
public void disconnect(WebsocketConsumer consumer) throws Exception {
component.disconnect(consumer);
- // Servlet should be removed
}
public void connect(WebsocketProducer producer) throws Exception {
component.connect(producer);
- component.addServlet(sync, producer, resourceUri);
}
public void disconnect(WebsocketProducer producer) throws Exception {
component.disconnect(producer);
- // Servlet should be removed
}
@Override
@@ -340,6 +336,14 @@ public class WebsocketEndpoint extends DefaultEndpoint {
this.resourceUri = resourceUri;
}
+ /**
+ * NodeSynchronization
+ * @return NodeSynchronization
+ */
+ public NodeSynchronization getNodeSynchronization() {
+ return this.sync;
+ }
+
@Override
protected void doStart() throws Exception {
ServiceHelper.startService(memoryStore);
http://git-wip-us.apache.org/repos/asf/camel/blob/136823a7/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
new file mode 100644
index 0000000..b1b2ce2
--- /dev/null
+++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteRestartTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.ws.WebSocket;
+import com.ning.http.client.ws.WebSocketTextListener;
+import com.ning.http.client.ws.WebSocketUpgradeHandler;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+public class WebsocketProducerRouteRestartTest extends CamelTestSupport {
+
+ private static final String ROUTE_ID = WebsocketProducerRouteRestartTest.class.getSimpleName();
+ private static List<Object> received = new ArrayList<Object>();
+ private static CountDownLatch latch;
+ protected int port;
+
+ @Produce(uri = "direct:shop")
+ private ProducerTemplate producer;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ port = AvailablePortFinder.getNextAvailable(16200);
+ super.setUp();
+ received.clear();
+ latch = new CountDownLatch(1);
+ }
+
+ @Test
+ public void testWSSuspendResumeRoute() throws Exception {
+ context.suspendRoute(ROUTE_ID);
+ context.resumeRoute(ROUTE_ID);
+ doTestWSHttpCall();
+ }
+
+ @Test
+ public void testWSStopStartRoute() throws Exception {
+ context.stopRoute(ROUTE_ID);
+ context.startRoute(ROUTE_ID);
+ doTestWSHttpCall();
+ }
+
+ @Test
+ public void testWSRemoveAddRoute() throws Exception {
+ context.removeRoute(ROUTE_ID);
+ context.addRoutes(createRouteBuilder());
+ context.startRoute(ROUTE_ID);
+ doTestWSHttpCall();
+ }
+
+ private void doTestWSHttpCall() throws Exception {
+ AsyncHttpClient c = new AsyncHttpClient();
+
+ WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/shop").execute(
+ new WebSocketUpgradeHandler.Builder()
+ .addWebSocketListener(new WebSocketTextListener() {
+ @Override
+ public void onMessage(String message) {
+ received.add(message);
+ log.info("received --> " + message);
+ latch.countDown();
+ }
+
+ @Override
+ public void onOpen(WebSocket websocket) {
+ }
+
+ @Override
+ public void onClose(WebSocket websocket) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ t.printStackTrace();
+ }
+ }).build()).get();
+
+ // Send message to the direct endpoint
+ producer.sendBodyAndHeader("Beer on stock at Apache Mall", WebsocketConstants.SEND_TO_ALL, "true");
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ assertEquals(1, received.size());
+ Object r = received.get(0);
+ assertTrue(r instanceof String);
+ assertEquals("Beer on stock at Apache Mall", r);
+
+ websocket.close();
+ c.close();
+
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:shop")
+ .id(ROUTE_ID)
+ .log(">>> Message received from Shopping center : ${body}")
+ .to("websocket://localhost:" + port + "/shop");
+ }
+ };
+ }
+}