You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/07 19:26:20 UTC
[10/50] incubator-quarks git commit: Add client reconnect handling
Add client reconnect handling
- add tests
- enhance doc for text vs byte msgs
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/02159772
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/02159772
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/02159772
Branch: refs/heads/master
Commit: 021597720aa0bfcbf32ec60ba9372da230d3e8cc
Parents: a894c9c
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Mar 4 11:07:27 2016 -0500
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Mar 4 11:07:27 2016 -0500
----------------------------------------------------------------------
.../javax/websocket/Jsr356WebSocketClient.java | 12 +++
.../runtime/WebSocketClientConnector.java | 63 +++++++++---
.../javax/websocket/WebSocketClientTest.java | 100 +++++++++++++++++--
.../javax/websocket/WebSocketServerEcho.java | 40 +++++++-
.../connectors/wsclient/WebSocketClient.java | 14 ++-
5 files changed, 200 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java b/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java
index 8b5db91..b24ac47 100644
--- a/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java
+++ b/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java
@@ -54,6 +54,10 @@ import quarks.topology.json.JsonFunctions;
* r.print();
* }</pre>
* <p>
+ * Note, the WebSocket protocol differentiates between text/String and
+ * binary/byte messages.
+ * A receiver only receives the messages of the type that it requests.
+ * <p>
* The connector is written against the JSR356 {@code javax.websockets} API.
* {@code javax.websockets} uses the {@link java.util.ServiceLoader} to load
* an implementation of {@code javax.websocket.ContainerProvider}.
@@ -183,6 +187,10 @@ public class Jsr356WebSocketClient implements WebSocketClient{
/**
* Create a stream of String tuples from received WebSocket text messages.
+ * <p>
+ * Note, the WebSocket protocol differentiates between text/String and
+ * binary/byte messages. This method only receives messages sent as text.
+ *
* @return the stream
*/
public TStream<String> receiveString() {
@@ -196,6 +204,10 @@ public class Jsr356WebSocketClient implements WebSocketClient{
/**
* Create a stream of byte[] tuples from received WebSocket binary messages.
+ * <p>
+ * Note, the WebSocket protocol differentiates between text/String and
+ * binary/byte messages. This method only receives messages sent as bytes.
+ *
* @return the stream
*/
public TStream<byte[]> receiveBytes() {
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java b/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
index 6e83fec..673f628 100644
--- a/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
+++ b/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
@@ -4,6 +4,7 @@
*/
package quarks.connectors.wsclient.javax.websocket.runtime;
+import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
@@ -184,30 +185,62 @@ public class WebSocketClientConnector extends Connector<Session> implements Seri
msgReceiver.onBinaryMessage(message);
}
}
-
- void sendBinary(byte[] bytes) {
- try {
- client().getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));
- getLogger().trace("{} sendBinary {} bytes.", id(), bytes.length);
+
+ private Session getConnectedSession() {
+ try {
+ return client();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted", e);
}
catch (RuntimeException e) {
throw e;
}
catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Unexpected condition", e);
}
}
- void sendText(String msg) {
- try {
- client().getBasicRemote().sendText(msg);
- getLogger().trace("{} sendText {}", id(), msg);
- }
- catch (RuntimeException e) {
- throw e;
+ void sendBinary(byte[] bytes) {
+ while (true) {
+ Session session = getConnectedSession();
+ try {
+ session.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));
+ getLogger().trace("{} sendBinary {} bytes.", id(), bytes.length);
+ return;
+ }
+ catch (IOException e) {
+ if (!session.isOpen()) {
+ connectionLost(e); // logs
+ // retry
+ }
+ else {
+ getLogger().error("{} sendBinary failed", id(), e);
+ throw new RuntimeException("Unexpected condition", e);
+ }
+ }
}
- catch (Exception e) {
- throw new RuntimeException(e);
+ }
+
+ void sendText(String msg) {
+ while (true) {
+ Session session = getConnectedSession();
+ try {
+ session.getBasicRemote().sendText(msg);
+ getLogger().trace("{} sendText {}", id(), msg);
+ return;
+ }
+ catch (IOException e) {
+ if (!session.isOpen()) {
+ connectionLost(e); // logs
+ // retry
+ }
+ else {
+ getLogger().error("{} sendText failed", id(), e);
+ throw new RuntimeException("Unexpected condition", e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
index 1b99ddc..bf260db 100644
--- a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
+++ b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
@@ -6,13 +6,13 @@ package quarks.tests.connectors.wsclient.javax.websocket;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
-import static org.junit.Assume.assumeTrue;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Test;
@@ -276,29 +276,72 @@ public class WebSocketClientTest extends ConnectorTestBase {
@Test
public void testReconnect() throws Exception {
- assumeTrue(false); // TODO
-
Topology t = newTopology("testReconnect");
System.out.println("===== "+t.getName());
startEchoer(); // before getConfig() so it gets the port
+
+ Properties config = getConfig();
+ WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
+
+ String[] expected = new String[] { "one", "two", "three-post-reconnect", "four" };
+
+ TStream<String> s = t.strings(expected);
+ s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
+
+ // send one, two, restart the server to force reconnect, send the next
+ AtomicInteger cnt = new AtomicInteger();
+ s = s.filter(tuple -> {
+ if (cnt.getAndIncrement() != 2)
+ return true;
+ else {
+ // delay so we rcv the prior echo'd tuple
+ try { Thread.sleep(500); } catch (Exception e) {};
+ restartEchoer(2/*secDelay*/);
+ return true;
+ }
+ });
+ wsClient.sendString(s);
- // TODO
- restartEchoer(2);
+ TStream<String> rcvd = wsClient.receiveString();
+
+ completeAndValidate("", t, rcvd, SEC_TMO + 10, expected);
}
@Test
- public void testSslReconnect() throws Exception {
+ public void testReconnectBytes() throws Exception {
- assumeTrue(false); // TODO
-
- Topology t = newTopology("testSslReconnect");
+ Topology t = newTopology("testReconnectBytes");
System.out.println("===== "+t.getName());
startEchoer(); // before getConfig() so it gets the port
+
+ Properties config = getConfig();
+ WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
+
+ String[] expected = new String[] { "one", "two", "three-post-reconnect", "four" };
- // TODO
- restartEchoer(2);
+ TStream<byte[]> s = t.strings(expected).map(tup -> tup.getBytes());
+ s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
+
+ // send one, two, restart the server to force reconnect, send the next
+ AtomicInteger cnt = new AtomicInteger();
+ s = s.filter(tuple -> {
+ if (cnt.getAndIncrement() != 2)
+ return true;
+ else {
+ // delay so we rcv the prior echo'd tuple
+ try { Thread.sleep(500); } catch (Exception e) {};
+ restartEchoer(2/*secDelay*/);
+ return true;
+ }
+ });
+ wsClient.sendBytes(s);
+
+ TStream<String> rcvd = wsClient.receiveBytes()
+ .map(tup -> new String(tup));
+
+ completeAndValidate("", t, rcvd, SEC_TMO + 10, expected);
}
private class SslSystemPropMgr {
@@ -424,6 +467,41 @@ public class WebSocketClientTest extends ConnectorTestBase {
completeAndValidate("", t, rcvd, SEC_TMO, expected);
}
+ @Test
+ public void testSslReconnect() throws Exception {
+
+ Topology t = newTopology("testSslReconnect");
+ System.out.println("===== "+t.getName());
+
+ startEchoer(ServerMode.SSL); // before getConfig() so it gets the port
+
+ Properties config = getWssConfig();
+ WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
+
+ String[] expected = new String[] { "one", "two", "three-post-reconnect", "four" };
+
+ TStream<String> s = t.strings(expected);
+ s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
+
+ // send one, two, restart the server to force reconnect, send the next
+ AtomicInteger cnt = new AtomicInteger();
+ s = s.filter(tuple -> {
+ if (cnt.getAndIncrement() != 2)
+ return true;
+ else {
+ // delay so we rcv the prior echo'd tuple
+ try { Thread.sleep(500); } catch (Exception e) {};
+ restartEchoer(2/*secDelay*/);
+ return true;
+ }
+ });
+ wsClient.sendString(s);
+
+ TStream<String> rcvd = wsClient.receiveString();
+
+ completeAndValidate("", t, rcvd, SEC_TMO + 10, expected);
+ }
+
@Test
public void testSslNeg() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
index 82bcc4c..e2e04c9 100644
--- a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
+++ b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
@@ -6,7 +6,11 @@ package quarks.tests.connectors.wsclient.javax.websocket;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
@@ -38,6 +42,8 @@ public class WebSocketServerEcho {
String svrName = this.getClass().getSimpleName();
Server server;
ServerConnector connector;
+ URI curEndpointURI;
+ boolean curNeedClientAuth;
public static void main(String[] args) throws Exception {
URI uri = new URI("ws://localhost:0");
@@ -55,6 +61,8 @@ public class WebSocketServerEcho {
}
public void start(URI endpointURI, boolean needClientAuth) {
+ curEndpointURI = endpointURI;
+ curNeedClientAuth = needClientAuth;
System.out.println(svrName+" "+endpointURI + " needClientAuth="+needClientAuth);
@@ -117,7 +125,7 @@ public class WebSocketServerEcho {
return server;
}
else
- throw new IllegalArgumentException("unrecognized uri: "+endpointURI);
+ throw new IllegalArgumentException("unrecognized curEndpointURI: "+endpointURI);
}
private String getStorePath(String storeLeaf) {
@@ -130,8 +138,36 @@ public class WebSocketServerEcho {
}
/** restart a running server on the same port, etc: stop, delay, start */
+ private final ScheduledExecutorService schedExecutor = Executors.newScheduledThreadPool(0);
public void restart(int secDelay) {
- // TODO
+ // stop, schedule delay&start and return
+ URI endpointURI = setPort(curEndpointURI, getPort());
+ try {
+ System.out.println(svrName+" restart: stop "+connector);
+ connector.stop();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ System.out.println(svrName+" restart: scheduling start after "+secDelay+"sec");
+ schedExecutor.schedule(() -> {
+ System.out.println(svrName+" restart: starting...");
+ start(endpointURI, curNeedClientAuth);
+ }, secDelay, TimeUnit.SECONDS);
+ }
+
+ private URI setPort(URI endpointURI, int port) {
+ try {
+ URI uri = endpointURI;
+ if (uri.getPort() != port) {
+ uri = new URI(uri.getScheme(), uri.getUserInfo(),
+ uri.getHost(), port,
+ uri.getPath(), uri.getQuery(), uri.getFragment());
+ }
+ return uri;
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Unable to create URI", e);
+ }
}
public void stop() {
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java b/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java
index 38ffee3..2da8b37 100644
--- a/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java
+++ b/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java
@@ -39,7 +39,11 @@ import quarks.topology.TopologyElement;
* TStream<JsonObject> r = wsclient.receive();
* r.print();
* }</pre>
- *
+ * <p>
+ * Note, the WebSocket protocol differentiates between text/String and
+ * binary/byte messages.
+ * A receiver only receives the messages of the type that it requests.
+ * <p>
* Implementations are strongly encouraged to support construction from
* Properties with the following configuration parameters:
* <ul>
@@ -93,12 +97,20 @@ public interface WebSocketClient extends TopologyElement {
/**
* Create a stream of String tuples from received WebSocket text messages.
+ * <p>
+ * Note, the WebSocket protocol differentiates between text/String and
+ * binary/byte messages. This method only receives messages sent as text.
+ *
* @return the stream
*/
TStream<String> receiveString();
/**
* Create a stream of byte[] tuples from received WebSocket binary messages.
+ * <p>
+ * Note, the WebSocket protocol differentiates between text/String and
+ * binary/byte messages. This method only receives messages sent as bytes.
+ *
* @return the stream
*/
TStream<byte[]> receiveBytes();