You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by mo...@apache.org on 2019/11/22 14:47:00 UTC

[knox] branch master updated: KNOX-2004 - Adding changes for handling Ping/Pong message from backend server on websocket connection (#200)

This is an automated email from the ASF dual-hosted git repository.

more pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new c9cb170  KNOX-2004 - Adding changes for handling Ping/Pong message from backend server on websocket connection (#200)
c9cb170 is described below

commit c9cb1702588c3e309ed0c03235718bcedb9c1ee9
Author: Rajat Goel <ra...@guavus.com>
AuthorDate: Fri Nov 22 20:16:53 2019 +0530

    KNOX-2004 - Adding changes for handling Ping/Pong message from backend server on websocket connection (#200)
    
    * KNOX-2004: Adding changes for handling Ping/Pong message from backend server on websocket connection
    
    * KNOX-2004: Adding delay before sending ping to fix test case failure on travis
---
 .../gateway/websockets/MessageEventCallback.java   |   8 ++
 .../gateway/websockets/ProxyInboundClient.java     |  16 +++
 .../gateway/websockets/ProxyWebSocketAdapter.java  |  56 ++++++--
 .../gateway/websockets/ProxyInboundClientTest.java |   9 ++
 .../knox/gateway/websockets/WebsocketClient.java   |  10 ++
 .../WebsocketServerInitiatedPingTest.java          | 151 +++++++++++++++++++++
 6 files changed, 238 insertions(+), 12 deletions(-)

diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/MessageEventCallback.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/MessageEventCallback.java
index 4e3e1ea..2f7fcbe 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/MessageEventCallback.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/MessageEventCallback.java
@@ -18,6 +18,7 @@
 package org.apache.knox.gateway.websockets;
 
 import javax.websocket.CloseReason;
+import javax.websocket.PongMessage;
 
 /**
  * A simple callback interface used when evens happen on the Websocket client socket.
@@ -63,4 +64,11 @@ public interface MessageEventCallback {
    * @param session session
    */
   void onMessageBinary(byte[]  message, boolean last, Object session);
+
+  /**
+   * Callback when a pong control message is received.
+   * @param pongMessage pong message
+   * @param session session
+   */
+  void onMessagePong(PongMessage pongMessage, Object session);
 }
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java
index 7cb1c00..a1797ed 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyInboundClient.java
@@ -21,6 +21,7 @@ import javax.websocket.Endpoint;
 import javax.websocket.EndpointConfig;
 import javax.websocket.MessageHandler;
 import javax.websocket.Session;
+import javax.websocket.PongMessage;
 
 /**
  * A Websocket client with callback which is not annotation based.
@@ -85,6 +86,21 @@ public class ProxyInboundClient extends Endpoint {
 
     });
 
+    /* Add message handler for Pong Control Message */
+    session.addMessageHandler(new MessageHandler.Whole<PongMessage>() {
+
+      /**
+       * Called when a control message is delivered as a {@link PongMessage}.
+       *
+       * @param pongMessage the message data.
+       */
+      @Override
+      public void onMessage(final PongMessage pongMessage) {
+        callback.onMessagePong(pongMessage, session);
+      }
+
+    });
+
     callback.onConnectionOpen(backendSession);
   }
 
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
index 4274e6e..9d5015d 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
@@ -125,12 +125,8 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
     final RemoteEndpoint remote = frontEndSession.getRemote();
     try {
       if (!messageBuffer.isEmpty()) {
-        LOG.debugLog("Found old buffered messages");
-        for (String obj:messageBuffer) {
-          LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
-          remote.sendString(obj);
-        }
-        messageBuffer.clear();
+        flushBufferedMessages(remote);
+
         if (remote.getBatchMode() == BatchMode.ON) {
           remote.flush();
         }
@@ -251,12 +247,7 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
           }
 
           /* Proxy message to frontend */
-          LOG.debugLog("Found old buffered messages");
-          for (String obj:messageBuffer) {
-            LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
-            remote.sendString(obj);
-          }
-          messageBuffer.clear();
+          flushBufferedMessages(remote);
 
           LOG.debugLog("Sending current message [From Backend <---]: " + message);
           remote.sendString(message);
@@ -281,6 +272,35 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
 
       }
 
+      @Override
+      public void onMessagePong(javax.websocket.PongMessage message, Object session) {
+        LOG.logMessage("[From Backend <---]: PING");
+        remoteLock.lock();
+        final RemoteEndpoint remote = getRemote();
+        try {
+          if (remote == null) {
+            LOG.debugLog("Remote endpoint is null");
+            return;
+          }
+
+          /* Proxy Ping message to frontend */
+          flushBufferedMessages(remote);
+
+          LOG.logMessage("Sending current PING [From Backend <---]: ");
+          remote.sendPing(message.getApplicationData());
+          if (remote.getBatchMode() == BatchMode.ON) {
+            remote.flush();
+          }
+        } catch (IOException e) {
+          LOG.connectionFailed(e);
+          throw new RuntimeIOException(e);
+        }
+        finally
+        {
+          remoteLock.unlock();
+        }
+      }
+
     };
 
   }
@@ -317,4 +337,16 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
       frontendSession.close();
     }
   }
+
+  /*
+   * Function to flush buffered messages. Should be called with remoteLock held
+   */
+  private void flushBufferedMessages(final RemoteEndpoint remote) throws IOException {
+    LOG.debugLog("Flushing old buffered messages");
+    for(String obj:messageBuffer) {
+      LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
+      remote.sendString(obj);
+    }
+    messageBuffer.clear();
+  }
 }
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ProxyInboundClientTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ProxyInboundClientTest.java
index dd5001f..08c4111 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ProxyInboundClientTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ProxyInboundClientTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import javax.websocket.CloseReason;
 import javax.websocket.ContainerProvider;
 import javax.websocket.DeploymentException;
+import javax.websocket.PongMessage;
 import javax.websocket.Session;
 import javax.websocket.WebSocketContainer;
 import java.io.IOException;
@@ -123,6 +124,10 @@ public class ProxyInboundClientTest {
       @Override
       public void onMessageBinary(byte[] message, boolean last, Object session) {
       }
+
+      @Override
+      public void onMessagePong(PongMessage message, Object session) {
+      }
     });
 
     Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
@@ -176,6 +181,10 @@ public class ProxyInboundClientTest {
         receivedBinaryMessage = message;
         isTestComplete.set(true);
       }
+
+      @Override
+      public void onMessagePong(PongMessage message, Object session) {
+      }
     });
 
     Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketClient.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketClient.java
index 772be94..364f2c4 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketClient.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketClient.java
@@ -18,6 +18,8 @@
 package org.apache.knox.gateway.websockets;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -30,6 +32,7 @@ import javax.websocket.OnClose;
 import javax.websocket.OnError;
 import javax.websocket.OnMessage;
 import javax.websocket.OnOpen;
+import javax.websocket.PongMessage;
 import javax.websocket.Session;
 
 import org.eclipse.jetty.util.BlockingArrayQueue;
@@ -61,6 +64,13 @@ public class WebsocketClient {
     this.messageQueue.offer(message);
   }
 
+  @OnMessage
+  public void onMessage(PongMessage message) {
+    ByteBuffer byteMessage = message.getApplicationData();
+    String s = StandardCharsets.UTF_8.decode(byteMessage).toString();
+    this.messageQueue.offer(s);
+  }
+
   @OnOpen
   public void onOpen(Session session) {
     this.session = session;
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedPingTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedPingTest.java
new file mode 100644
index 0000000..7c6d736
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedPingTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.websockets;
+
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * A test that checks that a ping initiated by the backend websocket server
+ * is proxied through Knox gateway to the client.
+ * <p>
+ * The way the test is set up is as follows: <br>
+ * <ul>
+ * <li>A mock Websocket server is set up which sends a ping carrying the
+ * payload "PingPong" after a connection is opened.
+ * <li>Knox Gateway is set up with websocket handler
+ * {@link GatewayWebsocketHandler} that can proxy the requests.
+ * <li>Appropriate Topology and service definition files are set up with the
+ * address of the Websocket server.
+ * <li>A mock client is set up to connect to gateway.
+ * </ul>
+ *
+ * The test is to confirm whether the ping payload sent by the backend
+ * Websocket server reaches the client through Knox.
+ *
+ * @since 0.10
+ */
+public class WebsocketServerInitiatedPingTest extends WebsocketEchoTestBase {
+
+  public WebsocketServerInitiatedPingTest() {
+    super();
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    handler = new WebsocketServerInitiatedPingHandler();
+    WebsocketEchoTestBase.setUpBeforeClass();
+    WebsocketEchoTestBase.startServers("ws");
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    WebsocketEchoTestBase.tearDownAfterClass();
+  }
+
+  /*
+   * Test websocket server initiated ping
+   */
+  @Test
+  public void testGatewayServerInitiatedPing() throws Exception {
+    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+    WebsocketClient client = new WebsocketClient();
+    container.connectToServer(client,
+            new URI(serverUri.toString() + "gateway/websocket/123foo456bar/channels"));
+
+    //session.getBasicRemote().sendText("Echo");
+    client.messageQueue.awaitMessages(1, 10000, TimeUnit.MILLISECONDS);
+
+    assertThat(client.messageQueue.get(0), is("PingPong"));
+  }
+
+  /**
+   * A mock websocket handler that serves the same
+   * {@link ServerInitiatingPingSocket} instance for every upgrade request.
+   */
+  private static class WebsocketServerInitiatedPingHandler extends WebSocketHandler implements WebSocketCreator {
+    private final ServerInitiatingPingSocket socket = new ServerInitiatingPingSocket();
+
+    @Override
+    public void configure(WebSocketServletFactory factory) {
+      factory.getPolicy().setMaxTextMessageSize(2 * 1024 * 1024);
+      factory.setCreator(this);
+    }
+
+    @Override
+    public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
+      return socket;
+    }
+  }
+
+  /**
+   * A simple socket that sends a ping with the payload "PingPong" on connect
+   */
+  private static class ServerInitiatingPingSocket extends WebSocketAdapter {
+
+    @Override
+    public void onWebSocketError(Throwable cause) {
+      throw new RuntimeException(cause);
+    }
+
+    @Override
+    public void onWebSocketConnect(Session sess) {
+      super.onWebSocketConnect(sess);
+      try {
+        Thread.sleep(1000);
+      } catch (Exception e) {
+      }
+      final String textMessage = "PingPong";
+      final ByteBuffer binaryMessage = ByteBuffer.wrap(
+                 textMessage.getBytes(StandardCharsets.UTF_8));
+
+      try {
+        RemoteEndpoint remote = getRemote();
+        remote.sendPing(binaryMessage);
+        if (remote.getBatchMode() == BatchMode.ON) {
+          remote.flush();
+        }
+      } catch (IOException x) {
+        throw new RuntimeIOException(x);
+      }
+    }
+  }
+}