You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by mo...@apache.org on 2017/10/16 17:06:06 UTC
[12/23] knox git commit: KNOX-895 - Pass Headers and Cookies to
websocket backend
KNOX-895 - Pass Headers and Cookies to websocket backend
Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/2d236e78
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/2d236e78
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/2d236e78
Branch: refs/heads/KNOX-998-Package_Restructuring
Commit: 2d236e78b70ef7fb312ebf0fa198657595e2f4ba
Parents: 7b401de
Author: Sandeep More <mo...@apache.org>
Authored: Wed Oct 11 17:04:52 2017 -0400
Committer: Sandeep More <mo...@apache.org>
Committed: Wed Oct 11 17:04:52 2017 -0400
----------------------------------------------------------------------
.../websockets/GatewayWebsocketHandler.java | 41 +-
.../gateway/websockets/ProxyInboundClient.java | 107 ++++++
.../websockets/ProxyWebSocketAdapter.java | 20 +-
.../websockets/ProxyInboundClientTest.java | 374 +++++++++++++++++++
4 files changed, 530 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
index 75a4a2b..0ee54fd 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
@@ -21,6 +21,8 @@ import java.io.File;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -40,11 +42,13 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import javax.websocket.ClientEndpointConfig;
+
/**
* Websocket handler that will handle websocket connection request. This class
* is responsible for creating a proxy socket for inbound and outbound
* connections. This is also where the http to websocket handoff happens.
- *
+ *
* @since 0.10
*/
public class GatewayWebsocketHandler extends WebSocketHandler
@@ -74,7 +78,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
/**
* Create an instance
- *
+ *
* @param config
* @param services
*/
@@ -90,7 +94,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
/*
* (non-Javadoc)
- *
+ *
* @see
* org.eclipse.jetty.websocket.server.WebSocketHandler#configure(org.eclipse.
* jetty.websocket.servlet.WebSocketServletFactory)
@@ -119,7 +123,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
/*
* (non-Javadoc)
- *
+ *
* @see
* org.eclipse.jetty.websocket.servlet.WebSocketCreator#createWebSocket(org.
* eclipse.jetty.websocket.servlet.ServletUpgradeRequest,
@@ -137,7 +141,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
final String backendURL = getMatchedBackendURL(path);
/* Upgrade happens here */
- return new ProxyWebSocketAdapter(URI.create(backendURL), pool);
+ return new ProxyWebSocketAdapter(URI.create(backendURL), pool, getClientEndpointConfig(req));
} catch (final Exception e) {
LOG.failedCreatingWebSocket(e);
throw e;
@@ -145,11 +149,32 @@ public class GatewayWebsocketHandler extends WebSocketHandler
}
/**
+ * Returns a {@link ClientEndpointConfig} config that contains the headers
+ * to be passed to the backend.
+ * <p>
+ * The returned config installs a {@link ClientEndpointConfig.Configurator}
+ * whose {@code beforeRequest} callback copies every header of {@code req}
+ * into the outgoing handshake header map using {@code putIfAbsent}, so a
+ * header name that is already present in that map keeps its existing value.
+ * @since 0.14.0
+ * @param req the upgrade request whose headers are copied
+ * @return a client endpoint config whose configurator adds the headers of
+ * {@code req} to the outgoing handshake request
+ */
+ private ClientEndpointConfig getClientEndpointConfig(final ServletUpgradeRequest req) {
+
+ return ClientEndpointConfig.Builder.create().configurator( new ClientEndpointConfig.Configurator() {
+
+ @Override
+ public void beforeRequest(final Map<String, List<String>> headers) {
+
+ /* Add request headers, never replacing a header that is already set */
+ req.getHeaders().forEach(headers::putIfAbsent);
+
+ }
+ }).build();
+ }
+
+ /**
* This method looks at the context path and returns the backend websocket
* url. If websocket url is found it is used as is, or we default to
* ws://{host}:{port} which might or might not be right.
- *
- * @param The context path
+ *
+ * @param path The context path
* @return Websocket backend url
*/
private synchronized String getMatchedBackendURL(final String path) {
@@ -203,7 +228,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
URI serviceUri = new URI(backendURL);
backend.append(serviceUri);
/* Avoid Zeppelin Regression - as this would require ambari changes and break current knox websocket use case*/
- if (!StringUtils.endsWith(backend.toString(), "/ws") && pathService[1] != null) {
+ if (!StringUtils.endsWith(backend.toString(), "/ws") && pathService.length > 0 && pathService[1] != null) {
backend.append(pathService[1]);
}
}
http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java
new file mode 100644
index 0000000..4e938d2
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java
@@ -0,0 +1,107 @@
+package org.apache.hadoop.gateway.websockets;
+
+import javax.websocket.CloseReason;
+import javax.websocket.Endpoint;
+import javax.websocket.EndpointConfig;
+import javax.websocket.MessageHandler;
+import javax.websocket.Session;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * A programmatic (not annotation based) websocket client endpoint that
+ * relays the events of a backend connection to a
+ * {@link MessageEventCallback}. Whole text messages and whole binary
+ * messages are both handled.
+ * @since 0.14.0
+ */
+public class ProxyInboundClient extends Endpoint {
+
+  /**
+   * Callback to be called once we have events on our socket.
+   */
+  private final MessageEventCallback callback;
+
+  /** Backend session; assigned in onOpen and reset to null on close or error. */
+  protected Session session;
+
+  /** Endpoint configuration handed to onOpen. */
+  protected EndpointConfig config;
+
+
+  /**
+   * Create an instance.
+   *
+   * @param callback receiver of all connection and message events
+   */
+  public ProxyInboundClient(final MessageEventCallback callback) {
+    super();
+    this.callback = callback;
+  }
+
+  /**
+   * Called when the connection to the backend has been opened. Records the
+   * session and configuration, raises both message buffer limits to
+   * {@link Integer#MAX_VALUE}, registers the text and binary message
+   * handlers and finally notifies the callback that the connection is open.
+   *
+   * @param backendSession the session that has just been activated.
+   * @param config the configuration used to configure this endpoint.
+   */
+  @Override
+  public void onOpen(final javax.websocket.Session backendSession, final EndpointConfig config) {
+    this.session = backendSession;
+    this.config = config;
+
+    /* Set the max message size */
+    backendSession.setMaxBinaryMessageBufferSize(Integer.MAX_VALUE);
+    backendSession.setMaxTextMessageBufferSize(Integer.MAX_VALUE);
+
+    /*
+     * The handlers below capture backendSession rather than reading the
+     * session field: the field is reset to null by onClose/onError, so a
+     * message dispatched after an error would otherwise hand a null session
+     * to the callback.
+     */
+
+    /* Add message handler for binary data */
+    backendSession.addMessageHandler(new MessageHandler.Whole<byte[]>() {
+
+      /**
+       * Called when the message has been fully received.
+       *
+       * @param message the message data.
+       */
+      @Override
+      public void onMessage(final byte[] message) {
+        /* a whole message is always the last (and only) part */
+        callback.onMessageBinary(message, true, backendSession);
+      }
+
+    });
+
+    /* Add message handler for text data */
+    backendSession.addMessageHandler(new MessageHandler.Whole<String>() {
+
+      /**
+       * Called when the message has been fully received.
+       *
+       * @param message the message data.
+       */
+      @Override
+      public void onMessage(final String message) {
+        callback.onMessageText(message, backendSession);
+      }
+
+    });
+
+    callback.onConnectionOpen(backendSession);
+  }
+
+  /**
+   * Forwards the close reason to the callback and drops the session
+   * reference.
+   *
+   * @param backendSession the session that is being closed.
+   * @param closeReason why the session was closed.
+   */
+  @Override
+  public void onClose(final javax.websocket.Session backendSession, final CloseReason closeReason) {
+    callback.onConnectionClose(closeReason);
+    this.session = null;
+  }
+
+  /**
+   * Forwards the error to the callback and drops the session reference.
+   *
+   * @param backendSession the session on which the error occurred.
+   * @param cause the error that was raised.
+   */
+  @Override
+  public void onError(final javax.websocket.Session backendSession, final Throwable cause) {
+    callback.onError(cause);
+    this.session = null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
index 1e7f583..4ea8d6c 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.gateway.websockets;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
@@ -60,12 +60,23 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
private ExecutorService pool;
/**
+ * Used to transmit headers from browser to backend server.
+ * @since 0.14
+ */
+ private ClientEndpointConfig clientConfig;
+
+ /**
* Create an instance
+ * <p>
+ * Delegates to the three-argument constructor with a {@code null} client
+ * endpoint configuration.
+ *
+ * @param backend URI stored as this adapter's backend
+ * @param pool executor service stored on this adapter
*/
public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool) {
+ this(backend, pool, null);
+ }
+
+ public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, final ClientEndpointConfig clientConfig) {
super();
this.backend = backend;
this.pool = pool;
+ this.clientConfig = clientConfig; // null when reached through the two-argument constructor
}
@Override
@@ -76,14 +87,15 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
* plumbing takes place
*/
container = ContainerProvider.getWebSocketContainer();
- final ProxyInboundSocket backendSocket = new ProxyInboundSocket(
- getMessageCallback());
+
+ final ProxyInboundClient backendSocket = new ProxyInboundClient(getMessageCallback());
/* build the configuration */
/* Attempt Connect */
try {
- backendSession = container.connectToServer(backendSocket, backend);
+ backendSession = container.connectToServer(backendSocket, clientConfig, backend);
+
LOG.onConnectionOpen(backend.toString());
} catch (DeploymentException e) {
http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java
new file mode 100644
index 0000000..69b45dd
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java
@@ -0,0 +1,374 @@
+package org.apache.hadoop.gateway.websockets;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.DeploymentException;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.instanceOf;
+
+/**
+ * Test {@link ProxyInboundClient} class against a local websocket echo
+ * server.
+ * @since 0.14.0
+ */
+public class ProxyInboundClientTest {
+
+  /** Upper bound, in milliseconds, on how long a test waits for the echo. */
+  private static final long MAX_WAIT_MS = 10000L;
+
+  /** Pause, in milliseconds, between two checks of the completion flag. */
+  private static final long POLL_INTERVAL_MS = 10L;
+
+  /** Charset used in both directions of the binary round trip. */
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+  private static Server server;
+  private static URI serverUri;
+  private static Handler handler;
+
+  /* Last text message handed to the callback, null until one arrives. */
+  String recievedMessage = null;
+
+  /* Last binary message handed to the callback, null until one arrives. */
+  byte[] recievedBinaryMessage = null;
+
+
+  /* create an instance */
+  public ProxyInboundClientTest() {
+    super();
+  }
+
+  /**
+   * Starts a Jetty server with the echo handler on an ephemeral port and
+   * records the websocket URI the tests connect to.
+   */
+  @BeforeClass
+  public static void startWSServer() throws Exception
+  {
+    server = new Server();
+    ServerConnector connector = new ServerConnector(server);
+    server.addConnector(connector);
+
+    handler = new WebsocketEchoHandler();
+
+    ContextHandler context = new ContextHandler();
+    context.setContextPath("/");
+    context.setHandler(handler);
+    server.setHandler(context);
+
+    server.start();
+
+    String host = connector.getHost();
+    if (host == null)
+    {
+      host = "localhost";
+    }
+    int port = connector.getLocalPort();
+    serverUri = new URI(String.format("ws://%s:%d/",host,port));
+  }
+
+  /** Stops the echo server; a failure to stop is reported but not rethrown. */
+  @AfterClass
+  public static void stopServer()
+  {
+    try
+    {
+      server.stop();
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace(System.err);
+    }
+  }
+
+  /**
+   * Sends a short text message and expects it to be echoed back unchanged.
+   * This test carries no JUnit timeout (it was disabled in the original);
+   * the wait for the echo is bounded by {@link #MAX_WAIT_MS} instead so a
+   * lost echo fails the test rather than hanging it.
+   */
+  @Test
+  public void testClientInstance() throws IOException, DeploymentException {
+
+    final String textMessage = "Echo";
+
+    final AtomicBoolean isTestComplete = new AtomicBoolean(false);
+
+    final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+    final ProxyInboundClient client = newRecordingClient(isTestComplete);
+
+    Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
+
+    Session session = container.connectToServer(client, serverUri);
+
+    session.getBasicRemote().sendText(textMessage);
+
+    awaitCompletion(isTestComplete);
+
+    Assert.assertEquals("The received text message is not the same as the sent", textMessage, recievedMessage);
+  }
+
+  /**
+   * Sends a short binary message and expects the same bytes to be echoed
+   * back.
+   */
+  @Test(timeout = 3000)
+  public void testBinarymessage() throws IOException, DeploymentException {
+
+    final String textMessage = "Echo";
+    final ByteBuffer binarymessage = ByteBuffer.wrap(textMessage.getBytes(UTF_8));
+
+    final AtomicBoolean isTestComplete = new AtomicBoolean(false);
+
+    final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+    final ProxyInboundClient client = newRecordingClient(isTestComplete);
+
+    Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
+
+    Session session = container.connectToServer(client, serverUri);
+
+    session.getBasicRemote().sendBinary(binarymessage);
+
+    awaitCompletion(isTestComplete);
+
+    /* the flag is also set on close/error, in which case nothing was received */
+    Assert.assertNotNull("No binary message was received", recievedBinaryMessage);
+    Assert.assertEquals("Binary message does not match", textMessage, new String(recievedBinaryMessage, UTF_8));
+  }
+
+  /**
+   * Sends a 100000 character text message and expects it to be echoed back
+   * unchanged.
+   */
+  @Test(timeout = 3000)
+  public void testTextMaxBufferLimit() throws IOException, DeploymentException {
+
+    final String longMessage = RandomStringUtils.random(100000);
+
+    final AtomicBoolean isTestComplete = new AtomicBoolean(false);
+
+    final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+    final ProxyInboundClient client = newRecordingClient(isTestComplete);
+
+    Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class));
+
+    Session session = container.connectToServer(client, serverUri);
+
+    session.getBasicRemote().sendText(longMessage);
+
+    awaitCompletion(isTestComplete);
+
+    Assert.assertEquals(longMessage, recievedMessage);
+
+  }
+
+  /**
+   * Creates a client whose callback stores any received text or binary
+   * message in the fields of this test and raises the given flag as soon as
+   * a message arrives, the connection closes or an error is reported.
+   *
+   * @param isTestComplete flag raised when the exchange is over
+   * @return the client under test
+   */
+  private ProxyInboundClient newRecordingClient(final AtomicBoolean isTestComplete) {
+    return new ProxyInboundClient( new MessageEventCallback() {
+
+      /**
+       * A generic callback, can be left un-implemented
+       *
+       * @param message
+       */
+      @Override
+      public void doCallback(String message) {
+
+      }
+
+      /**
+       * Callback when connection is established.
+       *
+       * @param session
+       */
+      @Override
+      public void onConnectionOpen(Object session) {
+
+      }
+
+      /**
+       * Callback when connection is closed.
+       *
+       * @param reason
+       */
+      @Override
+      public void onConnectionClose(CloseReason reason) {
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when there is an error in connection.
+       *
+       * @param cause
+       */
+      @Override
+      public void onError(Throwable cause) {
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when a text message is received.
+       *
+       * @param message
+       * @param session
+       */
+      @Override
+      public void onMessageText(String message, Object session) {
+        recievedMessage = message;
+        isTestComplete.set(true);
+      }
+
+      /**
+       * Callback when a binary message is received.
+       *
+       * @param message
+       * @param last
+       * @param session
+       */
+      @Override
+      public void onMessageBinary(byte[] message, boolean last,
+          Object session) {
+        recievedBinaryMessage = message;
+        isTestComplete.set(true);
+      }
+    } );
+  }
+
+  /**
+   * Blocks until the flag is raised, polling with a short sleep instead of
+   * spinning. Fails the test when the flag is still down after
+   * {@link #MAX_WAIT_MS} or when the waiting thread is interrupted.
+   *
+   * @param isTestComplete flag raised by the client callback
+   */
+  private static void awaitCompletion(final AtomicBoolean isTestComplete) {
+    final long deadline = System.nanoTime() + MAX_WAIT_MS * 1000000L;
+    while (!isTestComplete.get()) {
+      if (System.nanoTime() - deadline >= 0) {
+        Assert.fail("Timed out waiting for the websocket exchange to complete");
+      }
+      try {
+        Thread.sleep(POLL_INTERVAL_MS);
+      } catch (InterruptedException e) {
+        /* keep the interrupt visible to the test runner */
+        Thread.currentThread().interrupt();
+        Assert.fail("Interrupted while waiting for the websocket exchange to complete");
+      }
+    }
+  }
+
+}