You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/17 08:41:25 UTC
camel git commit: CAMEL-9566: Improved camel-ahc-ws to better
re-connect in case of ws failures.
Repository: camel
Updated Branches:
refs/heads/master 6407d35c0 -> fedb9c609
CAMEL-9566: Improved camel-ahc-ws to better re-connect in case of ws failures.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fedb9c60
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fedb9c60
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fedb9c60
Branch: refs/heads/master
Commit: fedb9c6097e248e26f9b589cc1428fbafe1131b5
Parents: 6407d35
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 17 08:40:08 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 17 08:41:19 2016 +0100
----------------------------------------------------------------------
.../camel/component/ahc/ws/WsConsumer.java | 11 ++-
.../camel/component/ahc/ws/WsEndpoint.java | 79 +++++++++++++-------
.../ahc/ws/WsProducerConsumerTest.java | 25 ++++++-
3 files changed, 88 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/fedb9c60/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
index ab8b5a5..808d76d 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
@@ -51,6 +51,10 @@ public class WsConsumer extends DefaultConsumer {
sendMessageInternal(message);
}
+ public void sendMessage(Throwable throwable) {
+ sendMessageInternal(throwable);
+ }
+
public void sendMessage(byte[] message) {
sendMessageInternal(message);
}
@@ -68,7 +72,12 @@ public class WsConsumer extends DefaultConsumer {
//TODO may set some headers with some meta info (e.g., socket info, unique-id for correlation purpose, etc0
// set the body
- exchange.getIn().setBody(message);
+
+ if (message instanceof Throwable) {
+ exchange.setException((Throwable) message);
+ } else {
+ exchange.getIn().setBody(message);
+ }
// send exchange using the async routing engine
getAsyncProcessor().process(exchange, new AsyncCallback() {
http://git-wip-us.apache.org/repos/asf/camel/blob/fedb9c60/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
index 5187673..b250c2c 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
@@ -23,9 +23,8 @@ import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.AsyncHttpProvider;
import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider;
+import com.ning.http.client.ws.DefaultWebSocketListener;
import com.ning.http.client.ws.WebSocket;
-import com.ning.http.client.ws.WebSocketByteListener;
-import com.ning.http.client.ws.WebSocketTextListener;
import com.ning.http.client.ws.WebSocketUpgradeHandler;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
@@ -49,24 +48,18 @@ public class WsEndpoint extends AhcEndpoint {
probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider");
private final Set<WsConsumer> consumers = new HashSet<WsConsumer>();
+ private final WsListener listener = new WsListener();
+ private transient WebSocket websocket;
- private WebSocket websocket;
- @UriParam
+ @UriParam(label = "producer")
private boolean useStreaming;
+ @UriParam(label = "consumer")
+ private boolean sendMessageOnError;
public WsEndpoint(String endpointUri, WsComponent component) {
super(endpointUri, component, null);
}
- private static boolean probeClass(String name) {
- try {
- Class.forName(name, true, WsEndpoint.class.getClassLoader());
- return true;
- } catch (Throwable t) {
- return false;
- }
- }
-
@Override
public WsComponent getComponent() {
return (WsComponent) super.getComponent();
@@ -84,9 +77,8 @@ public class WsEndpoint extends AhcEndpoint {
WebSocket getWebSocket() throws Exception {
synchronized (this) {
- if (websocket == null) {
- connect();
- }
+ // ensure we are connected
+ reConnect();
}
return websocket;
}
@@ -106,6 +98,17 @@ public class WsEndpoint extends AhcEndpoint {
this.useStreaming = useStreaming;
}
+ public boolean isSendMessageOnError() {
+ return sendMessageOnError;
+ }
+
+ /**
+ * Whether to send an message if the web-socket listener received an error.
+ */
+ public void setSendMessageOnError(boolean sendMessageOnError) {
+ this.sendMessageOnError = sendMessageOnError;
+ }
+
@Override
protected AsyncHttpClient createClient(AsyncHttpClientConfig config) {
AsyncHttpClient client;
@@ -127,7 +130,7 @@ public class WsEndpoint extends AhcEndpoint {
LOG.debug("Connecting to {}", uri);
websocket = getClient().prepareGet(uri).execute(
new WebSocketUpgradeHandler.Builder()
- .addWebSocketListener(new WsListener()).build()).get();
+ .addWebSocketListener(listener).build()).get();
}
@Override
@@ -136,6 +139,7 @@ public class WsEndpoint extends AhcEndpoint {
if (LOG.isDebugEnabled()) {
LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString());
}
+ websocket.removeWebSocketListener(listener);
websocket.close();
websocket = null;
}
@@ -144,31 +148,46 @@ public class WsEndpoint extends AhcEndpoint {
void connect(WsConsumer wsConsumer) throws Exception {
consumers.add(wsConsumer);
-
- if (websocket == null || !websocket.isOpen()) {
- connect();
- }
+ reConnect();
}
void disconnect(WsConsumer wsConsumer) {
consumers.remove(wsConsumer);
}
- class WsListener implements WebSocketTextListener, WebSocketByteListener {
+ void reConnect() throws Exception {
+ if (websocket == null || !websocket.isOpen()) {
+ String uri = getHttpUri().toASCIIString();
+ LOG.info("Reconnecting websocket: {}", uri);
+ connect();
+ }
+ }
+
+ class WsListener extends DefaultWebSocketListener {
@Override
public void onOpen(WebSocket websocket) {
- LOG.debug("websocket opened");
+ LOG.debug("Websocket opened");
}
@Override
public void onClose(WebSocket websocket) {
- LOG.debug("websocket closed");
+ LOG.debug("websocket closed - reconnecting");
+ try {
+ reConnect();
+ } catch (Exception e) {
+ LOG.warn("Error re-connecting to websocket", e);
+ }
}
@Override
public void onError(Throwable t) {
- LOG.error("websocket on error", t);
+ LOG.debug("websocket on error", t);
+ if (isSendMessageOnError()) {
+ for (WsConsumer consumer : consumers) {
+ consumer.sendMessage(t);
+ }
+ }
}
@Override
@@ -195,4 +214,14 @@ public class WsEndpoint extends AhcEndpoint {
}
return null;
}
+
+ private static boolean probeClass(String name) {
+ try {
+ Class.forName(name, true, WsEndpoint.class.getClassLoader());
+ return true;
+ } catch (Throwable t) {
+ return false;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/fedb9c60/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
index cf7403d..ee7caf8 100644
--- a/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
+++ b/components/camel-ahc-ws/src/test/java/org/apache/camel/component/ahc/ws/WsProducerConsumerTest.java
@@ -88,7 +88,7 @@ public class WsProducerConsumerTest extends CamelTestSupport {
}
@Test
- public void testTwoRoutesRestart() throws Exception {
+ public void testTwoRoutesRestartConsumer() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived(TEST_MESSAGE);
@@ -110,6 +110,29 @@ public class WsProducerConsumerTest extends CamelTestSupport {
mock.assertIsSatisfied();
}
+ @Test
+ public void testTwoRoutesRestartProducer() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived(TEST_MESSAGE);
+
+ template.sendBody("direct:input", TEST_MESSAGE);
+
+ mock.assertIsSatisfied();
+
+ resetMocks();
+
+ log.info("Restarting foo route");
+ context.stopRoute("foo");
+ Thread.sleep(500);
+ context.startRoute("foo");
+
+ mock.expectedBodiesReceived(TEST_MESSAGE);
+
+ template.sendBody("direct:input", TEST_MESSAGE);
+
+ mock.assertIsSatisfied();
+ }
+
@Override
protected RouteBuilder[] createRouteBuilders() throws Exception {
RouteBuilder[] rbs = new RouteBuilder[2];