You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/05/24 16:55:59 UTC

svn commit: r1342288 - in /camel/trunk/components/camel-websocket/src: main/java/org/apache/camel/component/websocket/ test/java/org/apache/camel/component/websocket/

Author: cmoulliard
Date: Thu May 24 14:55:58 2012
New Revision: 1342288

URL: http://svn.apache.org/viewvc?rev=1342288&view=rev
Log:
Add org.apache.camel.component.websocket.WebsocketTwoRoutesExampleTest

Added:
    camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java
Modified:
    camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
    camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
    camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java

Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java?rev=1342288&r1=1342287&r2=1342288&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java (original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java Thu May 24 14:55:58 2012
@@ -195,6 +195,8 @@ public class WebsocketComponent extends 
                     connector = new SelectChannelConnector();
                 }
 
+                LOG.debug("Jetty Connector added : " + connector.getName());
+
                 if (port != null) {
                     connector.setPort(port);
                 } else {
@@ -242,6 +244,7 @@ public class WebsocketComponent extends 
                 connectorRef = new ConnectorRef(server, connector, defaultServlet);
                 CONNECTORS.put(connectorKey, connectorRef);
 
+                LOG.debug("Jetty Server started for host : " + connector.getHost() + ", on port : " + connector.getPort());
                 server.start();
 
             } else {
@@ -417,6 +420,7 @@ public class WebsocketComponent extends 
         WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync);
         servlets.put(pathSpec, servlet);
         handler.addServlet(new ServletHolder(servlet), pathSpec);
+        LOG.debug("WebSocket servlet added for the following path : " + pathSpec);
         return servlet;
     }
 

Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java?rev=1342288&r1=1342287&r2=1342288&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java (original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java Thu May 24 14:55:58 2012
@@ -33,8 +33,8 @@ public class WebsocketConsumer extends D
 
     @Override
     public void start() throws Exception {
-        super.start();
         endpoint.connect(this);
+        super.start();
     }
 
     @Override
@@ -48,6 +48,15 @@ public class WebsocketConsumer extends D
     }
 
     public void sendMessage(final String connectionKey, final String message) {
+
+/*        if (!endpoint.isStarted()) {
+            try {
+                endpoint.connect(this);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }*/
+
         final Exchange exchange = getEndpoint().createExchange();
 
         // set header and body

Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java?rev=1342288&r1=1342287&r2=1342288&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java (original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java Thu May 24 14:55:58 2012
@@ -42,6 +42,10 @@ public class WebsocketProducer extends D
         Message in = exchange.getIn();
         String message = in.getMandatoryBody(String.class);
 
+/*        if (!endpoint.isStarted()) {
+            endpoint.connect(this);
+        }*/
+
         if (isSendToAllSet(in)) {
             sendToAll(store, message, exchange);
         } else {
@@ -61,6 +65,19 @@ public class WebsocketProducer extends D
         return endpoint;
     }
 
+
+    @Override
+    public void start() throws Exception {
+        endpoint.connect(this);
+        super.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        endpoint.disconnect(this);
+        super.stop();
+    }
+
     boolean isSendToAllSet(Message in) {
         // header may be null; have to be careful here (and fallback to use sendToAll option configured from endpoint)
         Boolean value = in.getHeader(WebsocketConstants.SEND_TO_ALL, sendToAll, Boolean.class);

Added: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java?rev=1342288&view=auto
==============================================================================
--- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java (added)
+++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java Thu May 24 14:55:58 2012
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.websocket.WebSocket;
+import com.ning.http.client.websocket.WebSocketTextListener;
+import com.ning.http.client.websocket.WebSocketUpgradeHandler;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class WebsocketTwoRoutesExampleTest extends CamelTestSupport {
+
+    private static List<String> received = new ArrayList<String>();
+    private static CountDownLatch latch = new CountDownLatch(1);
+
+    @Test
+    public void testWSHttpCallEcho1() throws Exception {
+        AsyncHttpClient c = new AsyncHttpClient();
+
+        WebSocket websocket = c.prepareGet("ws://127.0.0.1:9292/echo").execute(
+            new WebSocketUpgradeHandler.Builder()
+                .addWebSocketListener(new WebSocketTextListener() {
+                    @Override
+                    public void onMessage(String message) {
+                        received.add(message);
+                        log.info("received --> " + message);
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void onFragment(String fragment, boolean last) {
+                    }
+
+                    @Override
+                    public void onOpen(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onClose(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        t.printStackTrace();
+                    }
+                }).build()).get();
+
+        websocket.sendTextMessage("Beer");
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        assertEquals(1, received.size());
+        assertEquals("BeerBeer", received.get(0));
+
+        websocket.close();
+        c.close();
+    }
+
+    @Test
+    public void testWSHttpCallEcho2() throws Exception {
+        AsyncHttpClient c = new AsyncHttpClient();
+
+        WebSocket websocket = c.prepareGet("ws://127.0.0.1:9393/echo").execute(
+                new WebSocketUpgradeHandler.Builder()
+                        .addWebSocketListener(new WebSocketTextListener() {
+                            @Override
+                            public void onMessage(String message) {
+                                received.add(message);
+                                log.info("received --> " + message);
+                                latch.countDown();
+                            }
+
+                            @Override
+                            public void onFragment(String fragment, boolean last) {
+                            }
+
+                            @Override
+                            public void onOpen(WebSocket websocket) {
+                            }
+
+                            @Override
+                            public void onClose(WebSocket websocket) {
+                            }
+
+                            @Override
+                            public void onError(Throwable t) {
+                                t.printStackTrace();
+                            }
+                        }).build()).get();
+
+        websocket.sendTextMessage("Beer");
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        assertEquals(1, received.size());
+        assertEquals("BeerBeer", received.get(0));
+
+        websocket.close();
+        c.close();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+
+                from("websocket://localhost:9292/echo")
+                    .log(">>> Message received from WebSocket Client : ${body}")
+                    .transform().simple("${body}${body}")
+                    .to("websocket://localhost:9292/echo");
+
+                from("websocket://localhost:9393/echo")
+                        .log(">>> Message received from WebSocket Client : ${body}")
+                        .transform().simple("${body}${body}")
+                        .to("websocket://localhost:9292/echo");
+            }
+        };
+    }
+}