You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2020/01/15 09:40:19 UTC
[camel] branch camel-3.0.x updated: CAMEL-14396: websocket-jsr356 consumer fails when endpoint URI parameters are provided
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch camel-3.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.0.x by this push:
new 3d6d4b0 CAMEL-14396: websocket-jsr356 consumer fails when endpoint URI parameters are provided
3d6d4b0 is described below
commit 3d6d4b0804fade0efbb92d84cf1cec5f2352a9c7
Author: James Netherton <ja...@gmail.com>
AuthorDate: Tue Jan 14 15:08:56 2020 +0000
CAMEL-14396: websocket-jsr356 consumer fails when endpoint URI parameters are provided
---
.../src/main/docs/websocket-jsr356-component.adoc | 2 +-
.../websocket/jsr356/CamelServerEndpoint.java | 2 +-
.../camel/websocket/jsr356/JSR356Consumer.java | 19 +++++------
.../camel/websocket/jsr356/JSR356Endpoint.java | 22 ++++++++----
.../camel/websocket/jsr356/JSR356Producer.java | 31 ++++++++++++-----
.../websocket/jsr356/JSR356WebSocketComponent.java | 39 ++++++----------------
.../camel/websocket/jsr356/JSR356ConsumerTest.java | 15 ++++-----
.../camel/websocket/jsr356/JSR356ProducerTest.java | 29 +++++-----------
.../dsl/JSR356WebSocketEndpointBuilderFactory.java | 7 ++--
.../ROOT/pages/websocket-jsr356-component.adoc | 2 +-
10 files changed, 76 insertions(+), 92 deletions(-)
diff --git a/components/camel-websocket-jsr356/src/main/docs/websocket-jsr356-component.adoc b/components/camel-websocket-jsr356/src/main/docs/websocket-jsr356-component.adoc
index 7b04a42..22afd27 100644
--- a/components/camel-websocket-jsr356/src/main/docs/websocket-jsr356-component.adoc
+++ b/components/camel-websocket-jsr356/src/main/docs/websocket-jsr356-component.adoc
@@ -63,7 +63,7 @@ with the following path and query parameters:
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
-| *websocketPathOrUri* | If a path (/foo) it will deploy locally the endpoint, if an uri it will connect to the corresponding server | | String
+| *uri* | If a schemeless URI path is provided, a ServerEndpoint is deployed under that path. Else if the URI is prefixed with the 'ws://' scheme, then a connection is established to the corresponding server | | URI
|===
diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/CamelServerEndpoint.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/CamelServerEndpoint.java
index 9d265b0..66bf8aa 100644
--- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/CamelServerEndpoint.java
+++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/CamelServerEndpoint.java
@@ -63,7 +63,7 @@ public class CamelServerEndpoint extends Endpoint {
synchronized (session) {
if (session.isOpen()) {
try {
- session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "an exception occured"));
+ session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "an exception occurred"));
} catch (final IOException e) {
log.debug("Error closing session #{}", session.getId(), e);
}
diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java
index 5e1fbb3..856d71a 100644
--- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java
+++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java
@@ -30,7 +30,6 @@ import org.apache.camel.support.DefaultConsumer;
import static java.util.Optional.ofNullable;
public class JSR356Consumer extends DefaultConsumer {
- private final int sessionCount;
private ClientSessions manager;
private Runnable closeTask;
@@ -45,9 +44,8 @@ public class JSR356Consumer extends DefaultConsumer {
});
};
- JSR356Consumer(final JSR356Endpoint jsr356Endpoint, final Processor processor, final int sessionCount) {
+ JSR356Consumer(final JSR356Endpoint jsr356Endpoint, final Processor processor) {
super(jsr356Endpoint, processor);
- this.sessionCount = sessionCount;
}
@Override
@@ -58,20 +56,19 @@ public class JSR356Consumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- final String endpointKey = getEndpoint().getEndpointUri().substring("websocket-jsr356://".length());
- if (endpointKey.contains("://")) { // we act as a client
- final ClientEndpointConfig.Builder clientConfig = ClientEndpointConfig.Builder.create(); // todo:
- // config
- manager = new ClientSessions(sessionCount, URI.create(endpointKey), clientConfig.build(), onMessage);
+ final URI uri = getEndpoint().getUri();
+ if (uri.getScheme() != null && uri.getScheme().equals("ws")) { // we act as a client
+ final ClientEndpointConfig.Builder clientConfig = ClientEndpointConfig.Builder.create();
+ manager = new ClientSessions(getEndpoint().getSessionCount(), uri, clientConfig.build(), onMessage);
manager.prepare();
} else {
final JSR356WebSocketComponent.ContextBag bag = JSR356WebSocketComponent.getContext(null);
- final CamelServerEndpoint endpoint = bag.getEndpoints().get(endpointKey);
+ final CamelServerEndpoint endpoint = bag.getEndpoints().get(uri.getPath());
if (endpoint == null) {
// todo: make it customizable (the endpoint config)
- final ServerEndpointConfig.Builder configBuilder = ServerEndpointConfig.Builder.create(CamelServerEndpoint.class, endpointKey);
+ final ServerEndpointConfig.Builder configBuilder = ServerEndpointConfig.Builder.create(CamelServerEndpoint.class, uri.getPath());
final CamelServerEndpoint serverEndpoint = new CamelServerEndpoint();
- bag.getEndpoints().put(endpointKey, serverEndpoint);
+ bag.getEndpoints().put(uri.getPath(), serverEndpoint);
closeTask = addObserver(serverEndpoint);
configBuilder.configurator(new ServerEndpointConfig.Configurator() {
@Override
diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Endpoint.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Endpoint.java
index ec90e82..810fe97 100644
--- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Endpoint.java
+++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Endpoint.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.websocket.jsr356;
+import java.net.URI;
+
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -26,17 +28,15 @@ import org.apache.camel.support.DefaultEndpoint;
@UriEndpoint(firstVersion = "2.23.0", scheme = "websocket-jsr356", title = "Javax Websocket", syntax = "websocket-jsr356:websocketPathOrUri", label = "jsr356")
public class JSR356Endpoint extends DefaultEndpoint {
- @UriPath(description = "If a path (/foo) it will deploy locally the endpoint, " + "if an uri it will connect to the corresponding server")
- private String websocketPathOrUri;
+ @UriPath(description = "If a schemeless URI path is provided, a ServerEndpoint is deployed under that path. "
+ + "Else if the URI is prefixed with the 'ws://' scheme, then a connection is established to the corresponding server")
+ private URI uri;
@UriParam(description = "Used when the endpoint is in client mode to populate a pool of sessions")
private int sessionCount = 1;
- private final JSR356WebSocketComponent component;
-
public JSR356Endpoint(final JSR356WebSocketComponent component, final String uri) {
super(uri, component);
- this.component = component;
}
@Override
@@ -46,12 +46,20 @@ public class JSR356Endpoint extends DefaultEndpoint {
@Override
public Consumer createConsumer(final Processor processor) {
- return new JSR356Consumer(this, processor, sessionCount);
+ return new JSR356Consumer(this, processor);
}
@Override
public Producer createProducer() {
- return new JSR356Producer(this, sessionCount);
+ return new JSR356Producer(this);
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public void setUri(URI uri) {
+ this.uri = uri;
}
public int getSessionCount() {
diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Producer.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Producer.java
index f74361c..d6eeaae 100644
--- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Producer.java
+++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Producer.java
@@ -17,26 +17,28 @@
package org.apache.camel.websocket.jsr356;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.util.function.BiConsumer;
import javax.websocket.ClientEndpointConfig;
+import javax.websocket.RemoteEndpoint;
import javax.websocket.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.util.IOHelper;
import static java.util.Optional.ofNullable;
public class JSR356Producer extends DefaultAsyncProducer {
- private final int sessionCount;
private ClientSessions manager;
private BiConsumer<Exchange, AsyncCallback> onExchange;
- JSR356Producer(final JSR356Endpoint jsr356Endpoint, final int sessionCount) {
+ JSR356Producer(final JSR356Endpoint jsr356Endpoint) {
super(jsr356Endpoint);
- this.sessionCount = sessionCount;
}
@Override
@@ -60,20 +62,31 @@ public class JSR356Producer extends DefaultAsyncProducer {
@Override
protected void doStart() throws Exception {
super.doStart();
- final String endpointKey = getEndpoint().getEndpointUri().substring("websocket-jsr356://".length());
- if (!endpointKey.contains("://")) { // we act as a client in all cases
- // here
- throw new IllegalArgumentException("You should pass a client uri");
+ final URI uri = getEndpoint().getUri();
+ if (uri.getScheme() != null && !uri.getScheme().equals("ws")) {
+ throw new IllegalArgumentException("WebSocket endpoint URI must be in the format: websocket-jsr356:ws://host:port/path");
}
final ClientEndpointConfig.Builder clientConfig = ClientEndpointConfig.Builder.create();
- manager = new ClientSessions(sessionCount, URI.create(endpointKey), clientConfig.build(), null);
+ manager = new ClientSessions(getEndpoint().getSessionCount(), uri, clientConfig.build(), null);
manager.prepare();
onExchange = (exchange, callback) -> manager.execute(session -> doSend(exchange, callback, session));
}
private void doSend(final Exchange exchange, final AsyncCallback callback, final Session session) {
try {
- JSR356WebSocketComponent.sendMessage(session, exchange.getIn().getBody());
+ Object body = exchange.getMessage().getBody();
+ synchronized (session) {
+ final RemoteEndpoint.Basic basicRemote = session.getBasicRemote();
+ if (String.class.isInstance(body)) {
+ basicRemote.sendText(String.valueOf(body));
+ } else if (ByteBuffer.class.isInstance(body)) {
+ basicRemote.sendBinary(ByteBuffer.class.cast(body));
+ } else if (InputStream.class.isInstance(body)) {
+ IOHelper.copy(InputStream.class.cast(body), basicRemote.getSendStream());
+ } else {
+ throw new IllegalArgumentException("Unsupported input: " + body);
+ }
+ }
} catch (final IOException e) {
exchange.setException(e);
} finally {
diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356WebSocketComponent.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356WebSocketComponent.java
index 498437d..b270190 100644
--- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356WebSocketComponent.java
+++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356WebSocketComponent.java
@@ -16,50 +16,30 @@
*/
package org.apache.camel.websocket.jsr356;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
+import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import javax.websocket.RemoteEndpoint;
-import javax.websocket.Session;
import javax.websocket.server.ServerContainer;
import org.apache.camel.Endpoint;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
-import org.apache.camel.util.IOHelper;
import static java.util.Optional.ofNullable;
@Component("websocket-jsr356")
public class JSR356WebSocketComponent extends DefaultComponent {
- // didn't find a better way to handle that unless we can assume the
- // CamelContext is in the ServletContext
- private static final Map<String, ContextBag> SERVER_CONTAINERS = new ConcurrentHashMap<>();
- protected int sessionCount;
+ private static final Map<String, ContextBag> SERVER_CONTAINERS = new ConcurrentHashMap<>();
@Override
- protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters) {
- return new JSR356Endpoint(this, uri);
- }
-
- public static void sendMessage(final Session session, final Object message) throws IOException {
- final RemoteEndpoint.Basic basicRemote = session.getBasicRemote(); // todo: handle async?
- synchronized (session) {
- if (String.class.isInstance(message)) {
- basicRemote.sendText(String.valueOf(message));
- } else if (ByteBuffer.class.isInstance(message)) {
- basicRemote.sendBinary(ByteBuffer.class.cast(message));
- } else if (InputStream.class.isInstance(message)) {
- IOHelper.copy(InputStream.class.cast(message), basicRemote.getSendStream());
- } else {
- throw new IllegalArgumentException("Unsupported input: " + message);
- }
- }
+ protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters) throws Exception {
+ JSR356Endpoint endpoint = new JSR356Endpoint(this, uri);
+ endpoint.setUri(new URI(remaining));
+ setProperties(endpoint, parameters);
+ return endpoint;
}
public static void registerServer(final String contextPath, final ServerContainer container) {
@@ -71,8 +51,9 @@ public class JSR356WebSocketComponent extends DefaultComponent {
}
public static ContextBag getContext(final String context) {
- return ofNullable(context).map(SERVER_CONTAINERS::get)
- .orElseGet(() -> SERVER_CONTAINERS.size() == 1 ? SERVER_CONTAINERS.values().iterator().next() : SERVER_CONTAINERS.get(""));
+ return ofNullable(context)
+ .map(SERVER_CONTAINERS::get)
+ .orElseGet(() -> SERVER_CONTAINERS.size() == 1 ? SERVER_CONTAINERS.values().iterator().next() : SERVER_CONTAINERS.get(""));
}
public static final class ContextBag {
diff --git a/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ConsumerTest.java b/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ConsumerTest.java
index 5c5b253..018143f 100644
--- a/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ConsumerTest.java
+++ b/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ConsumerTest.java
@@ -56,7 +56,7 @@ public class JSR356ConsumerTest extends CamelTestSupport {
final String message = ExistingServerEndpoint.class.getName() + "#" + testName.getMethodName();
final MockEndpoint mockEndpoint = getMockEndpoint("mock:" + testName.getMethodName());
mockEndpoint.expectedBodiesReceived(message);
- ExistingServerEndpoint.self.doSend(); // to avoid lifecycle issue suring
+ ExistingServerEndpoint.doSend(); // to avoid lifecycle issue during
// startup we send the message
// only here
mockEndpoint.assertIsSatisfied();
@@ -86,9 +86,9 @@ public class JSR356ConsumerTest extends CamelTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("websocket-jsr356:///test").id("camel_consumer_acts_as_server").convertBodyTo(String.class).to("mock:ensureServerModeReceiveProperlyExchanges");
+ from("websocket-jsr356:///test?sessionCount=5").id("camel_consumer_acts_as_server").convertBodyTo(String.class).to("mock:ensureServerModeReceiveProperlyExchanges");
- from("websocket-jsr356://ws://localhost:" + servlet.getConfiguration().getHttpPort() + "/existingserver").id("camel_consumer_acts_as_client")
+ from("websocket-jsr356://ws://localhost:" + servlet.getConfiguration().getHttpPort() + "/existingserver?sessionCount=5").id("camel_consumer_acts_as_client")
.convertBodyTo(String.class).to("mock:ensureClientModeReceiveProperlyExchanges");
}
};
@@ -97,18 +97,15 @@ public class JSR356ConsumerTest extends CamelTestSupport {
@Dependent
@ServerEndpoint("/existingserver")
public static class ExistingServerEndpoint {
- private static ExistingServerEndpoint self;
-
- private Session session;
+ private static Session session;
@OnOpen
public void onOpen(final Session session) {
this.session = session;
- self = this;
}
- void doSend() throws IOException {
- session.getBasicRemote().sendText(getClass().getName() + "#ensureClientModeReceiveProperlyExchanges");
+ static void doSend() throws IOException {
+ session.getBasicRemote().sendText(ExistingServerEndpoint.class.getName() + "#ensureClientModeReceiveProperlyExchanges");
}
}
}
diff --git a/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ProducerTest.java b/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ProducerTest.java
index aebcbfd..f224aeb 100644
--- a/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ProducerTest.java
+++ b/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ProducerTest.java
@@ -16,14 +16,11 @@
*/
package org.apache.camel.websocket.jsr356;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.enterprise.context.Dependent;
import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.apache.camel.Produce;
@@ -36,9 +33,11 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import static java.util.Collections.singletonList;
public class JSR356ProducerTest extends CamelTestSupport {
+
+ private static LinkedBlockingQueue<String> messages = new LinkedBlockingQueue<>();
+
@Rule
public final MeecrowaveRule servlet = new MeecrowaveRule(new Meecrowave.Builder() {
{
@@ -57,8 +56,7 @@ public class JSR356ProducerTest extends CamelTestSupport {
public void ensureServerModeSendsProperly() throws Exception {
final String body = getClass().getName() + "#" + testName.getMethodName();
serverProducer.sendBody(body);
- ExistingServerEndpoint.self.latch.await();
- assertEquals(singletonList(body), ExistingServerEndpoint.self.messages);
+ assertEquals(body, messages.poll(10, TimeUnit.SECONDS));
}
@Override
@@ -66,7 +64,7 @@ public class JSR356ProducerTest extends CamelTestSupport {
return new RouteBuilder() {
public void configure() {
from("direct:ensureServerModeSendsProperly").id("camel_consumer_acts_as_client").convertBodyTo(String.class)
- .to("websocket-jsr356://ws://localhost:" + servlet.getConfiguration().getHttpPort() + "/existingserver");
+ .to("websocket-jsr356://ws://localhost:" + servlet.getConfiguration().getHttpPort() + "/existingserver?sessionCount=5");
}
};
}
@@ -74,20 +72,9 @@ public class JSR356ProducerTest extends CamelTestSupport {
@Dependent
@ServerEndpoint("/existingserver")
public static class ExistingServerEndpoint {
- private static ExistingServerEndpoint self;
-
- private final Collection<String> messages = new ArrayList<>();
- private final CountDownLatch latch = new CountDownLatch(1);
-
- @OnOpen
- public void onOpen(final Session session) {
- self = this;
- }
-
@OnMessage
- public synchronized void onMessage(final String message) {
+ public void onMessage(final String message) {
messages.add(message);
- latch.countDown();
}
}
}
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/JSR356WebSocketEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/JSR356WebSocketEndpointBuilderFactory.java
index f3db04a..4f9cc85 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/JSR356WebSocketEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/JSR356WebSocketEndpointBuilderFactory.java
@@ -474,9 +474,10 @@ public interface JSR356WebSocketEndpointBuilderFactory {
*
* Syntax: <code>websocket-jsr356:websocketPathOrUri</code>
*
- * Path parameter: websocketPathOrUri
- * If a path (/foo) it will deploy locally the endpoint, if an uri it will
- * connect to the corresponding server
+ * Path parameter: uri
+ * If a schemeless URI path is provided, a ServerEndpoint is deployed under
+ * that path. Else if the URI is prefixed with the 'ws://' scheme, then a
+ * connection is established to the corresponding server
*/
default JSR356WebSocketEndpointBuilder websocketJsr356(String path) {
class JSR356WebSocketEndpointBuilderImpl extends AbstractEndpointBuilder implements JSR356WebSocketEndpointBuilder, AdvancedJSR356WebSocketEndpointBuilder {
diff --git a/docs/components/modules/ROOT/pages/websocket-jsr356-component.adoc b/docs/components/modules/ROOT/pages/websocket-jsr356-component.adoc
index c13459e..83acddc 100644
--- a/docs/components/modules/ROOT/pages/websocket-jsr356-component.adoc
+++ b/docs/components/modules/ROOT/pages/websocket-jsr356-component.adoc
@@ -64,7 +64,7 @@ with the following path and query parameters:
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
-| *websocketPathOrUri* | If a path (/foo) it will deploy locally the endpoint, if an uri it will connect to the corresponding server | | String
+| *uri* | If a schemeless URI path is provided, a ServerEndpoint is deployed under that path. Else if the URI is prefixed with the 'ws://' scheme, then a connection is established to the corresponding server | | URI
|===