You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/06 12:37:20 UTC
[1/3] camel git commit: CAMEL-9665: Fixed ahc-ws consumer to connect
to the websocket when starting. Thanks to Thomas Gunter for the
patch/suggestion.
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x 0dd343e9a -> 0dfbbb76c
refs/heads/camel-2.16.x c272e40bc -> 66b436fb7
refs/heads/master cd1e214aa -> 02434f00b
CAMEL-9665: Fixed ahc-ws consumer to connect to the websocket when starting. Thanks to Thomas Gunter for the patch/suggestion.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/02434f00
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/02434f00
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/02434f00
Branch: refs/heads/master
Commit: 02434f00bf6b29872af867f1100ea69743a00620
Parents: cd1e214
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Mar 6 12:36:07 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Mar 6 12:36:07 2016 +0100
----------------------------------------------------------------------
.../camel/component/ahc/ws/WsEndpoint.java | 38 ++++++++++++--------
.../camel/component/ahc/ws/WsProducer.java | 2 +-
2 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/02434f00/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
index a7ec795..5187673 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
@@ -16,10 +16,8 @@
*/
package org.apache.camel.component.ahc.ws;
-import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
@@ -47,7 +45,7 @@ public class WsEndpoint extends AhcEndpoint {
private static final transient Logger LOG = LoggerFactory.getLogger(WsEndpoint.class);
// for using websocket streaming/fragments
- private static final boolean GRIZZLY_AVAILABLE =
+ private static final boolean GRIZZLY_AVAILABLE =
probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider");
private final Set<WsConsumer> consumers = new HashSet<WsConsumer>();
@@ -55,7 +53,7 @@ public class WsEndpoint extends AhcEndpoint {
private WebSocket websocket;
@UriParam
private boolean useStreaming;
-
+
public WsEndpoint(String endpointUri, WsComponent component) {
super(endpointUri, component, null);
}
@@ -68,7 +66,7 @@ public class WsEndpoint extends AhcEndpoint {
return false;
}
}
-
+
@Override
public WsComponent getComponent() {
return (WsComponent) super.getComponent();
@@ -120,34 +118,44 @@ public class WsEndpoint extends AhcEndpoint {
} else {
client = new AsyncHttpClient(ahp, config);
}
- return client;
+ return client;
}
- public void connect() throws InterruptedException, ExecutionException, IOException {
- websocket = getClient().prepareGet(getHttpUri().toASCIIString()).execute(
+ public void connect() throws Exception {
+ String uri = getHttpUri().toASCIIString();
+
+ LOG.debug("Connecting to {}", uri);
+ websocket = getClient().prepareGet(uri).execute(
new WebSocketUpgradeHandler.Builder()
.addWebSocketListener(new WsListener()).build()).get();
}
-
+
@Override
protected void doStop() throws Exception {
if (websocket != null && websocket.isOpen()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString());
+ }
websocket.close();
websocket = null;
}
super.doStop();
}
- void connect(WsConsumer wsConsumer) {
+ void connect(WsConsumer wsConsumer) throws Exception {
consumers.add(wsConsumer);
+
+ if (websocket == null || !websocket.isOpen()) {
+ connect();
+ }
}
void disconnect(WsConsumer wsConsumer) {
consumers.remove(wsConsumer);
}
-
+
class WsListener implements WebSocketTextListener, WebSocketByteListener {
-
+
@Override
public void onOpen(WebSocket websocket) {
LOG.debug("websocket opened");
@@ -165,7 +173,7 @@ public class WsEndpoint extends AhcEndpoint {
@Override
public void onMessage(byte[] message) {
- LOG.debug("received message --> {}", message);
+ LOG.debug("Received message --> {}", message);
for (WsConsumer consumer : consumers) {
consumer.sendMessage(message);
}
@@ -173,14 +181,14 @@ public class WsEndpoint extends AhcEndpoint {
@Override
public void onMessage(String message) {
- LOG.debug("received message --> {}", message);
+ LOG.debug("Received message --> {}", message);
for (WsConsumer consumer : consumers) {
consumer.sendMessage(message);
}
}
}
-
+
protected AsyncHttpProvider getAsyncHttpProvider(AsyncHttpClientConfig config) {
if (GRIZZLY_AVAILABLE) {
return new GrizzlyAsyncHttpProvider(config);
http://git-wip-us.apache.org/repos/asf/camel/blob/02434f00/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
index d6319ad..5935fc2 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
@@ -46,8 +46,8 @@ public class WsProducer extends DefaultProducer {
public void process(Exchange exchange) throws Exception {
Message in = exchange.getIn();
Object message = in.getBody();
- log.debug("Sending out {}", message);
if (message != null) {
+ log.debug("Sending out {}", message);
if (message instanceof String) {
sendMessage(getWebSocket(), (String)message, getEndpoint().isUseStreaming());
} else if (message instanceof byte[]) {
[2/3] camel git commit: CAMEL-9665: Fixed ahc-ws consumer to connect
to the websocket when starting. Thanks to Thomas Gunter for the
patch/suggestion.
Posted by da...@apache.org.
CAMEL-9665: Fixed ahc-ws consumer to connect to the websocket when starting. Thanks to Thomas Gunter for the patch/suggestion.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66b436fb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66b436fb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66b436fb
Branch: refs/heads/camel-2.16.x
Commit: 66b436fb7bc8b63bbd96364cdc3c570bdd59dc5b
Parents: c272e40
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Mar 6 12:36:07 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Mar 6 12:36:53 2016 +0100
----------------------------------------------------------------------
.../camel/component/ahc/ws/WsEndpoint.java | 38 ++++++++++++--------
.../camel/component/ahc/ws/WsProducer.java | 2 +-
2 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/66b436fb/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
index 973ca79..86c1fd0 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
@@ -16,10 +16,8 @@
*/
package org.apache.camel.component.ahc.ws;
-import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
@@ -44,7 +42,7 @@ public class WsEndpoint extends AhcEndpoint {
private static final transient Logger LOG = LoggerFactory.getLogger(WsEndpoint.class);
// for using websocket streaming/fragments
- private static final boolean GRIZZLY_AVAILABLE =
+ private static final boolean GRIZZLY_AVAILABLE =
probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider");
private final Set<WsConsumer> consumers = new HashSet<WsConsumer>();
@@ -52,7 +50,7 @@ public class WsEndpoint extends AhcEndpoint {
private WebSocket websocket;
@UriParam
private boolean useStreaming;
-
+
public WsEndpoint(String endpointUri, WsComponent component) {
super(endpointUri, component, null);
}
@@ -65,7 +63,7 @@ public class WsEndpoint extends AhcEndpoint {
return false;
}
}
-
+
@Override
public WsComponent getComponent() {
return (WsComponent) super.getComponent();
@@ -117,34 +115,44 @@ public class WsEndpoint extends AhcEndpoint {
} else {
client = new AsyncHttpClient(ahp, config);
}
- return client;
+ return client;
}
- public void connect() throws InterruptedException, ExecutionException, IOException {
- websocket = getClient().prepareGet(getHttpUri().toASCIIString()).execute(
+ public void connect() throws Exception {
+ String uri = getHttpUri().toASCIIString();
+
+ LOG.debug("Connecting to {}", uri);
+ websocket = getClient().prepareGet(uri).execute(
new WebSocketUpgradeHandler.Builder()
.addWebSocketListener(new WsListener()).build()).get();
}
-
+
@Override
protected void doStop() throws Exception {
if (websocket != null && websocket.isOpen()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString());
+ }
websocket.close();
websocket = null;
}
super.doStop();
}
- void connect(WsConsumer wsConsumer) {
+ void connect(WsConsumer wsConsumer) throws Exception {
consumers.add(wsConsumer);
+
+ if (websocket == null || !websocket.isOpen()) {
+ connect();
+ }
}
void disconnect(WsConsumer wsConsumer) {
consumers.remove(wsConsumer);
}
-
+
class WsListener implements WebSocketTextListener, WebSocketByteListener {
-
+
@Override
public void onOpen(WebSocket websocket) {
LOG.debug("websocket opened");
@@ -162,7 +170,7 @@ public class WsEndpoint extends AhcEndpoint {
@Override
public void onMessage(byte[] message) {
- LOG.debug("received message --> {}", message);
+ LOG.debug("Received message --> {}", message);
for (WsConsumer consumer : consumers) {
consumer.sendMessage(message);
}
@@ -170,14 +178,14 @@ public class WsEndpoint extends AhcEndpoint {
@Override
public void onMessage(String message) {
- LOG.debug("received message --> {}", message);
+ LOG.debug("Received message --> {}", message);
for (WsConsumer consumer : consumers) {
consumer.sendMessage(message);
}
}
}
-
+
protected AsyncHttpProvider getAsyncHttpProvider(AsyncHttpClientConfig config) {
if (GRIZZLY_AVAILABLE) {
return new GrizzlyAsyncHttpProvider(config);
http://git-wip-us.apache.org/repos/asf/camel/blob/66b436fb/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
index d6319ad..5935fc2 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
@@ -46,8 +46,8 @@ public class WsProducer extends DefaultProducer {
public void process(Exchange exchange) throws Exception {
Message in = exchange.getIn();
Object message = in.getBody();
- log.debug("Sending out {}", message);
if (message != null) {
+ log.debug("Sending out {}", message);
if (message instanceof String) {
sendMessage(getWebSocket(), (String)message, getEndpoint().isUseStreaming());
} else if (message instanceof byte[]) {
[3/3] camel git commit: CAMEL-9665: Fixed ahc-ws consumer to connect
to the websocket when starting. Thanks to Thomas Gunter for the
patch/suggestion.
Posted by da...@apache.org.
CAMEL-9665: Fixed ahc-ws consumer to connect to the websocket when starting. Thanks to Thomas Gunter for the patch/suggestion.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0dfbbb76
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0dfbbb76
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0dfbbb76
Branch: refs/heads/camel-2.15.x
Commit: 0dfbbb76ca9de5604362e9e80ac15d9aff109503
Parents: 0dd343e
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Mar 6 12:36:07 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Mar 6 12:37:09 2016 +0100
----------------------------------------------------------------------
.../camel/component/ahc/ws/WsEndpoint.java | 38 ++++++++++++--------
.../camel/component/ahc/ws/WsProducer.java | 2 +-
2 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0dfbbb76/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
index 4a60d02..2110ec5 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java
@@ -16,10 +16,8 @@
*/
package org.apache.camel.component.ahc.ws;
-import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
@@ -43,7 +41,7 @@ public class WsEndpoint extends AhcEndpoint {
private static final transient Logger LOG = LoggerFactory.getLogger(WsEndpoint.class);
// for using websocket streaming/fragments
- private static final boolean GRIZZLY_AVAILABLE =
+ private static final boolean GRIZZLY_AVAILABLE =
probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider");
private final Set<WsConsumer> consumers = new HashSet<WsConsumer>();
@@ -51,7 +49,7 @@ public class WsEndpoint extends AhcEndpoint {
private WebSocket websocket;
@UriParam
private boolean useStreaming;
-
+
public WsEndpoint(String endpointUri, WsComponent component) {
super(endpointUri, component, null);
}
@@ -64,7 +62,7 @@ public class WsEndpoint extends AhcEndpoint {
return false;
}
}
-
+
@Override
public WsComponent getComponent() {
return (WsComponent) super.getComponent();
@@ -116,34 +114,44 @@ public class WsEndpoint extends AhcEndpoint {
} else {
client = new AsyncHttpClient(ahp, config);
}
- return client;
+ return client;
}
- public void connect() throws InterruptedException, ExecutionException, IOException {
- websocket = getClient().prepareGet(getHttpUri().toASCIIString()).execute(
+ public void connect() throws Exception {
+ String uri = getHttpUri().toASCIIString();
+
+ LOG.debug("Connecting to {}", uri);
+ websocket = getClient().prepareGet(uri).execute(
new WebSocketUpgradeHandler.Builder()
.addWebSocketListener(new WsListener()).build()).get();
}
-
+
@Override
protected void doStop() throws Exception {
if (websocket != null && websocket.isOpen()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString());
+ }
websocket.close();
websocket = null;
}
super.doStop();
}
- void connect(WsConsumer wsConsumer) {
+ void connect(WsConsumer wsConsumer) throws Exception {
consumers.add(wsConsumer);
+
+ if (websocket == null || !websocket.isOpen()) {
+ connect();
+ }
}
void disconnect(WsConsumer wsConsumer) {
consumers.remove(wsConsumer);
}
-
+
class WsListener implements WebSocketTextListener, WebSocketByteListener {
-
+
@Override
public void onOpen(WebSocket websocket) {
LOG.debug("websocket opened");
@@ -161,7 +169,7 @@ public class WsEndpoint extends AhcEndpoint {
@Override
public void onMessage(byte[] message) {
- LOG.debug("received message --> {}", message);
+ LOG.debug("Received message --> {}", message);
for (WsConsumer consumer : consumers) {
consumer.sendMessage(message);
}
@@ -169,14 +177,14 @@ public class WsEndpoint extends AhcEndpoint {
@Override
public void onMessage(String message) {
- LOG.debug("received message --> {}", message);
+ LOG.debug("Received message --> {}", message);
for (WsConsumer consumer : consumers) {
consumer.sendMessage(message);
}
}
}
-
+
protected AsyncHttpProvider getAsyncHttpProvider(AsyncHttpClientConfig config) {
if (GRIZZLY_AVAILABLE) {
return new GrizzlyAsyncHttpProvider(config);
http://git-wip-us.apache.org/repos/asf/camel/blob/0dfbbb76/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
index d6319ad..5935fc2 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java
@@ -46,8 +46,8 @@ public class WsProducer extends DefaultProducer {
public void process(Exchange exchange) throws Exception {
Message in = exchange.getIn();
Object message = in.getBody();
- log.debug("Sending out {}", message);
if (message != null) {
+ log.debug("Sending out {}", message);
if (message instanceof String) {
sendMessage(getWebSocket(), (String)message, getEndpoint().isUseStreaming());
} else if (message instanceof byte[]) {