You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/07 19:26:20 UTC

[10/50] incubator-quarks git commit: Add client reconnect handling

Add client reconnect handling

- add tests
- enhance doc for text vs byte msgs

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/02159772
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/02159772
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/02159772

Branch: refs/heads/master
Commit: 021597720aa0bfcbf32ec60ba9372da230d3e8cc
Parents: a894c9c
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Mar 4 11:07:27 2016 -0500
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Mar 4 11:07:27 2016 -0500

----------------------------------------------------------------------
 .../javax/websocket/Jsr356WebSocketClient.java  |  12 +++
 .../runtime/WebSocketClientConnector.java       |  63 +++++++++---
 .../javax/websocket/WebSocketClientTest.java    | 100 +++++++++++++++++--
 .../javax/websocket/WebSocketServerEcho.java    |  40 +++++++-
 .../connectors/wsclient/WebSocketClient.java    |  14 ++-
 5 files changed, 200 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java b/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java
index 8b5db91..b24ac47 100644
--- a/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java
+++ b/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/Jsr356WebSocketClient.java
@@ -54,6 +54,10 @@ import quarks.topology.json.JsonFunctions;
  * r.print();
  * }</pre>
  * <p>
+ * Note, the WebSocket protocol differentiates between text/String and 
+ * binary/byte messages.
+ * A receiver only receives the messages of the type that it requests.
+ * <p>
  * The connector is written against the JSR356 {@code javax.websockets} API.
  * {@code javax.websockets} uses the {@link java.util.ServiceLoader} to load
  * an implementation of {@code javax.websocket.ContainerProvider}.
@@ -183,6 +187,10 @@ public class Jsr356WebSocketClient implements WebSocketClient{
     
     /**
      * Create a stream of String tuples from received WebSocket text messages.
+     * <p>
+     * Note, the WebSocket protocol differentiates between text/String and
+     * binary/byte messages.  This method only receives messages sent as text.
+     * 
      * @return the stream
      */
     public TStream<String> receiveString() {
@@ -196,6 +204,10 @@ public class Jsr356WebSocketClient implements WebSocketClient{
 
     /**
      * Create a stream of byte[] tuples from received WebSocket binary messages.
+     * <p>
+     * Note, the WebSocket protocol differentiates between text/String and
+     * binary/byte messages.  This method only receives messages sent as bytes.
+     * 
      * @return the stream
      */
     public TStream<byte[]> receiveBytes() {

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java b/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
index 6e83fec..673f628 100644
--- a/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
+++ b/connectors/wsclient-javax.websocket/src/main/java/quarks/connectors/wsclient/javax/websocket/runtime/WebSocketClientConnector.java
@@ -4,6 +4,7 @@
 */
 package quarks.connectors.wsclient.javax.websocket.runtime;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -184,30 +185,62 @@ public class WebSocketClientConnector extends Connector<Session> implements Seri
             msgReceiver.onBinaryMessage(message);
         }
     }
-
-    void sendBinary(byte[] bytes) {
-        try {
-            client().getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));
-            getLogger().trace("{} sendBinary {} bytes.", id(), bytes.length);
+    
+    private Session getConnectedSession() {
+        try { 
+            return client();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted", e);
         }
         catch (RuntimeException e) {
             throw e;
         }
         catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Unexpected condition", e);
         }
     }
 
-    void sendText(String msg) {
-        try {
-            client().getBasicRemote().sendText(msg);
-            getLogger().trace("{} sendText {}", id(), msg);
-        }
-        catch (RuntimeException e) {
-            throw e;
+    void sendBinary(byte[] bytes) {
+        while (true) {
+            Session session = getConnectedSession();
+            try {
+                session.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));
+                getLogger().trace("{} sendBinary {} bytes.", id(), bytes.length);
+                return;
+            }
+            catch (IOException e) {
+                if (!session.isOpen()) {
+                    connectionLost(e);  // logs
+                    // retry
+                }
+                else {
+                    getLogger().error("{} sendBinary failed", id(), e);
+                    throw new RuntimeException("Unexpected condition", e);
+                }
+            }
         }
-        catch (Exception e) {
-            throw new RuntimeException(e);
+    }
+
+    void sendText(String msg) {
+        while (true) {
+            Session session = getConnectedSession();
+            try {
+                session.getBasicRemote().sendText(msg);
+                getLogger().trace("{} sendText {}", id(), msg);
+                return;
+            }
+            catch (IOException e) {
+                if (!session.isOpen()) {
+                    connectionLost(e);  // logs
+                    // retry
+                }
+                else {
+                    getLogger().error("{} sendText failed", id(), e);
+                    throw new RuntimeException("Unexpected condition", e);
+                }
+            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
index 1b99ddc..bf260db 100644
--- a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
+++ b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientTest.java
@@ -6,13 +6,13 @@ package quarks.tests.connectors.wsclient.javax.websocket;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assume.assumeTrue;
 
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.After;
 import org.junit.Test;
@@ -276,29 +276,72 @@ public class WebSocketClientTest extends ConnectorTestBase {
     @Test
     public void testReconnect() throws Exception {
 
-        assumeTrue(false); // TODO
-        
         Topology t = newTopology("testReconnect");
         System.out.println("===== "+t.getName());
 
         startEchoer();  // before getConfig() so it gets the port
+
+        Properties config = getConfig();
+        WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
+        
+        String[] expected = new String[] { "one", "two", "three-post-reconnect", "four" };
+        
+        TStream<String> s = t.strings(expected);
+        s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
+        
+        // send one, two, restart the server to force reconnect, send the next
+        AtomicInteger cnt = new AtomicInteger();
+        s = s.filter(tuple -> {
+            if (cnt.getAndIncrement() != 2)
+                return true;
+            else {
+                // delay so we rcv the prior echo'd tuple
+                try { Thread.sleep(500); } catch (Exception e) {};
+                restartEchoer(2/*secDelay*/);
+                return true;
+            }
+        });
+        wsClient.sendString(s);
         
-        // TODO
-        restartEchoer(2);
+        TStream<String> rcvd = wsClient.receiveString();
+        
+        completeAndValidate("", t, rcvd, SEC_TMO + 10, expected);
     }
     
     @Test
-    public void testSslReconnect() throws Exception {
+    public void testReconnectBytes() throws Exception {
 
-        assumeTrue(false); // TODO
-        
-        Topology t = newTopology("testSslReconnect");
+        Topology t = newTopology("testReconnectBytes");
         System.out.println("===== "+t.getName());
 
         startEchoer();  // before getConfig() so it gets the port
+
+        Properties config = getConfig();
+        WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
+        
+        String[] expected = new String[] { "one", "two", "three-post-reconnect", "four" };
         
-        // TODO
-        restartEchoer(2);
+        TStream<byte[]> s = t.strings(expected).map(tup -> tup.getBytes());
+        s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
+        
+        // send one, two, restart the server to force reconnect, send the next
+        AtomicInteger cnt = new AtomicInteger();
+        s = s.filter(tuple -> {
+            if (cnt.getAndIncrement() != 2)
+                return true;
+            else {
+                // delay so we rcv the prior echo'd tuple
+                try { Thread.sleep(500); } catch (Exception e) {};
+                restartEchoer(2/*secDelay*/);
+                return true;
+            }
+        });
+        wsClient.sendBytes(s);
+        
+        TStream<String> rcvd = wsClient.receiveBytes()
+                                .map(tup -> new String(tup));
+        
+        completeAndValidate("", t, rcvd, SEC_TMO + 10, expected);
     }
 
     private class SslSystemPropMgr {
@@ -424,6 +467,41 @@ public class WebSocketClientTest extends ConnectorTestBase {
         completeAndValidate("", t, rcvd, SEC_TMO, expected);
     }
     
+     @Test
+     public void testSslReconnect() throws Exception {
+    
+         Topology t = newTopology("testSslReconnect");
+         System.out.println("===== "+t.getName());
+    
+         startEchoer(ServerMode.SSL);  // before getConfig() so it gets the port
+    
+         Properties config = getWssConfig();
+         WebSocketClient wsClient = new Jsr356WebSocketClient(t, config);
+         
+         String[] expected = new String[] { "one", "two", "three-post-reconnect", "four" };
+         
+         TStream<String> s = t.strings(expected);
+         s = PlumbingStreams.blockingOneShotDelay(s, 2, TimeUnit.SECONDS);
+         
+         // send one, two, restart the server to force reconnect, send the next
+         AtomicInteger cnt = new AtomicInteger();
+         s = s.filter(tuple -> {
+             if (cnt.getAndIncrement() != 2)
+                 return true;
+             else {
+                 // delay so we rcv the prior echo'd tuple
+                 try { Thread.sleep(500); } catch (Exception e) {};
+                 restartEchoer(2/*secDelay*/);
+                 return true;
+             }
+         });
+         wsClient.sendString(s);
+         
+         TStream<String> rcvd = wsClient.receiveString();
+         
+         completeAndValidate("", t, rcvd, SEC_TMO + 10, expected);
+     }
+    
     @Test
     public void testSslNeg() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
index 82bcc4c..e2e04c9 100644
--- a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
+++ b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketServerEcho.java
@@ -6,7 +6,11 @@ package quarks.tests.connectors.wsclient.javax.websocket;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.websocket.CloseReason;
 import javax.websocket.OnClose;
@@ -38,6 +42,8 @@ public class WebSocketServerEcho {
     String svrName = this.getClass().getSimpleName();
     Server server;
     ServerConnector connector;
+    URI curEndpointURI;
+    boolean curNeedClientAuth;
     
     public static void main(String[] args) throws Exception {
         URI uri = new URI("ws://localhost:0");
@@ -55,6 +61,8 @@ public class WebSocketServerEcho {
     }
     
     public void start(URI endpointURI, boolean needClientAuth) {
+        curEndpointURI = endpointURI;
+        curNeedClientAuth = needClientAuth;
 
         System.out.println(svrName+" "+endpointURI + " needClientAuth="+needClientAuth);
 
@@ -117,7 +125,7 @@ public class WebSocketServerEcho {
             return server;
         }
         else
-            throw new IllegalArgumentException("unrecognized uri: "+endpointURI);
+            throw new IllegalArgumentException("unrecognized curEndpointURI: "+endpointURI);
     }
     
     private String getStorePath(String storeLeaf) {
@@ -130,8 +138,36 @@ public class WebSocketServerEcho {
     }
     
     /** restart a running server on the same port, etc: stop, delay, start */
+    private final ScheduledExecutorService schedExecutor = Executors.newScheduledThreadPool(0);
     public void restart(int secDelay) {
-        // TODO
+        // stop, schedule delay&start and return
+        URI endpointURI = setPort(curEndpointURI, getPort());
+        try {
+            System.out.println(svrName+" restart: stop "+connector);
+            connector.stop();
+        } catch (Exception e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        System.out.println(svrName+" restart: scheduling start after "+secDelay+"sec");
+        schedExecutor.schedule(() -> {
+            System.out.println(svrName+" restart: starting...");
+            start(endpointURI, curNeedClientAuth);
+        }, secDelay, TimeUnit.SECONDS);
+    }
+    
+    private URI setPort(URI endpointURI, int port) {
+        try {
+            URI uri = endpointURI;
+            if (uri.getPort() != port) {
+                uri = new URI(uri.getScheme(), uri.getUserInfo(),
+                        uri.getHost(), port,
+                        uri.getPath(), uri.getQuery(), uri.getFragment());
+            }
+            return uri;
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unable to create URI", e);
+        }
     }
     
     public void stop() {

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/02159772/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java b/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java
index 38ffee3..2da8b37 100644
--- a/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java
+++ b/connectors/wsclient/src/main/java/quarks/connectors/wsclient/WebSocketClient.java
@@ -39,7 +39,11 @@ import quarks.topology.TopologyElement;
  * TStream<JsonObject> r = wsclient.receive();
  * r.print();
  * }</pre>
- * 
+ * <p>
+ * Note, the WebSocket protocol differentiates between text/String and 
+ * binary/byte messages.
+ * A receiver only receives the messages of the type that it requests.
+ * <p> 
  * Implementations are strongly encouraged to support construction from
  * Properties with the following configuration parameters:
  * <ul>
@@ -93,12 +97,20 @@ public interface WebSocketClient extends TopologyElement {
     
     /**
      * Create a stream of String tuples from received WebSocket text messages.
+     * <p>
+     * Note, the WebSocket protocol differentiates between text/String and
+     * binary/byte messages.  This method only receives messages sent as text.
+     * 
      * @return the stream
      */
     TStream<String> receiveString();
 
     /**
      * Create a stream of byte[] tuples from received WebSocket binary messages.
+     * <p>
+     * Note, the WebSocket protocol differentiates between text/String and
+     * binary/byte messages.  This method only receives messages sent as bytes.
+     * 
      * @return the stream
      */
     TStream<byte[]> receiveBytes();