You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/17 08:41:25 UTC

camel git commit: CAMEL-9566: Improved camel-ahc-ws to better re-connect in case of ws failures.

Repository: camel
Updated Branches:
  refs/heads/master 6407d35c0 -> fedb9c609


CAMEL-9566: Improved camel-ahc-ws to better re-connect in case of ws failures.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fedb9c60
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fedb9c60
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fedb9c60

Branch: refs/heads/master
Commit: fedb9c6097e248e26f9b589cc1428fbafe1131b5
Parents: 6407d35
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 17 08:40:08 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 17 08:41:19 2016 +0100

----------------------------------------------------------------------
 .../camel/component/ahc/ws/WsConsumer.java      | 11 ++-
 .../camel/component/ahc/ws/WsEndpoint.java      | 79 +++++++++++++-------
 .../ahc/ws/WsProducerConsumerTest.java          | 25 ++++++-
 3 files changed, 88 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fedb9c60/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
index ab8b5a5..808d76d 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
@@ -51,6 +51,10 @@ public class WsConsumer extends DefaultConsumer {
         sendMessageInternal(message);
     }
 
+    public void sendMessage(Throwable throwable) {
+        sendMessageInternal(throwable);
+    }
+
     public void sendMessage(byte[] message) {
         sendMessageInternal(message);
     }
@@ -68,7 +72,12 @@ public class WsConsumer extends DefaultConsumer {
 
         //TODO may set some headers with some meta info (e.g., socket info, unique-id for correlation purpose, etc0 
         // set the body
-        exchange.getIn().setBody(message);
+
+        if (message instanceof Throwable) {
+            exchange.setException((Throwable) message);
+        } else {
+            exchange.getIn().setBody(message);
+        }
 
         // send exchange using the async routing engine
         getAsyncProcessor().process(exchange, new AsyncCallback() {

http://git-wip-us.apache.org/repos/asf/camel/blob/fedb9c60/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
index 5187673..b250c2c 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
@@ -23,9 +23,8 @@ import com.ning.http.client.AsyncHttpClient;
 import com.ning.http.client.AsyncHttpClientConfig;
 import com.ning.http.client.AsyncHttpProvider;
 import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider;
+import com.ning.http.client.ws.DefaultWebSocketListener;
 import com.ning.http.client.ws.WebSocket;
-import com.ning.http.client.ws.WebSocketByteListener;
-import com.ning.http.client.ws.WebSocketTextListener;
 import com.ning.http.client.ws.WebSocketUpgradeHandler;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -49,24 +48,18 @@ public class WsEndpoint extends AhcEndpoint {
         probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider");
 
     private final Set<WsConsumer> consumers = new HashSet<WsConsumer>();
+    private final WsListener listener = new WsListener();
+    private transient WebSocket websocket;
 
-    private WebSocket websocket;
-    @UriParam
+    @UriParam(label = "producer")
     private boolean useStreaming;
+    @UriParam(label = "consumer")
+    private boolean sendMessageOnError;
 
     public WsEndpoint(String endpointUri, WsComponent component) {
         super(endpointUri, component, null);
     }
 
-    private static boolean probeClass(String name) {
-        try {
-            Class.forName(name, true, WsEndpoint.class.getClassLoader());
-            return true;
-        } catch (Throwable t) {
-            return false;
-        }
-    }
-
     @Override
     public WsComponent getComponent() {
         return (WsComponent) super.getComponent();
@@ -84,9 +77,8 @@ public class WsEndpoint extends AhcEndpoint {
 
     WebSocket getWebSocket() throws Exception {
         synchronized (this) {
-            if (websocket == null) {
-                connect();
-            }
+            // ensure we are connected
+            reConnect();
         }
         return websocket;
     }
@@ -106,6 +98,17 @@ public class WsEndpoint extends AhcEndpoint {
         this.useStreaming = useStreaming;
     }
 
+    public boolean isSendMessageOnError() {
+        return sendMessageOnError;
+    }
+
+    /**
+     * Whether to send a message if the web-socket listener received an error.
+     */
+    public void setSendMessageOnError(boolean sendMessageOnError) {
+        this.sendMessageOnError = sendMessageOnError;
+    }
+
     @Override
     protected AsyncHttpClient createClient(AsyncHttpClientConfig config) {
         AsyncHttpClient client;
@@ -127,7 +130,7 @@ public class WsEndpoint extends AhcEndpoint {
         LOG.debug("Connecting to {}", uri);
         websocket = getClient().prepareGet(uri).execute(
             new WebSocketUpgradeHandler.Builder()
-                .addWebSocketListener(new WsListener()).build()).get();
+                .addWebSocketListener(listener).build()).get();
     }
 
     @Override
@@ -136,6 +139,7 @@ public class WsEndpoint extends AhcEndpoint {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString());
             }
+            websocket.removeWebSocketListener(listener);
             websocket.close();
             websocket = null;
         }
@@ -144,31 +148,46 @@ public class WsEndpoint extends AhcEndpoint {
 
     void connect(WsConsumer wsConsumer) throws Exception {
         consumers.add(wsConsumer);
-
-        if (websocket == null || !websocket.isOpen()) {
-            connect();
-        }
+        reConnect();
     }
 
     void disconnect(WsConsumer wsConsumer) {
         consumers.remove(wsConsumer);
     }
 
-    class WsListener implements WebSocketTextListener, WebSocketByteListener {
+    void reConnect() throws Exception {
+        if (websocket == null || !websocket.isOpen()) {
+            String uri = getHttpUri().toASCIIString();
+            LOG.info("Reconnecting websocket: {}", uri);
+            connect();
+        }
+    }
+
+    class WsListener extends DefaultWebSocketListener {
 
         @Override
         public void onOpen(WebSocket websocket) {
-            LOG.debug("websocket opened");
+            LOG.debug("Websocket opened");
         }
 
         @Override
         public void onClose(WebSocket websocket) {
-            LOG.debug("websocket closed");
+            LOG.debug("websocket closed - reconnecting");
+            try {
+                reConnect();
+            } catch (Exception e) {
+                LOG.warn("Error re-connecting to websocket", e);
+            }
         }
 
         @Override
         public void onError(Throwable t) {
-            LOG.error("websocket on error", t);
+            LOG.debug("websocket on error", t);
+            if (isSendMessageOnError()) {
+                for (WsConsumer consumer : consumers) {
+                    consumer.sendMessage(t);
+                }
+            }
         }
 
         @Override
@@ -195,4 +214,14 @@ public class WsEndpoint extends AhcEndpoint {
         }
         return null;
     }
+
+    private static boolean probeClass(String name) {
+        try {
+            Class.forName(name, true, WsEndpoint.class.getClassLoader());
+            return true;
+        } catch (Throwable t) {
+            return false;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/fedb9c60/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
index cf7403d..ee7caf8 100644
--- a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
+++ b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
@@ -88,7 +88,7 @@ public class WsProducerConsumerTest extends CamelTestSupport {
     }
 
     @Test
-    public void testTwoRoutesRestart() throws Exception {
+    public void testTwoRoutesRestartConsumer() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived(TEST_MESSAGE);
 
@@ -110,6 +110,29 @@ public class WsProducerConsumerTest extends CamelTestSupport {
         mock.assertIsSatisfied();
     }
 
+    @Test
+    public void testTwoRoutesRestartProducer() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived(TEST_MESSAGE);
+
+        template.sendBody("direct:input", TEST_MESSAGE);
+
+        mock.assertIsSatisfied();
+
+        resetMocks();
+
+        log.info("Restarting foo route");
+        context.stopRoute("foo");
+        Thread.sleep(500);
+        context.startRoute("foo");
+
+        mock.expectedBodiesReceived(TEST_MESSAGE);
+
+        template.sendBody("direct:input", TEST_MESSAGE);
+
+        mock.assertIsSatisfied();
+    }
+
     @Override
     protected RouteBuilder[] createRouteBuilders() throws Exception {
         RouteBuilder[] rbs = new RouteBuilder[2];