You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by lm...@apache.org on 2017/10/26 14:23:02 UTC

[26/37] knox git commit: KNOX-895 - Pass Headers and Cookies to websocket backend

KNOX-895 - Pass Headers and Cookies to websocket backend


Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/2d236e78
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/2d236e78
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/2d236e78

Branch: refs/heads/KNOX-1049
Commit: 2d236e78b70ef7fb312ebf0fa198657595e2f4ba
Parents: 7b401de
Author: Sandeep More <mo...@apache.org>
Authored: Wed Oct 11 17:04:52 2017 -0400
Committer: Sandeep More <mo...@apache.org>
Committed: Wed Oct 11 17:04:52 2017 -0400

----------------------------------------------------------------------
 .../websockets/GatewayWebsocketHandler.java     |  41 +-
 .../gateway/websockets/ProxyInboundClient.java  | 107 ++++++
 .../websockets/ProxyWebSocketAdapter.java       |  20 +-
 .../websockets/ProxyInboundClientTest.java      | 374 +++++++++++++++++++
 4 files changed, 530 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
index 75a4a2b..0ee54fd 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
@@ -21,6 +21,8 @@ import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -40,11 +42,13 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 
+import javax.websocket.ClientEndpointConfig;
+
 /**
  * Websocket handler that will handle websocket connection request. This class
  * is responsible for creating a proxy socket for inbound and outbound
  * connections. This is also where the http to websocket handoff happens.
- * 
+ *
  * @since 0.10
  */
 public class GatewayWebsocketHandler extends WebSocketHandler
@@ -74,7 +78,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
 
   /**
    * Create an instance
-   * 
+   *
    * @param config
    * @param services
    */
@@ -90,7 +94,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see
    * org.eclipse.jetty.websocket.server.WebSocketHandler#configure(org.eclipse.
    * jetty.websocket.servlet.WebSocketServletFactory)
@@ -119,7 +123,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see
    * org.eclipse.jetty.websocket.servlet.WebSocketCreator#createWebSocket(org.
    * eclipse.jetty.websocket.servlet.ServletUpgradeRequest,
@@ -137,7 +141,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
       final String backendURL = getMatchedBackendURL(path);
 
       /* Upgrade happens here */
-      return new ProxyWebSocketAdapter(URI.create(backendURL), pool);
+      return new ProxyWebSocketAdapter(URI.create(backendURL), pool, getClientEndpointConfig(req));
     } catch (final Exception e) {
       LOG.failedCreatingWebSocket(e);
       throw e;
@@ -145,11 +149,32 @@ public class GatewayWebsocketHandler extends WebSocketHandler
   }
 
   /**
+   * Returns a {@link ClientEndpointConfig} config that contains the headers
+   * to be passed to the backend.
+   * @since 0.14.0
+   * @param req
+   * @return
+   */
+  private ClientEndpointConfig getClientEndpointConfig(final ServletUpgradeRequest req) {
+
+    return ClientEndpointConfig.Builder.create().configurator( new ClientEndpointConfig.Configurator() {
+
+       @Override
+       public void beforeRequest(final Map<String, List<String>> headers) {
+
+         /* Add request headers */
+         req.getHeaders().forEach(headers::putIfAbsent);
+
+       }
+    }).build();
+  }
+
+  /**
    * This method looks at the context path and returns the backend websocket
    * url. If websocket url is found it is used as is, or we default to
    * ws://{host}:{port} which might or might not be right.
-   * 
-   * @param  The context path
+   *
+   * @param
    * @return Websocket backend url
    */
   private synchronized String getMatchedBackendURL(final String path) {
@@ -203,7 +228,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
         URI serviceUri = new URI(backendURL);
         backend.append(serviceUri);
         /* Avoid Zeppelin Regression - as this would require ambari changes and break current knox websocket use case*/
-        if (!StringUtils.endsWith(backend.toString(), "/ws") && pathService[1] != null) {
+        if (!StringUtils.endsWith(backend.toString(), "/ws") && pathService.length > 0 && pathService[1] != null) {
           backend.append(pathService[1]);
         }
       }

http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java
new file mode 100644
index 0000000..4e938d2
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java
@@ -0,0 +1,107 @@
+package org.apache.hadoop.gateway.websockets;
+
+import javax.websocket.CloseReason;
+import javax.websocket.Endpoint;
+import javax.websocket.EndpointConfig;
+import javax.websocket.MessageHandler;
+import javax.websocket.Session;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * A Websocket client with callback which is not annotation based.
+ * This handler accepts String and binary messages.
+ * @since 0.14.0
+ */
+public class ProxyInboundClient extends Endpoint {
+
+  /**
+   * Callback to be called once we have events on our socket.
+   */
+  private MessageEventCallback callback;
+
+  protected Session session;
+  protected EndpointConfig config;
+
+
+  public ProxyInboundClient(final MessageEventCallback callback) {
+    super();
+    this.callback = callback;
+  }
+
+  /**
+   * Developers must implement this method to be notified when a new
+   * conversation has just begun.
+   *
+   * @param backendSession the session that has just been activated.
+   * @param config  the configuration used to configure this endpoint.
+   */
+  @Override
+  public void onOpen(final javax.websocket.Session backendSession, final EndpointConfig config) {
+    this.session = backendSession;
+    this.config = config;
+
+    /* Set the max message size */
+    session.setMaxBinaryMessageBufferSize(Integer.MAX_VALUE);
+    session.setMaxTextMessageBufferSize(Integer.MAX_VALUE);
+
+    /* Add message handler for binary data */
+    session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
+
+      /**
+       * Called when the message has been fully received.
+       *
+       * @param message the message data.
+       */
+      @Override
+      public void onMessage(final byte[] message) {
+        callback.onMessageBinary(message, true, session);
+      }
+
+    });
+
+    /* Add message handler for text data */
+    session.addMessageHandler(new MessageHandler.Whole<String>() {
+
+      /**
+       * Called when the message has been fully received.
+       *
+       * @param message the message data.
+       */
+      @Override
+      public void onMessage(final String message) {
+        callback.onMessageText(message, session);
+      }
+
+    });
+
+    callback.onConnectionOpen(backendSession);
+  }
+
+  @Override
+  public void onClose(final javax.websocket.Session backendSession, final CloseReason closeReason) {
+    callback.onConnectionClose(closeReason);
+    this.session = null;
+  }
+
+  @Override
+  public void onError(final javax.websocket.Session backendSession, final Throwable cause) {
+    callback.onError(cause);
+    this.session = null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
index 1e7f583..4ea8d6c 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.gateway.websockets;
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
+import javax.websocket.ClientEndpointConfig;
 import javax.websocket.CloseReason;
 import javax.websocket.ContainerProvider;
 import javax.websocket.DeploymentException;
@@ -60,12 +60,23 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
   private ExecutorService pool;
 
   /**
+   * Used to transmit headers from browser to backend server.
+   * @since 0.14
+   */
+  private ClientEndpointConfig clientConfig;
+
+  /**
    * Create an instance
    */
   public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool) {
+    this(backend, pool, null);
+  }
+
+  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, final ClientEndpointConfig clientConfig) {
     super();
     this.backend = backend;
     this.pool = pool;
+    this.clientConfig = clientConfig;
   }
 
   @Override
@@ -76,14 +87,15 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
      * plumbing takes place
      */
     container = ContainerProvider.getWebSocketContainer();
-    final ProxyInboundSocket backendSocket = new ProxyInboundSocket(
-        getMessageCallback());
+
+    final ProxyInboundClient backendSocket = new ProxyInboundClient(getMessageCallback());
 
     /* build the configuration */
 
     /* Attempt Connect */
     try {
-      backendSession = container.connectToServer(backendSocket, backend);
+      backendSession = container.connectToServer(backendSocket, clientConfig, backend);
+
       LOG.onConnectionOpen(backend.toString());
 
     } catch (DeploymentException e) {

http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java
new file mode 100644
index 0000000..69b45dd
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java
@@ -0,0 +1,374 @@
+package org.apache.hadoop.gateway.websockets;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.DeploymentException;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.instanceOf;
+
+/**
+ * Test {@link ProxyInboundClient} class.
+ * @since 0.14.0
+ */
+public class ProxyInboundClientTest {
+
+  private static Server server;
+  private static URI serverUri;
+  private static Handler handler;
+
+  String recievedMessage = null;
+
+  byte[] recievedBinaryMessage = null;
+
+
+  /* create an instance */
+  public ProxyInboundClientTest() {
+    super();
+  }
+
+  @BeforeClass
+  public static void startWSServer() throws Exception
+  {
+    server = new Server();
+    ServerConnector connector = new ServerConnector(server);
+    server.addConnector(connector);
+
+    handler = new WebsocketEchoHandler();
+
+    ContextHandler context = new ContextHandler();
+    context.setContextPath("/");
+    context.setHandler(handler);
+    server.setHandler(context);
+
+    server.start();
+
+    String host = connector.getHost();
+    if (host == null)
+    {
+      host = "localhost";
+    }
+    int port = connector.getLocalPort();
+    serverUri = new URI(String.format("ws://%s:%d/",host,port));
+  }
+
+  @AfterClass
+  public static void stopServer()
+  {
+    try
+    {
+      server.stop();
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace(System.err);
+    }
+  }
+
+  //@Test(timeout = 3000)
+  @Test
+  public void testClientInstance() throws IOException, DeploymentException {
+
+    final String textMessage = "Echo";
+    final ByteBuffer binarymessage = ByteBuffer.wrap(textMessage.getBytes());
+
+    final AtomicBoolean isTestComplete = new AtomicBoolean(false);
+
+    final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+    final ProxyInboundClient client = new ProxyInboundClient( new MessageEventCallback() {
+
+      /**
+       * A generic callback, can be left un-implemented
+       *
+       * @param message
+       */
+      @Override
+      public void doCallback(String message) {
+
+      }
+
+      /**
+       * Callback when connection is established.
+       *
+       * @param session
+       */
+      @Override
+      public void onConnectionOpen(Object session) {
+
+      }
+
+      /**
+       * Callback when connection is closed.
+       *
+       * @param reason
+       */
+      @Override
+      public void onConnectionClose(CloseReason reason) {
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when there is an error in connection.
+       *
+       * @param cause
+       */
+      @Override
+      public void onError(Throwable cause) {
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when a text message is received.
+       *
+       * @param message
+       * @param session
+       */
+      @Override
+      public void onMessageText(String message, Object session) {
+        recievedMessage = message;
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when a binary message is received.
+       *
+       * @param message
+       * @param last
+       * @param session
+       */
+      @Override
+      public void onMessageBinary(byte[] message, boolean last,
+          Object session) {
+
+      }
+    } );
+
+    Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
+
+    Session session = container.connectToServer(client, serverUri);
+
+    session.getBasicRemote().sendText(textMessage);
+
+    while(!isTestComplete.get()) {
+      /* just wait for the test to finish */
+    }
+
+    Assert.assertEquals("The received text message is not the same as the sent", textMessage, recievedMessage);
+  }
+
+  @Test(timeout = 3000)
+  public void testBinarymessage() throws IOException, DeploymentException {
+
+    final String textMessage = "Echo";
+    final ByteBuffer binarymessage = ByteBuffer.wrap(textMessage.getBytes());
+
+    final AtomicBoolean isTestComplete = new AtomicBoolean(false);
+
+    final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+    final ProxyInboundClient client = new ProxyInboundClient( new MessageEventCallback() {
+
+      /**
+       * A generic callback, can be left un-implemented
+       *
+       * @param message
+       */
+      @Override
+      public void doCallback(String message) {
+
+      }
+
+      /**
+       * Callback when connection is established.
+       *
+       * @param session
+       */
+      @Override
+      public void onConnectionOpen(Object session) {
+
+      }
+
+      /**
+       * Callback when connection is closed.
+       *
+       * @param reason
+       */
+      @Override
+      public void onConnectionClose(CloseReason reason) {
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when there is an error in connection.
+       *
+       * @param cause
+       */
+      @Override
+      public void onError(Throwable cause) {
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when a text message is received.
+       *
+       * @param message
+       * @param session
+       */
+      @Override
+      public void onMessageText(String message, Object session) {
+        recievedMessage = message;
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when a binary message is received.
+       *
+       * @param message
+       * @param last
+       * @param session
+       */
+      @Override
+      public void onMessageBinary(byte[] message, boolean last,
+          Object session) {
+        recievedBinaryMessage = message;
+        isTestComplete.set(true);
+      }
+    } );
+
+    Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
+
+    Session session = container.connectToServer(client, serverUri);
+
+    session.getBasicRemote().sendBinary(binarymessage);
+
+    while(!isTestComplete.get()) {
+      /* just wait for the test to finish */
+    }
+
+    Assert.assertEquals("Binary message does not match", textMessage, new String(recievedBinaryMessage));
+  }
+
+  @Test(timeout = 3000)
+  public void testTextMaxBufferLimit() throws IOException, DeploymentException {
+
+    final String longMessage = RandomStringUtils.random(100000);
+
+    final AtomicBoolean isTestComplete = new AtomicBoolean(false);
+
+    final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+    final ProxyInboundClient client = new ProxyInboundClient( new MessageEventCallback() {
+
+      /**
+       * A generic callback, can be left un-implemented
+       *
+       * @param message
+       */
+      @Override
+      public void doCallback(String message) {
+
+      }
+
+      /**
+       * Callback when connection is established.
+       *
+       * @param session
+       */
+      @Override
+      public void onConnectionOpen(Object session) {
+
+      }
+
+      /**
+       * Callback when connection is closed.
+       *
+       * @param reason
+       */
+      @Override
+      public void onConnectionClose(CloseReason reason) {
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when there is an error in connection.
+       *
+       * @param cause
+       */
+      @Override
+      public void onError(Throwable cause) {
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when a text message is received.
+       *
+       * @param message
+       * @param session
+       */
+      @Override
+      public void onMessageText(String message, Object session) {
+        recievedMessage = message;
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when a binary message is received.
+       *
+       * @param message
+       * @param last
+       * @param session
+       */
+      @Override
+      public void onMessageBinary(byte[] message, boolean last,
+          Object session) {
+
+      }
+    } );
+
+    Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
+
+    Session session = container.connectToServer(client, serverUri);
+
+    session.getBasicRemote().sendText(longMessage);
+
+    while(!isTestComplete.get()) {
+      /* just wait for the test to finish */
+    }
+
+    Assert.assertEquals(longMessage, recievedMessage);
+
+  }
+
+
+
+}