You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/06 12:37:21 UTC

[2/3] camel git commit: CAMEL-9665: Fixed ahc-ws consumer to connect to the websocket when starting. Thanks to Thomas Gunter for the patch/suggestion.

CAMEL-9665: Fixed ahc-ws consumer to connect to the websocket when starting. Thanks to Thomas Gunter for the patch/suggestion.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66b436fb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66b436fb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66b436fb

Branch: refs/heads/camel-2.16.x
Commit: 66b436fb7bc8b63bbd96364cdc3c570bdd59dc5b
Parents: c272e40
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Mar 6 12:36:07 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Mar 6 12:36:53 2016 +0100

----------------------------------------------------------------------
 .../camel/component/ahc/ws/WsEndpoint.java      | 38 ++++++++++++--------
 .../camel/component/ahc/ws/WsProducer.java      |  2 +-
 2 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/66b436fb/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
index 973ca79..86c1fd0 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
@@ -16,10 +16,8 @@
  */
 package org.apache.camel.component.ahc.ws;
 
-import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 
 import com.ning.http.client.AsyncHttpClient;
 import com.ning.http.client.AsyncHttpClientConfig;
@@ -44,7 +42,7 @@ public class WsEndpoint extends AhcEndpoint {
     private static final transient Logger LOG = LoggerFactory.getLogger(WsEndpoint.class);
 
     // for using websocket streaming/fragments
-    private static final boolean GRIZZLY_AVAILABLE = 
+    private static final boolean GRIZZLY_AVAILABLE =
         probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider");
 
     private final Set<WsConsumer> consumers  = new HashSet<WsConsumer>();
@@ -52,7 +50,7 @@ public class WsEndpoint extends AhcEndpoint {
     private WebSocket websocket;
     @UriParam
     private boolean useStreaming;
-    
+
     public WsEndpoint(String endpointUri, WsComponent component) {
         super(endpointUri, component, null);
     }
@@ -65,7 +63,7 @@ public class WsEndpoint extends AhcEndpoint {
             return false;
         }
     }
-    
+
     @Override
     public WsComponent getComponent() {
         return (WsComponent) super.getComponent();
@@ -117,34 +115,44 @@ public class WsEndpoint extends AhcEndpoint {
         } else {
             client = new AsyncHttpClient(ahp, config);
         }
-        return client; 
+        return client;
     }
 
-    public void connect() throws InterruptedException, ExecutionException, IOException {
-        websocket = getClient().prepareGet(getHttpUri().toASCIIString()).execute(
+    public void connect() throws Exception {
+        String uri = getHttpUri().toASCIIString();
+
+        LOG.debug("Connecting to {}", uri);
+        websocket = getClient().prepareGet(uri).execute(
             new WebSocketUpgradeHandler.Builder()
                 .addWebSocketListener(new WsListener()).build()).get();
     }
-    
+
     @Override
     protected void doStop() throws Exception {
         if (websocket != null && websocket.isOpen()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString());
+            }
             websocket.close();
             websocket = null;
         }
         super.doStop();
     }
 
-    void connect(WsConsumer wsConsumer) {
+    void connect(WsConsumer wsConsumer) throws Exception {
         consumers.add(wsConsumer);
+
+        if (websocket == null || !websocket.isOpen()) {
+            connect();
+        }
     }
 
     void disconnect(WsConsumer wsConsumer) {
         consumers.remove(wsConsumer);
     }
-    
+
     class WsListener implements WebSocketTextListener, WebSocketByteListener {
-                
+
         @Override
         public void onOpen(WebSocket websocket) {
             LOG.debug("websocket opened");
@@ -162,7 +170,7 @@ public class WsEndpoint extends AhcEndpoint {
 
         @Override
         public void onMessage(byte[] message) {
-            LOG.debug("received message --> {}", message);
+            LOG.debug("Received message --> {}", message);
             for (WsConsumer consumer : consumers) {
                 consumer.sendMessage(message);
             }
@@ -170,14 +178,14 @@ public class WsEndpoint extends AhcEndpoint {
 
         @Override
         public void onMessage(String message) {
-            LOG.debug("received message --> {}", message);
+            LOG.debug("Received message --> {}", message);
             for (WsConsumer consumer : consumers) {
                 consumer.sendMessage(message);
             }
         }
 
     }
-    
+
     protected AsyncHttpProvider getAsyncHttpProvider(AsyncHttpClientConfig config) {
         if (GRIZZLY_AVAILABLE) {
             return new GrizzlyAsyncHttpProvider(config);

http://git-wip-us.apache.org/repos/asf/camel/blob/66b436fb/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
index d6319ad..5935fc2 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
@@ -46,8 +46,8 @@ public class WsProducer extends DefaultProducer {
     public void process(Exchange exchange) throws Exception {
         Message in = exchange.getIn();
         Object message = in.getBody();
-        log.debug("Sending out {}", message);
         if (message != null) {
+            log.debug("Sending out {}", message);
             if (message instanceof String) {
                 sendMessage(getWebSocket(), (String)message, getEndpoint().isUseStreaming());
             } else if (message instanceof byte[]) {