You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:18 UTC
[17/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/package-info.java b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/package-info.java
new file mode 100644
index 0000000..277ce5c
--- /dev/null
+++ b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/package-info.java
@@ -0,0 +1,22 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * WebSocket Client Connector for sending and receiving messages to a WebSocket Server.
+ */
+package org.apache.edgent.connectors.wsclient.javax.websocket;
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientBinaryReceiver.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientBinaryReceiver.java b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientBinaryReceiver.java
new file mode 100644
index 0000000..a34da52
--- /dev/null
+++ b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientBinaryReceiver.java
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.wsclient.javax.websocket.runtime;
+
+import org.apache.edgent.function.Function;
+
+public class WebSocketClientBinaryReceiver<T> extends WebSocketClientReceiver<T> {
+ private static final long serialVersionUID = 1L;
+ private final Function<byte[],T> toTuple;
+
+ public WebSocketClientBinaryReceiver(WebSocketClientConnector connector, Function<byte[],T> toTuple) {
+ super(connector, null);
+ this.toTuple = toTuple;
+ }
+
+ void onBinaryMessage(byte[] message) {
+ eventHandler.accept(toTuple.apply(message));
+ }
+
+ void onTextMessage(String message) {
+ connector.getLogger().debug("{} ignoring received text message (expecting binary)", connector.id());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientBinarySender.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientBinarySender.java b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientBinarySender.java
new file mode 100644
index 0000000..e34703a
--- /dev/null
+++ b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientBinarySender.java
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.wsclient.javax.websocket.runtime;
+
+import org.apache.edgent.function.Function;
+
+public class WebSocketClientBinarySender<T> extends WebSocketClientSender<T> {
+ private static final long serialVersionUID = 1L;
+ private final Function<T,byte[]> toPayload;
+
+ public WebSocketClientBinarySender(WebSocketClientConnector connector, Function<T,byte[]> toPayload) {
+ super(connector, null);
+ this.toPayload = toPayload;
+ }
+
+ @Override
+ public void accept(T value) {
+ connector.sendBinary(toPayload.apply(value));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
new file mode 100644
index 0000000..7d9199c
--- /dev/null
+++ b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
@@ -0,0 +1,262 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.wsclient.javax.websocket.runtime;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Properties;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.ContainerProvider;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.edgent.connectors.runtime.Connector;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.javax.websocket.EdgentSslContainerProvider;
+//import org.eclipse.jetty.util.component.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ClientEndpoint
+public class WebSocketClientConnector extends Connector<Session> implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(WebSocketClientConnector.class);
+ private final Properties config;
+ private volatile String id;
+ private volatile String sid;
+ private WebSocketClientReceiver<?> msgReceiver;
+ private volatile WebSocketContainer container;
+ private final Supplier<WebSocketContainer> containerFn;
+
+ public WebSocketClientConnector(Properties config, Supplier<WebSocketContainer> containerFn) {
+ Objects.requireNonNull(config, "config");
+ this.config = config;
+ checkConfig();
+ this.containerFn = containerFn!=null
+ ? containerFn
+ : () -> getWebSocketContainer();
+ }
+
+ private void checkConfig() {
+ requireConfig("ws.uri");
+ URI uri = getEndpointURI();
+ if (!("ws".equals(uri.getScheme()) || "wss".equals(uri.getScheme())))
+ throw new IllegalArgumentException("ws.uri");
+ if (optionalConfig("ws.trustStore"))
+ requireConfig("ws.trustStorePassword");
+ if (optionalConfig("ws.keyStore"))
+ requireConfig("ws.keyStorePassword");
+ }
+
+ void setReceiver(WebSocketClientReceiver<?> msgReceiver) {
+ this.msgReceiver = msgReceiver;
+ }
+
+ @Override
+ public Logger getLogger() {
+ return logger;
+ }
+
+ @Override
+ protected Session doConnect(Session session) throws Exception {
+ if (session == null || !session.isOpen()) {
+ if (session != null)
+ doClose(session);
+ if (container == null)
+ container = containerFn.get();
+ URI uri = getEndpointURI();
+ getLogger().info("{} connecting uri={}", id(), uri);
+ session = container.connectToServer(this, uri);
+ updateId(session);
+ getLogger().info("{} connected uri={}", id(), uri);
+ }
+ return session;
+ }
+
+ private WebSocketContainer getWebSocketContainer() throws RuntimeException {
+
+ // Ugh. Turns out there are some serious issues w/JSR356
+ // as well as Jetty client impl of it wrt SSL and
+ // trust and key store configurations.
+ //
+ // "wss" is OK unless: you need **programatic** trustStore
+ // OR need clientAuth at all.
+ //
+ // https://github.com/eclipse/jetty.project/issues/155
+
+ URI uri = getEndpointURI();
+
+ // Use the std code for the non-problematic cases
+ if ("ws".equals(uri.getScheme())
+ || (config.getProperty("ws.trustStore") == null
+ && config.getProperty("ws.keyStore") == null
+ && System.getProperty("javax.net.ssl.keyStore") == null))
+ {
+ return ContainerProvider.getWebSocketContainer();
+ }
+ else {
+ getLogger().info("##### Using ContainerProvider.getWebSocketContainer() workaround for SSL #####");
+
+ return EdgentSslContainerProvider.getSslWebSocketContainer(config);
+ }
+ }
+
+ private URI getEndpointURI() throws RuntimeException {
+ String uriStr = config.getProperty("ws.uri");
+ try {
+ return new URI(uriStr);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("ws.uri", e);
+ }
+ }
+
+ private void requireConfig(String id) {
+ if (config.getProperty(id) == null)
+ throw new IllegalArgumentException(id);
+ }
+
+ private boolean optionalConfig(String id) {
+ return config.getProperty(id) != null;
+ }
+
+ @Override
+ protected void doDisconnect(Session session) throws Exception {
+ // no disconnect from javax.websocket.Session
+ doClose(session);
+ }
+
+ @Override
+ protected void doClose(Session session) throws Exception {
+ getLogger().debug("{} doClose {}", id(), session);
+ try {
+ session.close();
+ }
+ finally {
+// // Force lifecycle stop when done with container.
+// // This is to free up threads and resources that the
+// // JSR-356 container allocates. But unfortunately
+// // the JSR-356 spec does not handle lifecycles (yet)
+// ((LifeCycle)container).stop();
+ }
+ }
+
+ private void updateId(Session session) {
+ sid = session.getId();
+ id = null;
+ }
+
+ @Override
+ protected String id() {
+ if (id == null) {
+ // include our short object Id
+ id = "WSCLIENT " + toString().substring(toString().indexOf('@') + 1)
+ + " sid=" + sid;
+ }
+ return id;
+ }
+
+ @OnError
+ public void onError(Session client, Throwable t) {
+ getLogger().error("{} onError {}", id(), t);
+ }
+
+ @OnMessage
+ public void onTextMessage(String message) {
+ getLogger().trace("{} onTextMessage {}", id(), message);
+ if (msgReceiver != null) {
+ msgReceiver.onTextMessage(message);
+ }
+ }
+
+ @OnMessage
+ public void onBinaryMessage(byte[] message) {
+ getLogger().trace("{} onBinaryMessage {} bytes.", id(), message.length);
+ if (msgReceiver != null) {
+ msgReceiver.onBinaryMessage(message);
+ }
+ }
+
+ void sendBinary(byte[] bytes) {
+ while (true) {
+ Session session = getConnectedSession();
+ try {
+ session.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));
+ getLogger().trace("{} sendBinary {} bytes.", id(), bytes.length);
+ return;
+ }
+ catch (IOException e) {
+ if (!session.isOpen()) {
+ connectionLost(e); // logs error
+ getLogger().error("{} sendBinary {} bytes failed. Retrying following connection lost", id(), bytes.length);
+ // retry
+ }
+ else {
+ getLogger().error("{} sendBinary {} bytes failed", id(), bytes.length, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ void sendText(String msg) {
+ while (true) {
+ Session session = getConnectedSession();
+ try {
+ session.getBasicRemote().sendText(msg);
+ getLogger().trace("{} sendText {}", id(), msg);
+ return;
+ }
+ catch (IOException e) {
+ if (!session.isOpen()) {
+ connectionLost(e); // logs error
+ getLogger().error("{} sendText {} chars failed. Retrying following connection lost", id(), msg.length());
+ // retry
+ }
+ else {
+ getLogger().error("{} sendText {} chars failed", id(), msg.length(), e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private Session getConnectedSession() {
+ try {
+ return client();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted", e);
+ }
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientReceiver.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientReceiver.java b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientReceiver.java
new file mode 100644
index 0000000..ada34a4
--- /dev/null
+++ b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientReceiver.java
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.wsclient.javax.websocket.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+
+public class WebSocketClientReceiver<T> implements Consumer<Consumer<T>>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ protected final WebSocketClientConnector connector;
+ private final Function<String,T> toTuple;
+ protected Consumer<T> eventHandler;
+
+ public WebSocketClientReceiver(WebSocketClientConnector connector, Function<String,T> toTuple) {
+ this.connector = connector;
+ this.toTuple = toTuple;
+ }
+
+ @Override
+ public void accept(Consumer<T> eventHandler) {
+ this.eventHandler = eventHandler;
+ connector.setReceiver(this);
+ try {
+ connector.client(); // induce connecting.
+ } catch (Exception e) {
+ connector.getLogger().error("{} receiver setup failed", connector.id(), e);
+ }
+ }
+
+ void onBinaryMessage(byte[] message) {
+ connector.getLogger().debug("{} ignoring received binary message (expecting text)", connector.id());
+ }
+
+ void onTextMessage(String message) {
+ eventHandler.accept(toTuple.apply(message));
+ }
+
+ @Override
+ public void close() throws Exception {
+ connector.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientSender.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientSender.java b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientSender.java
new file mode 100644
index 0000000..b0ad2e7
--- /dev/null
+++ b/connectors/wsclient-javax.websocket/src/main/java/org/apache/edgent/connectors/wsclient/javax/websocket/runtime/WebSocketClientSender.java
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.wsclient.javax.websocket.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+
+public class WebSocketClientSender<T> implements Consumer<T>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ protected final WebSocketClientConnector connector;
+ protected final Function<T,String> toPayload;
+
+ public WebSocketClientSender(WebSocketClientConnector connector, Function<T,String> toPayload) {
+ this.connector = connector;
+ this.toPayload = toPayload;
+ }
+
+ @Override
+ public void accept(T value) {
+ connector.sendText(toPayload.apply(value));
+ }
+
+ @Override
+ public void close() throws Exception {
+ connector.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java b/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
deleted file mode 100644
index c8d1130..0000000
--- a/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.tests.connectors.wsclient.javax.websocket;
-
-/**
- * WebSocketClient connector globalization tests.
- */
-public class WebSocketClientGlobalTest extends WebSocketClientTest {
- private final static String globalStr1 = "\u4e00";
- private final static String globalStr2 = "\u4e8c";
- private final static String globalStr3 = "\u4e09\u4e09";
- private final static String globalStr4 = "\u56db";
-
- public String getStr1() {
- return globalStr1;
- }
-
- public String getStr2() {
- return globalStr2;
- }
-
- public String getStr3() {
- return globalStr3;
- }
-
- public String getStr4() {
- return globalStr4;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java b/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
deleted file mode 100644
index 4d3fd28..0000000
--- a/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
+++ /dev/null
@@ -1,857 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.tests.connectors.wsclient.javax.websocket;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assume.assumeTrue;
-
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.After;
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.wsclient.WebSocketClient;
-import edgent.connectors.wsclient.javax.websocket.Jsr356WebSocketClient;
-import edgent.test.connectors.common.ConnectorTestBase;
-import edgent.test.connectors.common.TestRepoPath;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.json.JsonFunctions;
-import edgent.topology.plumbing.PlumbingStreams;
-
-public class WebSocketClientTest extends ConnectorTestBase {
- private final static int SEC_TMO = 5;
- WebSocketServerEcho wsServer;
- boolean isExternalServer;// = true;
- int wsServerPort = !isExternalServer ? 0 : 49460;
- String wsUriPath = "/echo"; // match what WsServerEcho is using
- private final static String str1 = "one";
- private final static String str2 = "two";
- private final static String str3 = "three-post-reconnect";
- private final static String str4 = "four";
-
- public String getStr1() {
- return str1;
- }
-
- public String getStr2() {
- return str2;
- }
-
- public String getStr3() {
- return str3;
- }
-
- public String getStr4() {
- return str4;
- }
-
- @After
- public void cleanup() {
- if (wsServer != null)
- wsServer.stop();
- wsServer = null;
- }
-
- private enum ServerMode { WS, SSL, SSL_CLIENT_AUTH }
- private void startEchoer() {
- startEchoer(ServerMode.WS);
- }
- private void startEchoer(ServerMode mode) {
- try {
- if (!isExternalServer) {
- URI uri;
- if (mode==ServerMode.WS) {
- uri = new URI("ws://localhost:0");
- wsServer = new WebSocketServerEcho();
- }
- else {
- uri = new URI("wss://localhost:0");
- wsServer = new WebSocketServerEcho();
- }
- wsServer.start(uri, mode==ServerMode.SSL_CLIENT_AUTH);
- wsServerPort = wsServer.getPort();
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- throw new RuntimeException("startEchoer",e );
- }
- }
- private void restartEchoer(int secDelay) {
- wsServer.restart(secDelay);
- }
-
- Properties getConfig() {
- return getWsConfig();
- }
-
- Properties getWsConfig() {
- Properties config = new Properties();
- config.setProperty("ws.uri", getWsUri());
- return config;
- }
-
- Properties getWssConfig() {
- Properties config = new Properties();
- config.setProperty("ws.uri", getWssUri());
- config.setProperty("ws.trustStore", getStorePath("clientTrustStore.jks"));
- config.setProperty("ws.trustStorePassword", "passw0rd");
- config.setProperty("ws.keyStore", getStorePath("clientKeyStore.jks"));
- config.setProperty("ws.keyStorePassword", "passw0rd");
- // default: expect key to have the default alias
- return config;
- }
-
- String getWsUri() {
- int port = wsServerPort==0 ? 8080 : wsServerPort;
- return "ws://localhost:"+port+wsUriPath;
- }
-
- String getWssUri() {
- int port = wsServerPort==0 ? 443 : wsServerPort;
- return "wss://localhost:"+port+wsUriPath;
- }
-
- private String getStorePath(String storeLeaf) {
- return TestRepoPath.getPath("connectors", "wsclient-javax.websocket", "src", "test", "keystores", storeLeaf);
- }
-
- @Test
- public void testBasicStaticStuff() {
- Topology t = newTopology("testBasicStaticStuff");
-
- Properties config = getConfig();
- WebSocketClient wsClient1 = new Jsr356WebSocketClient(t, config);
-
- TStream<String> s1 = wsClient1.receiveString();
- assertNotNull("s1", s1);
-
- TSink<String> sink1 = wsClient1.sendString(t.strings(getStr1(), getStr2()));
- assertNotNull("sink1", sink1);
-
- WebSocketClient wsClient2 = new Jsr356WebSocketClient(t, config);
- TStream<String> s2 = wsClient2.receiveString();
- assertNotSame("s1 s2", s1, s2);
-
- TSink<String> sink2 = wsClient2.sendString(t.strings(getStr1(), getStr2()));
- assertNotSame("sink1 sink2", sink1, sink2);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testMissingWsUri() {
- Topology t = newTopology("testMissingWsUri");
- new Jsr356WebSocketClient(t, new Properties());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testMalformedWsUri() {
- Topology t = newTopology("testMalformedWsUri");
- Properties config = new Properties();
- config.setProperty("ws.uri", "localhost"); // missing scheme
- new Jsr356WebSocketClient(t, config);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testNotWsUri() {
- Topology t = newTopology("testNotWsUri");
- Properties config = new Properties();
- config.setProperty("ws.uri", "tcp://localhost");
- new Jsr356WebSocketClient(t, config);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testWssTrustStorePasswordNeg() {
- Topology t = newTopology("testWssTrustStorePasswordNeg");
- Properties config = new Properties();
- config.setProperty("ws.uri", getWssUri());
- config.setProperty("ws.trustStore", "xyzzy"); // not checked till runtime
- // missing trustStorePassword
- new Jsr356WebSocketClient(t, config);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testWssKeyStorePasswordNeg() {
- Topology t = newTopology("testWssKeyStorePasswordNeg");
- Properties config = new Properties();
- config.setProperty("ws.uri", getWssUri());
- config.setProperty("ws.keyStore", "xyzzy"); // not checked till runtime
- // missing keyStorePassword
- new Jsr356WebSocketClient(t, config);
- }
-
- @Test
- public void testWssConfig() {
- Topology t = newTopology("testWssConfig");
- Properties config = new Properties();
- config.setProperty("ws.uri", getWssUri());
- config.setProperty("ws.trustStore", "xyzzy"); // not checked till runtime
- config.setProperty("ws.trustStorePassword", "xyzzy"); // not checked till runtime
- new Jsr356WebSocketClient(t, config);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testTooManySendersNeg() {
- Topology t = newTopology("testTooManySendersNeg");
- TStream<String> s1 = t.strings(getStr1(), getStr2());
- TStream<String> s2 = t.strings(getStr1(), getStr2());
-
- Properties config = getConfig();
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
- wsClient.sendString(s1);
- wsClient.sendString(s2); // should throw
- }
-
- @Test(expected = IllegalStateException.class)
- public void testTooManyReceiversNeg() {
- Topology t = newTopology("testTooManyReceiversNeg");
-
- Properties config = getConfig();
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
- @SuppressWarnings("unused")
- TStream<String> s1 = wsClient.receiveString();
- @SuppressWarnings("unused")
- TStream<String> s2 = wsClient.receiveString(); // should throw
- }
-
- @Test
- public void testJson() throws Exception {
- Topology t = newTopology("testJson");
- System.out.println("===== "+t.getName());
-
- startEchoer(); // before getConfig() so it gets the port
-
- Properties config = getConfig();
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] {
- "{\"id\":\"" + getStr1() + "\",\"value\":27}",
- "{\"id\":\"" + getStr2() + "\",\"value\":13}"
- };
-
- TStream<JsonObject> s = t.strings(expected)
- .map(JsonFunctions.fromString());
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.send(s);
-
- TStream<String> rcvd = wsClient.receive()
- .map(JsonFunctions.asString());
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
-
- @Test
- public void testString() throws Exception {
- Topology t = newTopology("testString");
- System.out.println("===== "+t.getName());
-
- startEchoer(); // before getConfig() so it gets the port
-
- Properties config = getConfig();
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
-
- @Test
- public void testBytes() throws Exception {
- Topology t = newTopology("testBytes");
- System.out.println("===== "+t.getName());
-
- startEchoer(); // before getConfig() so it gets the port
-
- Properties config = getConfig();
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<byte[]> s = t.strings(expected)
- .map(tup -> tup.getBytes());
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendBytes(s);
-
- TStream<String> rcvd = wsClient.receiveBytes()
- .map(tup -> new String(tup));
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
-
- @Test
- public void testReconnect() throws Exception {
- /*
- * It's becomming apparent that the reconnect series of tests
- * aren't reliable so skip them for ci. See jira EDGENT-122 for
- * more info.
- */
- assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
- Topology t = newTopology("testReconnect");
- System.out.println("===== "+t.getName());
-
- startEchoer(); // before getConfig() so it gets the port
-
- Properties config = getConfig();
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2(), getStr3(), getStr4() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
-
- // send one, two, restart the server to force reconnect, send the next
- AtomicInteger numSent = new AtomicInteger();
- int restartAfterTupleCnt = 2;
- CountDownLatch latch = new CountDownLatch(restartAfterTupleCnt);
- s = s.filter(tuple -> {
- if (numSent.getAndIncrement() != restartAfterTupleCnt )
- return true;
- else {
- // to keep validation sane/simple wait till the tuples are rcvd before restarting
- try { latch.await(); } catch (Exception e) {};
- restartEchoer(2/*secDelay*/);
- return true;
- }
- });
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString()
- .peek(tuple -> latch.countDown());
-
-
- completeAndValidate("", t, rcvd, SEC_TMO + 10, expected);
- }
-
- @Test
- public void testReconnectBytes() throws Exception {
- /*
- * It's becomming apparent that the reconnect series of tests
- * aren't reliable so skip them for ci. See jira EDGENT-122 for
- * more info.
- */
- assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
- Topology t = newTopology("testReconnectBytes");
- System.out.println("===== "+t.getName());
-
- startEchoer(); // before getConfig() so it gets the port
-
- Properties config = getConfig();
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2(), getStr3(), getStr4() };
-
- TStream<byte[]> s = t.strings(expected).map(tup -> tup.getBytes());
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
-
- // send one, two, restart the server to force reconnect, send the next
- AtomicInteger numSent = new AtomicInteger();
- int restartAfterTupleCnt = 2;
- CountDownLatch latch = new CountDownLatch(restartAfterTupleCnt);
- s = s.filter(tuple -> {
- if (numSent.getAndIncrement() != restartAfterTupleCnt )
- return true;
- else {
- // to keep validation sane/simple wait till the tuples are rcvd before restarting
- try { latch.await(); } catch (Exception e) {};
- restartEchoer(2/*secDelay*/);
- return true;
- }
- });
- wsClient.sendBytes(s);
-
- TStream<String> rcvd = wsClient.receiveBytes()
- .peek(tuple -> latch.countDown())
- .map(tup -> new String(tup));
-
- completeAndValidate("", t, rcvd, SEC_TMO + 10, expected);
- }
-
- private class SslSystemPropMgr {
- private final Map<String,String> origProps = new HashMap<>();
-
- public void set() {
- set("javax.net.ssl.trustStore", getStorePath("clientTrustStore.jks"));
- set("javax.net.ssl.trustStorePassword", "passw0rd");
- set("javax.net.ssl.keyStore", getStorePath("clientKeyStore.jks"));
- set("javax.net.ssl.keyStorePassword", "passw0rd");
- }
-
- private void set(String prop, String defaultVal) {
- origProps.put(prop, System.setProperty(prop, defaultVal));
- }
-
- public void restore() {
- restore("javax.net.ssl.trustStore");
- restore("javax.net.ssl.trustStorePassword");
- restore("javax.net.ssl.keyStore");
- restore("javax.net.ssl.keyStorePassword");
- }
-
- private void restore(String prop) {
- String origValue = origProps.get(prop);
- if (origValue == null)
- System.getProperties().remove(prop);
- else
- System.setProperty(prop, origValue);
- }
- }
-
- @Test
- public void testSslSystemProperty() throws Exception {
- Topology t = newTopology("testSslSystemProperty");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL); // before getConfig() so it gets the port
-
- Properties config = getConfig(); // no SSL config stuff
- config.setProperty("ws.uri", getWssUri());
-
- SslSystemPropMgr sslProps = new SslSystemPropMgr();
- try {
- // a trust store that contains the server's cert
- sslProps.set();
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
- finally {
- sslProps.restore();
- }
- }
-
- @Test
- public void testSslClientAuthSystemProperty() throws Exception {
- Topology t = newTopology("testSslClientAuthSystemProperty");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL_CLIENT_AUTH); // before getConfig() so it gets the port
-
- Properties config = getConfig(); // no SSL config stuff
- config.setProperty("ws.uri", getWssUri());
-
- SslSystemPropMgr sslProps = new SslSystemPropMgr();
- try {
- // a trust store that contains the server's cert
- sslProps.set();
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
- finally {
- sslProps.restore();
- }
- }
-
- @Test
- public void testSsl() throws Exception {
-
- Topology t = newTopology("testSsl");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL); // before getConfig() so it gets the port
-
- Properties config = getWssConfig();
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
-
- @Test
- public void testSslReconnect() throws Exception {
- /*
- * It's becomming apparent that the reconnect series of tests
- * aren't reliable so skip them for ci. See jira EDGENT-122 for
- * more info.
- */
- assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
-
- Topology t = newTopology("testSslReconnect");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL); // before getConfig() so it gets the port
-
- Properties config = getWssConfig();
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2(), getStr3(), getStr4() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
-
- // send one, two, restart the server to force reconnect, send the next
- AtomicInteger numSent = new AtomicInteger();
- int restartAfterTupleCnt = 2;
- CountDownLatch latch = new CountDownLatch(restartAfterTupleCnt);
- s = s.filter(tuple -> {
- if (numSent.getAndIncrement() != restartAfterTupleCnt )
- return true;
- else {
- // to keep validation sane/simple wait till the tuples are rcvd before restarting
- try { latch.await(); } catch (Exception e) {};
- restartEchoer(2/*secDelay*/);
- return true;
- }
- });
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString()
- .peek(tuple -> latch.countDown());
-
- completeAndValidate("", t, rcvd, SEC_TMO + 10, expected);
- }
-
- @Test
- public void testSslNeg() throws Exception {
-
- Topology t = newTopology("testSslNeg");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL); // before getConfig() so it gets the port
-
- // since our server uses a self-signed cert, if we don't have
- // a truststore setup with it in it, the client will fail to connect
- // and ultimately the connect will fail and the test will
- // receive nothing.
-
- Properties config = getConfig(); // no SSL config stuff
- config.setProperty("ws.uri", getWssUri());
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, new String[0]); //rcv nothing
- }
-
- @Test
- public void testSslClientAuth() throws Exception {
-
- Topology t = newTopology("testSslClientAuth");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL_CLIENT_AUTH); // before getConfig() so it gets the port
-
- Properties config = getWssConfig();
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
-
- @Test
- public void testSslClientAuthDefault() throws Exception {
-
- Topology t = newTopology("testSslClientAuthDefault");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL_CLIENT_AUTH); // before getConfig() so it gets the port
-
- // explicitly specify client's "default" certificate
- Properties config = getWssConfig();
- config.setProperty("ws.keyCertificateAlias", "default");
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
-
- @Test
- public void testSslClientAuthMy2ndCertNeg() throws Exception {
-
- Topology t = newTopology("testSslClientAuthMy2ndCertNeg");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL_CLIENT_AUTH); // before getConfig() so it gets the port
-
- // explicitly specify client's "my2ndcert" certificate - unknown to server
- Properties config = getWssConfig();
- config.setProperty("ws.keyCertificateAlias", "my2ndcert");
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, new String[0]); // rcv nothing
- }
-
- @Test
- public void testSslClientAuthMy3rdCert() throws Exception {
-
- Topology t = newTopology("testSslClientAuthMy3rdCert");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL_CLIENT_AUTH); // before getConfig() so it gets the port
-
- // explicitly specify client's "my3rdcert" certificate
- Properties config = getWssConfig();
- config.setProperty("ws.keyCertificateAlias", "my3rdcert");
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
-
- @Test
- public void testSslClientAuthNeg() throws Exception {
-
- Topology t = newTopology("testSslClientAuthNeg");
- System.out.println("===== "+t.getName());
-
- startEchoer(ServerMode.SSL_CLIENT_AUTH); // before getConfig() so it gets the port
-
- // since our server will require client auth, if we don't have
- // a keystore setup with it in it, the client will fail to connect
- // and ultimately the connect will fail and the test will
- // receive nothing.
-
- Properties config = getConfig(); // no SSL config stuff
- config.setProperty("ws.uri", getWssUri());
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, new String[0]); //rcv nothing
- }
-
- private void skipTestIfCantConnect(Properties config) throws Exception {
- String wsUri = config.getProperty("ws.uri");
- // Skip tests if the WebSocket server can't be contacted.
- try {
- URI uri = new URI(wsUri);
- int port = uri.getPort();
- if (port == -1)
- port = uri.getScheme().equals("ws") ? 80 : 443;
- Socket s = new Socket();
- s.connect(new InetSocketAddress(uri.getHost(), port), 5*1000/*cn-timeout-msec*/);
- s.close();
- } catch (Exception e) {
- System.err.println("Unable to connect to WebSocket server "+wsUri+" : "+e.getMessage());
- e.printStackTrace();
- assumeTrue(false);
- }
- }
-
- @Test
- public void testPublicServer() throws Exception {
- Topology t = newTopology("testPublicServer");
- System.out.println("===== "+t.getName());
-
- // startEchoer(); // before getConfig() so it gets the port
-
- Properties config = getConfig();
- config.setProperty("ws.uri", "ws://echo.websocket.org");
- skipTestIfCantConnect(config);
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
-
- @Test
- public void testSslPublicServer() throws Exception {
- Topology t = newTopology("testSslPublicServer");
- System.out.println("===== "+t.getName());
-
- // startEchoer(); // before getConfig() so it gets the port
-
- // Check operation against a trusted CA signed server certificate.
- //
- // this public wss echo server should "just work" if you have
- // connectivity. no additional ssl trustStore config is needed
- // as the site has a certificate signed by a recognized CA.
-
- Properties config = getConfig();
- config.setProperty("ws.uri", "wss://echo.websocket.org");
- skipTestIfCantConnect(config);
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, expected);
- }
-
- @Test
- public void testSslPublicServerBadTrustStoreSystemPropertyNeg() throws Exception {
- Topology t = newTopology("testSslPublicServerBadTrustStoreSystemPropertyNeg");
- System.out.println("===== "+t.getName());
-
- // startEchoer(); // before getConfig() so it gets the port
-
- // this public wss echo server should "just work" if you have
- // connectivity. no additional ssl trustStore config is needed
- // as the site has a certificate signed by a recognized CA.
-
- // Set a trust store that doesn't contain the public server's cert nor CAs
- // and ultimately the connect will fail and the test will
- // receive nothing.
-
- Properties config = getConfig();
- config.setProperty("ws.uri", "wss://echo.websocket.org");
- skipTestIfCantConnect(config);
-
- SslSystemPropMgr sslProps = new SslSystemPropMgr();
- try {
- sslProps.set();
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
-
- String[] expected = new String[] { getStr1(), getStr2() };
-
- TStream<String> s = t.strings(expected);
- s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
- wsClient.sendString(s);
-
- TStream<String> rcvd = wsClient.receiveString();
-
- completeAndValidate("", t, rcvd, SEC_TMO, new String[0]); //rcv nothing
- }
- finally {
- sslProps.restore();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java b/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
deleted file mode 100644
index 0f533c3..0000000
--- a/connectors/wsclient-javax.websocket/src/test/java/edgent/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.tests.connectors.wsclient.javax.websocket;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.websocket.CloseReason;
-import javax.websocket.OnClose;
-import javax.websocket.OnError;
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
-import javax.websocket.server.ServerContainer;
-import javax.websocket.server.ServerEndpoint;
-
-import org.eclipse.jetty.http.HttpVersion;
-import org.eclipse.jetty.server.HttpConfiguration;
-import org.eclipse.jetty.server.HttpConnectionFactory;
-import org.eclipse.jetty.server.SecureRequestCustomizer;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.SslConnectionFactory;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
-
-import edgent.test.connectors.common.TestRepoPath;
-
-/**
- * Simple WebSocket server program to echo received messages.
- * <p>
- * See https://github.com/jetty-project/embedded-jetty-websocket-examples
- */
-@ServerEndpoint(value="/echo")
-public class WebSocketServerEcho {
- private final String svrName = this.getClass().getSimpleName();
- private Server server;
- private ServerConnector connector;
- private URI curEndpointURI;
- private boolean curNeedClientAuth;
- private final ScheduledExecutorService schedExecutor = Executors.newScheduledThreadPool(0);
-
- public static void main(String[] args) throws Exception {
- URI uri = new URI("ws://localhost:0");
- boolean needClientAuth = false;
- if (args.length > 0)
- uri = new URI(args[0]);
- if (args.length > 1)
- needClientAuth = "needClientAuth".equals(args[1]);
- WebSocketServerEcho srvr = new WebSocketServerEcho();
- srvr.start(uri, needClientAuth);
- }
-
- public void start(URI endpointURI) {
- start(endpointURI, false);
- }
-
- public void start(URI endpointURI, boolean needClientAuth) {
- curEndpointURI = endpointURI;
- curNeedClientAuth = needClientAuth;
-
- System.out.println(svrName+" "+endpointURI + " needClientAuth="+needClientAuth);
-
- server = createServer(endpointURI, needClientAuth);
- connector = (ServerConnector)server.getConnectors()[0];
-
- // Setup the basic application "context" for this application at "/"
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- context.setContextPath("/");
- server.setHandler(context);
-
- try {
- // Initialize javax.websocket layer
- ServerContainer wscontainer = WebSocketServerContainerInitializer.configureContext(context);
-
- // Add WebSocket endpoint to javax.websocket layer
- wscontainer.addEndpoint(this.getClass());
-
- // System.setProperty("javax.net.debug", "ssl"); // or "all"; "help" for full list
-
- server.start();
- System.out.println(svrName+" started "+connector);
- // server.dump(System.err);
- }
- catch (Exception e) {
- throw new RuntimeException("start", e);
- }
- }
-
- private Server createServer(URI endpointURI, boolean needClientAuth) {
- if ("ws".equals(endpointURI.getScheme())) {
- return new Server(endpointURI.getPort());
- }
- else if ("wss".equals(endpointURI.getScheme())) {
- // see http://git.eclipse.org/c/jetty/org.eclipse.jetty.project.git/tree/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ManyConnectors.java
- // http://git.eclipse.org/c/jetty/org.eclipse.jetty.project.git/tree/examples/embedded/src/main/java/org/eclipse/jetty/embedded/LikeJettyXml.java
-
- Server server = new Server();
-
- SslContextFactory sslContextFactory = new SslContextFactory();
- sslContextFactory.setKeyStorePath(getStorePath("serverKeyStore.jks"));
- sslContextFactory.setKeyStorePassword("passw0rd");
- sslContextFactory.setKeyManagerPassword("passw0rd");
- sslContextFactory.setCertAlias("default");
- sslContextFactory.setNeedClientAuth(needClientAuth);
- sslContextFactory.setTrustStorePath(getStorePath("serverTrustStore.jks"));
- sslContextFactory.setTrustStorePassword("passw0rd");
-
- HttpConfiguration httpsConfig = new HttpConfiguration();
- httpsConfig.addCustomizer(new SecureRequestCustomizer());
-
- ServerConnector https= new ServerConnector(server,
- new SslConnectionFactory(sslContextFactory,
- HttpVersion.HTTP_1_1.asString()),
- new HttpConnectionFactory(httpsConfig));
- https.setPort(endpointURI.getPort());
-
- server.addConnector(https);
- return server;
- }
- else
- throw new IllegalArgumentException("unrecognized uri: "+endpointURI);
- }
-
- private String getStorePath(String storeLeaf) {
- return TestRepoPath.getPath("connectors", "wsclient-javax.websocket", "src", "test", "keystores", storeLeaf);
- }
-
- public int getPort() {
- // returns -1 if called before started
- return connector.getLocalPort();
- }
-
- /** restart a running server on the same port, etc: stop, delay, start
- * @param secDelay the amount to delay in seconds before initiating the restart
- */
- public void restart(int secDelay) {
- // stop, schedule delay&start and return
- URI endpointURI = setPort(curEndpointURI, getPort());
- try {
- System.out.println(svrName+" restart: stop "+connector);
- connector.stop();
- } catch (Exception e) {
- throw new RuntimeException("restart", e);
- }
- System.out.println(svrName+" restart: scheduling start after "+secDelay+"sec");
- schedExecutor.schedule(() -> {
- System.out.println(svrName+" restart: starting...");
- start(endpointURI, curNeedClientAuth);
- }, secDelay, TimeUnit.SECONDS);
- }
-
- private URI setPort(URI endpointURI, int port) {
- try {
- URI uri = endpointURI;
- if (uri.getPort() != port) {
- uri = new URI(uri.getScheme(), uri.getUserInfo(),
- uri.getHost(), port,
- uri.getPath(), uri.getQuery(), uri.getFragment());
- }
- return uri;
- } catch (URISyntaxException e) {
- throw new RuntimeException("Unable to create URI", e);
- }
- }
-
- public void stop() {
- if (connector != null) {
- try {
- System.out.println(svrName+" stop "+connector);
- connector.stop();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- finally {
- connector = null;
- }
- }
- }
-
- @OnOpen
- public void opOpen(Session session) {
- System.out.println(svrName+" onOpen ");
- }
-
- @OnClose
- public void onClose(Session session, CloseReason reason) {
- System.out.println(svrName+" onClose reason="+reason);
- }
-
- @OnMessage
- public void onStringMessage(Session session, String message) {
- System.out.println(svrName+" onStringMessage msg="+message);
- try {
- session.getBasicRemote().sendText(message);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } // "echo" response
- }
-
- @OnMessage
- public void onByteMessage(Session session, ByteBuffer message) {
- System.out.println(svrName+" onByteMessage "+message.array().length+" bytes");
- try {
- session.getBasicRemote().sendBinary(message);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } // "echo" response
- }
-
- @OnError
- public void onError(Throwable cause) {
- System.err.println(svrName+" onError " + cause);
- cause.printStackTrace(System.err);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/wsclient-javax.websocket/src/test/java/org/apache/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/org/apache/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java b/connectors/wsclient-javax.websocket/src/test/java/org/apache/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
new file mode 100644
index 0000000..3b25c58
--- /dev/null
+++ b/connectors/wsclient-javax.websocket/src/test/java/org/apache/edgent/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.tests.connectors.wsclient.javax.websocket;
+
+/**
+ * WebSocketClient connector globalization tests.
+ */
+public class WebSocketClientGlobalTest extends WebSocketClientTest {
+ private final static String globalStr1 = "\u4e00";
+ private final static String globalStr2 = "\u4e8c";
+ private final static String globalStr3 = "\u4e09\u4e09";
+ private final static String globalStr4 = "\u56db";
+
+ public String getStr1() {
+ return globalStr1;
+ }
+
+ public String getStr2() {
+ return globalStr2;
+ }
+
+ public String getStr3() {
+ return globalStr3;
+ }
+
+ public String getStr4() {
+ return globalStr4;
+ }
+
+}