You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/05/24 16:55:59 UTC
svn commit: r1342288 - in /camel/trunk/components/camel-websocket/src:
main/java/org/apache/camel/component/websocket/
test/java/org/apache/camel/component/websocket/
Author: cmoulliard
Date: Thu May 24 14:55:58 2012
New Revision: 1342288
URL: http://svn.apache.org/viewvc?rev=1342288&view=rev
Log:
Add org.apache.camel.component.websocket.WebsocketTwoRoutesExampleTest
Added:
camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java
Modified:
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java?rev=1342288&r1=1342287&r2=1342288&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java (original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java Thu May 24 14:55:58 2012
@@ -195,6 +195,8 @@ public class WebsocketComponent extends
connector = new SelectChannelConnector();
}
+ LOG.debug("Jetty Connector added : " + connector.getName());
+
if (port != null) {
connector.setPort(port);
} else {
@@ -242,6 +244,7 @@ public class WebsocketComponent extends
connectorRef = new ConnectorRef(server, connector, defaultServlet);
CONNECTORS.put(connectorKey, connectorRef);
+ LOG.debug("Jetty Server started for host : " + connector.getHost() + ", on port : " + connector.getPort());
server.start();
} else {
@@ -417,6 +420,7 @@ public class WebsocketComponent extends
WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync);
servlets.put(pathSpec, servlet);
handler.addServlet(new ServletHolder(servlet), pathSpec);
+ LOG.debug("WebSocket servlet added for the following path : " + pathSpec);
return servlet;
}
Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java?rev=1342288&r1=1342287&r2=1342288&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java (original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java Thu May 24 14:55:58 2012
@@ -33,8 +33,8 @@ public class WebsocketConsumer extends D
@Override
public void start() throws Exception {
- super.start();
endpoint.connect(this);
+ super.start();
}
@Override
@@ -48,6 +48,15 @@ public class WebsocketConsumer extends D
}
public void sendMessage(final String connectionKey, final String message) {
+
+/* if (!endpoint.isStarted()) {
+ try {
+ endpoint.connect(this);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }*/
+
final Exchange exchange = getEndpoint().createExchange();
// set header and body
Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java?rev=1342288&r1=1342287&r2=1342288&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java (original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java Thu May 24 14:55:58 2012
@@ -42,6 +42,10 @@ public class WebsocketProducer extends D
Message in = exchange.getIn();
String message = in.getMandatoryBody(String.class);
+/* if (!endpoint.isStarted()) {
+ endpoint.connect(this);
+ }*/
+
if (isSendToAllSet(in)) {
sendToAll(store, message, exchange);
} else {
@@ -61,6 +65,19 @@ public class WebsocketProducer extends D
return endpoint;
}
+
+    /**
+     * Starts this producer.
+     * <p>
+     * Calls {@code endpoint.connect(this)} first and only then delegates to
+     * {@code super.start()}, so any failure while connecting propagates before
+     * the superclass start logic runs.
+     * NOTE(review): presumably {@code connect} registers this producer with the
+     * shared server/connector for the endpoint -- confirm against
+     * WebsocketEndpoint/WebsocketComponent.
+     *
+     * @throws Exception if connecting the endpoint or starting the superclass fails
+     */
+    @Override
+    public void start() throws Exception {
+        endpoint.connect(this);
+        super.start();
+    }
+
+    /**
+     * Stops this producer.
+     * <p>
+     * Calls {@code endpoint.disconnect(this)} and then always delegates to
+     * {@code super.stop()}. The superclass stop is placed in a {@code finally}
+     * block so that a failure while disconnecting cannot leave this producer
+     * in a started state.
+     *
+     * @throws Exception if disconnecting the endpoint or stopping the superclass fails
+     */
+    @Override
+    public void stop() throws Exception {
+        try {
+            endpoint.disconnect(this);
+        } finally {
+            // Run the superclass stop logic even when disconnect fails.
+            super.stop();
+        }
+    }
+
boolean isSendToAllSet(Message in) {
// header may be null; have to be careful here (and fallback to use sendToAll option configured from endpoint)
Boolean value = in.getHeader(WebsocketConstants.SEND_TO_ALL, sendToAll, Boolean.class);
Added: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java?rev=1342288&view=auto
==============================================================================
--- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java (added)
+++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java Thu May 24 14:55:58 2012
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.websocket.WebSocket;
+import com.ning.http.client.websocket.WebSocketTextListener;
+import com.ning.http.client.websocket.WebSocketUpgradeHandler;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class WebsocketTwoRoutesExampleTest extends CamelTestSupport {
+
+ private static List<String> received = new ArrayList<String>();
+ private static CountDownLatch latch = new CountDownLatch(1);
+
+ @Test
+ public void testWSHttpCallEcho1() throws Exception {
+ AsyncHttpClient c = new AsyncHttpClient();
+
+ WebSocket websocket = c.prepareGet("ws://127.0.0.1:9292/echo").execute(
+ new WebSocketUpgradeHandler.Builder()
+ .addWebSocketListener(new WebSocketTextListener() {
+ @Override
+ public void onMessage(String message) {
+ received.add(message);
+ log.info("received --> " + message);
+ latch.countDown();
+ }
+
+ @Override
+ public void onFragment(String fragment, boolean last) {
+ }
+
+ @Override
+ public void onOpen(WebSocket websocket) {
+ }
+
+ @Override
+ public void onClose(WebSocket websocket) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ t.printStackTrace();
+ }
+ }).build()).get();
+
+ websocket.sendTextMessage("Beer");
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ assertEquals(1, received.size());
+ assertEquals("BeerBeer", received.get(0));
+
+ websocket.close();
+ c.close();
+ }
+
+ @Test
+ public void testWSHttpCallEcho2() throws Exception {
+ AsyncHttpClient c = new AsyncHttpClient();
+
+ WebSocket websocket = c.prepareGet("ws://127.0.0.1:9393/echo").execute(
+ new WebSocketUpgradeHandler.Builder()
+ .addWebSocketListener(new WebSocketTextListener() {
+ @Override
+ public void onMessage(String message) {
+ received.add(message);
+ log.info("received --> " + message);
+ latch.countDown();
+ }
+
+ @Override
+ public void onFragment(String fragment, boolean last) {
+ }
+
+ @Override
+ public void onOpen(WebSocket websocket) {
+ }
+
+ @Override
+ public void onClose(WebSocket websocket) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ t.printStackTrace();
+ }
+ }).build()).get();
+
+ websocket.sendTextMessage("Beer");
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ assertEquals(1, received.size());
+ assertEquals("BeerBeer", received.get(0));
+
+ websocket.close();
+ c.close();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+
+ from("websocket://localhost:9292/echo")
+ .log(">>> Message received from WebSocket Client : ${body}")
+ .transform().simple("${body}${body}")
+ .to("websocket://localhost:9292/echo");
+
+ from("websocket://localhost:9393/echo")
+ .log(">>> Message received from WebSocket Client : ${body}")
+ .transform().simple("${body}${body}")
+ .to("websocket://localhost:9292/echo");
+ }
+ };
+ }
+}