You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by su...@apache.org on 2016/10/31 18:02:17 UTC
[1/2] knox git commit: KNOX-752 Implementation of initial websocket
support (Sandeep More via Sumit Gupta)
Repository: knox
Updated Branches:
refs/heads/master 9bcca2581 -> c6caebd4b
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketMultipleConnectionTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketMultipleConnectionTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketMultipleConnectionTest.java
new file mode 100644
index 0000000..a8e8d46
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketMultipleConnectionTest.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.ContainerProvider;
+import javax.websocket.Endpoint;
+import javax.websocket.EndpointConfig;
+import javax.websocket.MessageHandler;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.config.impl.GatewayConfigImpl;
+import org.apache.hadoop.gateway.deploy.DeploymentFactory;
+import org.apache.hadoop.gateway.services.DefaultGatewayServices;
+import org.apache.hadoop.gateway.services.GatewayServices;
+import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.topology.TopologyService;
+import org.apache.hadoop.gateway.topology.TopologyEvent;
+import org.apache.hadoop.gateway.topology.TopologyListener;
+import org.apache.hadoop.test.TestUtils;
+import org.easymock.EasyMock;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.mycila.xmltool.XMLDoc;
+import com.mycila.xmltool.XMLTag;
+
+/**
+ * Test how Knox holds up under multiple concurrent connections.
+ *
+ */
+public class WebsocketMultipleConnectionTest {
+ /**
+ * Simulate backend websocket
+ */
+ private static Server backendServer;
+ /**
+ * URI for backend websocket server
+ */
+ private static URI backendServerUri;
+
+ /**
+ * Mock Gateway server
+ */
+ private static Server gatewayServer;
+
+ /**
+ * Mock gateway config
+ */
+ private static GatewayConfig gatewayConfig;
+
+ private static GatewayServices services;
+
+ /**
+ * URI for gateway server
+ */
+ private static URI serverUri;
+
+ private static File topoDir;
+
+ /**
+ * Maximum number of open connections to test.
+ */
+ private static int MAX_CONNECTIONS = 100;
+
+ public WebsocketMultipleConnectionTest() {
+   /* nothing to initialise: every fixture is static and super() is implicit */
+ }
+
+ @BeforeClass
+ public static void startServers() throws Exception {
+ /* backend first: startGatewayServer() reads the backendServerUri it sets */
+ startWebsocketServer();
+ startGatewayServer();
+ /* both servers are stopped again in stopServers() */
+ }
+
+ /** Stops both servers (failures isolated per server) and deletes the topology dir. */
+ @AfterClass
+ public static void stopServers() {
+   for (final Server server : new Server[] { gatewayServer, backendServer }) {
+     try {
+       if (server != null) {
+         server.stop();
+       }
+     } catch (final Exception e) {
+       e.printStackTrace(System.err);
+     }
+   }
+   FileUtils.deleteQuietly(topoDir);
+ }
+ /**
+  * Opens MAX_CONNECTIONS websocket sessions through the gateway, sends one
+  * text message on each and fails unless that many messages come back.
+  * Heap and non-heap growth with the sessions open is printed; every session
+  * is closed at the end.
+  *
+  * @throws Exception if connecting or sending fails
+  */
+ @Test
+ public void testMultipleConnections() throws Exception {
+   WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+   final CountDownLatch latch = new CountDownLatch(MAX_CONNECTIONS);
+   final Session[] sessions = new Session[MAX_CONNECTIONS];
+   MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+   /* baseline, taken before any connection is opened */
+   System.gc();
+   final long heapt1 = memoryMXBean.getHeapMemoryUsage().getUsed();
+   final long nonHeapt1 = memoryMXBean.getNonHeapMemoryUsage().getUsed();
+
+   try {
+     for (int i = 0; i < MAX_CONNECTIONS; i++) {
+       sessions[i] = container.connectToServer(new WebsocketClient() {
+         @Override
+         public void onMessage(String message) {
+           latch.countDown();
+         }
+       }, new URI(serverUri.toString() + "gateway/websocket/ws"));
+     }
+     for (int i = 0; i < MAX_CONNECTIONS; i++) {
+       sessions[i].getBasicRemote().sendText("OK");
+     }
+     /* await() returns as soon as the count reaches zero, so the generous
+      * bound only matters on failure; the result used to be discarded */
+     final boolean allAnswered = latch.await(30, TimeUnit.SECONDS);
+
+     System.gc();
+     final long heapUsed = memoryMXBean.getHeapMemoryUsage().getUsed() - heapt1;
+     final long nonHeapUsed = memoryMXBean.getNonHeapMemoryUsage().getUsed()
+         - nonHeapt1;
+     /* memory is only reported; no per-connection bound (e.g. 90 KB) is enforced */
+     System.out.println("heapUsed = " + heapUsed);
+     System.out.println("nonHeapUsed = " + nonHeapUsed);
+     org.junit.Assert.assertTrue(
+         "messages still missing: " + latch.getCount(), allAnswered);
+   } finally {
+     for (final Session session : sessions) {
+       try {
+         if (session != null) {
+           session.close();
+         }
+       } catch (final IOException e) {
+         e.printStackTrace(System.err);
+       }
+     }
+   }
+ }
+
+ /**
+  * Boots the mock backend: a Jetty server whose root context is served by a
+  * WebsocketEchoHandler, then records its address in backendServerUri.
+  *
+  * @throws Exception if the server cannot be started or the URI is malformed
+  */
+ private static void startWebsocketServer() throws Exception {
+   final Server echoServer = new Server(new QueuedThreadPool(254));
+   backendServer = echoServer;
+
+   final ServerConnector echoConnector = new ServerConnector(echoServer);
+   echoServer.addConnector(echoConnector);
+
+   /* everything under "/" is answered by the echo handler */
+   final WebsocketEchoHandler echoHandler = new WebsocketEchoHandler();
+   final ContextHandler rootContext = new ContextHandler();
+   rootContext.setContextPath("/");
+   rootContext.setHandler(echoHandler);
+   echoServer.setHandler(rootContext);
+
+   echoServer.start();
+
+   /* a connector without an explicit host is addressed as localhost */
+   final String boundHost = echoConnector.getHost();
+   final String host = (boundHost != null) ? boundHost : "localhost";
+   final int port = echoConnector.getLocalPort();
+
+   backendServerUri = new URI(String.format("ws://%s:%d/ws", host, port));
+ }
+
+ /**
+ * Start the gateway: a Jetty server that serves a GatewayWebsocketHandler.
+ *
+ * @throws Exception if the server or the websocket handler fails to start
+ */
+ private static void startGatewayServer() throws Exception {
+ /* NOTE(review): 254 presumably mirrors the gateway's default max threads - confirm */
+ gatewayServer = new Server(new QueuedThreadPool(254));
+ final ServerConnector connector = new ServerConnector(gatewayServer);
+ gatewayServer.addConnector(connector);
+
+ /* workaround so we can add our handler later at runtime */
+ HandlerCollection handlers = new HandlerCollection(true);
+
+ /* add some initial handlers */
+ ContextHandler context = new ContextHandler();
+ context.setContextPath("/");
+ handlers.addHandler(context);
+
+ gatewayServer.setHandler(handlers);
+
+ // Start Server
+ gatewayServer.start();
+ /* host and port are read after start(); a null host falls back to localhost */
+ String host = connector.getHost();
+ if (host == null) {
+ host = "localhost";
+ }
+ int port = connector.getLocalPort();
+ serverUri = new URI(String.format("ws://%s:%d/", host, port));
+
+ /* Setup websocket handler: fills gatewayConfig and services for the backend URI */
+ setupGatewayConfig(backendServerUri.toString());
+ /* the handler joins the already started server, so it is started explicitly */
+ final GatewayWebsocketHandler gatewayWebsocketHandler = new GatewayWebsocketHandler(
+ gatewayConfig, services);
+ handlers.addHandler(gatewayWebsocketHandler);
+ gatewayWebsocketHandler.start();
+ }
+
+ /**
+  * Writes a one-service topology that points at {@code backend}, builds the
+  * mock gateway config, initialises the gateway services and reloads the
+  * topologies so that the registered listener can deploy them.
+  *
+  * @param backend URL of the backend websocket server
+  * @throws IOException if the topology descriptor cannot be written
+  * @throws IllegalStateException if the gateway services fail to initialise
+  */
+ private static void setupGatewayConfig(final String backend)
+     throws IOException {
+   services = new DefaultGatewayServices();
+
+   topoDir = createDir();
+   URL serviceUrl = ClassLoader.getSystemResource("websocket-services");
+
+   final File descriptor = new File(topoDir, "websocket.xml");
+   final FileOutputStream stream = new FileOutputStream(descriptor);
+   try {
+     createKnoxTopology(backend).toStream(stream);
+   } finally {
+     /* close even when serialisation fails, so the file handle is not leaked */
+     stream.close();
+   }
+
+   final TestTopologyListener topoListener = new TestTopologyListener();
+
+   final Map<String, String> options = new HashMap<String, String>();
+   options.put("persist-master", "false");
+   options.put("master", "password");
+
+   gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
+   EasyMock.expect(gatewayConfig.getGatewayTopologyDir())
+       .andReturn(topoDir.toString()).anyTimes();
+
+   EasyMock.expect(gatewayConfig.getGatewayServicesDir())
+       .andReturn(serviceUrl.getFile()).anyTimes();
+
+   EasyMock.expect(gatewayConfig.getEphemeralDHKeySize()).andReturn("2048")
+       .anyTimes();
+
+   EasyMock.expect(gatewayConfig.getGatewaySecurityDir())
+       .andReturn(topoDir.toString()).anyTimes();
+
+   /* Websocket configs */
+   EasyMock.expect(gatewayConfig.isWebsocketEnabled()).andReturn(true)
+       .anyTimes();
+   EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageSize())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE)
+       .anyTimes();
+   EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageSize())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE)
+       .anyTimes();
+   EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageBufferSize())
+       .andReturn(
+           GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE)
+       .anyTimes();
+   EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageBufferSize())
+       .andReturn(
+           GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE)
+       .anyTimes();
+   EasyMock.expect(gatewayConfig.getWebsocketInputBufferSize())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE)
+       .anyTimes();
+   EasyMock.expect(gatewayConfig.getWebsocketAsyncWriteTimeout())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT)
+       .anyTimes();
+   EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
+
+   EasyMock.replay(gatewayConfig);
+
+   try {
+     services.init(gatewayConfig, options);
+   } catch (ServiceLifecycleException e) {
+     /* fail here rather than continue with services that never initialised */
+     throw new IllegalStateException("Gateway services failed to initialise", e);
+   }
+
+   DeploymentFactory.setGatewayServices(services);
+   final TopologyService monitor = services
+       .getService(GatewayServices.TOPOLOGY_SERVICE);
+   monitor.addTopologyChangeListener(topoListener);
+   monitor.reloadTopologies();
+ }
+
+ /** Creates the temp topology directory; the prefix now names this test, not WebsocketEchoTest. */
+ private static File createDir() throws IOException {
+   return TestUtils.createTempDir(WebsocketMultipleConnectionTest.class.getSimpleName() + "-");
+ }
+
+ /* Builds a "topology" document with a service entry: role WEBSOCKET, url = backend. */
+ private static XMLTag createKnoxTopology(final String backend) {
+ XMLTag xml = XMLDoc.newDocument(true).addRoot("topology").addTag("service")
+ .addTag("role").addText("WEBSOCKET").addTag("url").addText(backend)
+ .gotoParent().gotoRoot();
+ return xml;
+ }
+
+ /** Records every batch of topology events and deploys each topology not deleted. */
+ private static class TestTopologyListener implements TopologyListener {
+
+   /** Every batch received so far, in arrival order; guarded by {@code this}. */
+   public final ArrayList<List<TopologyEvent>> events = new ArrayList<List<TopologyEvent>>();
+
+   /**
+    * Stores the batch and creates a deployment for each non-DELETED event.
+    * The whole method holds the lock: the list append used to run outside
+    * the synchronized block although ArrayList is not thread-safe.
+    */
+   @Override
+   public synchronized void handleTopologyEvent(List<TopologyEvent> events) {
+     this.events.add(events);
+
+     for (TopologyEvent event : events) {
+       if (!event.getType().equals(TopologyEvent.Type.DELETED)) {
+         /* for this test we only care about this part */
+         DeploymentFactory.createDeployment(gatewayConfig,
+             event.getTopology());
+       }
+     }
+   }
+ }
+ /** Client endpoint that registers itself as the whole-text message handler on open. */
+ private static abstract class WebsocketClient extends Endpoint
+ implements MessageHandler.Whole<String> {
+ @Override
+ public void onOpen(Session session, EndpointConfig config) {
+ session.addMessageHandler(this);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/resources/websocket-services/websocket/0.6.0/rewrite.xml
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/resources/websocket-services/websocket/0.6.0/rewrite.xml b/gateway-server/src/test/resources/websocket-services/websocket/0.6.0/rewrite.xml
new file mode 100644
index 0000000..bbc77c5
--- /dev/null
+++ b/gateway-server/src/test/resources/websocket-services/websocket/0.6.0/rewrite.xml
@@ -0,0 +1,26 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<rules>
+ <!-- inbound URL ending in /ws: rewritten to the WEBSOCKET service URL plus /ws -->
+ <rule dir="IN" name="WEBSOCKET/ws/inbound" pattern="*://*:*/**/ws">
+ <rewrite template="{$serviceUrl[WEBSOCKET]}/ws"/>
+ </rule>
+ <!-- as above, but whatever follows "ws" ({**}) is carried over; presumably the rest of the path -->
+ <rule dir="IN" name="WEBSOCKET/inbound" pattern="*://*:*/**/ws{**}">
+ <rewrite template="{$serviceUrl[WEBSOCKET]}/ws{**}"/>
+ </rule>
+</rules>
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/resources/websocket-services/websocket/0.6.0/service.xml
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/resources/websocket-services/websocket/0.6.0/service.xml b/gateway-server/src/test/resources/websocket-services/websocket/0.6.0/service.xml
new file mode 100644
index 0000000..a4abfe5
--- /dev/null
+++ b/gateway-server/src/test/resources/websocket-services/websocket/0.6.0/service.xml
@@ -0,0 +1,33 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<service role="WEBSOCKET" name="websocket" version="0.6.0">
+ <policies>
+ <policy role="webappsec"/>
+ <policy role="authentication" name="Anonymous"/>
+ <policy role="rewrite"/>
+ <policy role="authorization"/>
+ </policies>
+ <routes>
+ <route path="/ws">
+ <rewrite apply="WEBSOCKET/ws/inbound" to="request.url"/>
+ </route>
+ <!-- plain /ws uses the rule above; paths continuing after /ws use WEBSOCKET/inbound -->
+ <route path="/ws**">
+ <rewrite apply="WEBSOCKET/inbound" to="request.url"/>
+ </route>
+ </routes>
+</service>
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
index 85da3b5..3e538fd 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/config/GatewayConfig.java
@@ -145,4 +145,61 @@ public interface GatewayConfig {
List<String> getGlobalRulesServices();
+ /**
+ * Returns true if the websocket feature is enabled, else false.
+ * Default is false.
+ * @since 0.10
+ * @return {@code true} when the websocket feature is enabled
+ */
+ boolean isWebsocketEnabled();
+
+ /**
+ * Websocket connection max text message size.
+ * @since 0.10
+ * @return the maximum text message size (unit not stated here; presumably bytes)
+ */
+ int getWebsocketMaxTextMessageSize();
+
+ /**
+ * Websocket connection max binary message size.
+ * @since 0.10
+ * @return the maximum binary message size (unit not stated here; presumably bytes)
+ */
+ int getWebsocketMaxBinaryMessageSize();
+
+ /**
+ * Websocket connection max text message buffer size.
+ * @since 0.10
+ * @return the maximum text message buffer size (unit not stated here; presumably bytes)
+ */
+ int getWebsocketMaxTextMessageBufferSize();
+
+ /**
+ * Websocket connection max binary message buffer size.
+ * @since 0.10
+ * @return the maximum binary message buffer size (unit not stated here; presumably bytes)
+ */
+ int getWebsocketMaxBinaryMessageBufferSize();
+
+ /**
+ * Websocket connection input buffer size.
+ * @since 0.10
+ * @return the input buffer size (unit not stated here; presumably bytes)
+ */
+ int getWebsocketInputBufferSize();
+
+ /**
+ * Websocket connection async write timeout.
+ * @since 0.10
+ * @return the async write timeout (unit not stated here; presumably milliseconds)
+ */
+ int getWebsocketAsyncWriteTimeout();
+
+ /**
+ * Websocket connection idle timeout.
+ * @since 0.10
+ * @return the idle timeout (unit not stated here; presumably milliseconds)
+ */
+ int getWebsocketIdleTimeout();
+
}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-test-release/webhdfs-kerb-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java
----------------------------------------------------------------------
diff --git a/gateway-test-release/webhdfs-kerb-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java b/gateway-test-release/webhdfs-kerb-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java
index 74643e8..695ad02 100644
--- a/gateway-test-release/webhdfs-kerb-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java
+++ b/gateway-test-release/webhdfs-kerb-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java
@@ -28,6 +28,16 @@ import java.util.List;
public class GatewayTestConfig extends Configuration implements GatewayConfig {
+ /* Websocket defaults, returned by the websocket accessors of this test config */
+ public static final boolean DEFAULT_WEBSOCKET_FEATURE_ENABLED = false;
+ public static final int DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE = Integer.MAX_VALUE;
+ public static final int DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE = Integer.MAX_VALUE;
+ public static final int DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 32768;
+ public static final int DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 32768;
+ public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE = 4096;
+ public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT = 60000;
+ public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT = 300000;
+
private String gatewayHomeDir = "gateway-home";
private String hadoopConfDir = "hadoop";
private String gatewayHost = "localhost";
@@ -350,4 +360,68 @@ public class GatewayTestConfig extends Configuration implements GatewayConfig {
public List<String> getGlobalRulesServices() {
return Collections.EMPTY_LIST;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#isWebsocketEnabled()
+ */
+ @Override
+ public boolean isWebsocketEnabled() {
+ return DEFAULT_WEBSOCKET_FEATURE_ENABLED;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxTextMessageSize()
+ */
+ @Override
+ public int getWebsocketMaxTextMessageSize() {
+ return DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxBinaryMessageSize()
+ */
+ @Override
+ public int getWebsocketMaxBinaryMessageSize() {
+ return DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxTextMessageBufferSize()
+ */
+ @Override
+ public int getWebsocketMaxTextMessageBufferSize() {
+ return DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxBinaryMessageBufferSize()
+ */
+ @Override
+ public int getWebsocketMaxBinaryMessageBufferSize() {
+ return DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketInputBufferSize()
+ */
+ @Override
+ public int getWebsocketInputBufferSize() {
+ return DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketAsyncWriteTimeout()
+ */
+ @Override
+ public int getWebsocketAsyncWriteTimeout() {
+ return DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketIdleTimeout()
+ */
+ @Override
+ public int getWebsocketIdleTimeout() {
+ return DEFAULT_WEBSOCKET_IDLE_TIMEOUT;
+ }
}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java
----------------------------------------------------------------------
diff --git a/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java b/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java
index b0b78f9..11b6eb5 100644
--- a/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java
+++ b/gateway-test/src/test/java/org/apache/hadoop/gateway/GatewayTestConfig.java
@@ -27,6 +27,16 @@ import java.util.List;
public class GatewayTestConfig extends Configuration implements GatewayConfig {
+ /* Websocket defaults, returned by the websocket accessors of this test config */
+ public static final boolean DEFAULT_WEBSOCKET_FEATURE_ENABLED = false;
+ public static final int DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE = Integer.MAX_VALUE;
+ public static final int DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE = Integer.MAX_VALUE;
+ public static final int DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 32768;
+ public static final int DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 32768;
+ public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE = 4096;
+ public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT = 60000;
+ public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT = 300000;
+
private String gatewayHomeDir = "gateway-home";
private String hadoopConfDir = "hadoop";
private String gatewayHost = "localhost";
@@ -408,4 +418,68 @@ public class GatewayTestConfig extends Configuration implements GatewayConfig {
services.add("STORM");
return services;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#isWebsocketEnabled()
+ */
+ @Override
+ public boolean isWebsocketEnabled() {
+ return DEFAULT_WEBSOCKET_FEATURE_ENABLED;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxTextMessageSize()
+ */
+ @Override
+ public int getWebsocketMaxTextMessageSize() {
+ return DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxBinaryMessageSize()
+ */
+ @Override
+ public int getWebsocketMaxBinaryMessageSize() {
+ return DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxTextMessageBufferSize()
+ */
+ @Override
+ public int getWebsocketMaxTextMessageBufferSize() {
+ return DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxBinaryMessageBufferSize()
+ */
+ @Override
+ public int getWebsocketMaxBinaryMessageBufferSize() {
+ return DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketInputBufferSize()
+ */
+ @Override
+ public int getWebsocketInputBufferSize() {
+ return DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketAsyncWriteTimeout()
+ */
+ @Override
+ public int getWebsocketAsyncWriteTimeout() {
+ return DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketIdleTimeout()
+ */
+ @Override
+ public int getWebsocketIdleTimeout() {
+ return DEFAULT_WEBSOCKET_IDLE_TIMEOUT;
+ }
}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bfb0fc1..bcbff43 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,7 @@
<surefire-version>2.16</surefire-version>
<failsafe-version>2.19.1</failsafe-version>
<apacheds-version>2.0.0-M16</apacheds-version>
+ <javax-websocket-version>1.1</javax-websocket-version>
</properties>
<licenses>
@@ -1120,6 +1121,37 @@
<version>${jetty-version}</version>
</dependency>
+ <!-- Websocket support -->
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-server</artifactId>
+ <version>${jetty-version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-servlet</artifactId>
+ <version>${jetty-version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.websocket</groupId>
+ <artifactId>javax.websocket-api</artifactId>
+ <version>${javax-websocket-version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>javax-websocket-server-impl</artifactId>
+ <version>${jetty-version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>javax-websocket-client-impl</artifactId>
+ <version>${jetty-version}</version>
+ </dependency>
+
<!-- ********** ********** ********** ********** ********** ********** -->
<!-- ********** Test Dependencies ********** -->
<!-- ********** ********** ********** ********** ********** ********** -->
[2/2] knox git commit: KNOX-752 Implementation of initial websocket
support (Sandeep More via Sumit Gupta)
Posted by su...@apache.org.
KNOX-752 Implementation of initial websocket support (Sandeep More via Sumit Gupta)
Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/c6caebd4
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/c6caebd4
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/c6caebd4
Branch: refs/heads/master
Commit: c6caebd4be9b6f1d3882d4e5938007fbaad55678
Parents: 9bcca25
Author: Sumit Gupta <su...@apache.org>
Authored: Mon Oct 31 14:00:43 2016 -0400
Committer: Sumit Gupta <su...@apache.org>
Committed: Mon Oct 31 14:00:43 2016 -0400
----------------------------------------------------------------------
gateway-release/home/conf/gateway-site.xml | 7 +
gateway-server/pom.xml | 28 ++
.../apache/hadoop/gateway/GatewayServer.java | 29 +-
.../gateway/config/impl/GatewayConfigImpl.java | 88 ++++-
.../websockets/GatewayWebsocketHandler.java | 217 +++++++++++
.../websockets/MessageEventCallback.java | 66 ++++
.../gateway/websockets/ProxyInboundSocket.java | 82 ++++
.../websockets/ProxyWebSocketAdapter.java | 243 ++++++++++++
.../websockets/WebsocketLogMessages.java | 60 +++
.../gateway/websockets/BadBackendTest.java | 117 ++++++
.../hadoop/gateway/websockets/BadUrlTest.java | 307 +++++++++++++++
.../websockets/ConnectionDroppedTest.java | 202 ++++++++++
.../hadoop/gateway/websockets/EchoSocket.java | 68 ++++
.../gateway/websockets/MessageFailureTest.java | 206 ++++++++++
.../gateway/websockets/WebsocketClient.java | 131 +++++++
.../websockets/WebsocketEchoHandler.java | 47 +++
.../gateway/websockets/WebsocketEchoTest.java | 368 ++++++++++++++++++
.../WebsocketMultipleConnectionTest.java | 388 +++++++++++++++++++
.../websocket/0.6.0/rewrite.xml | 26 ++
.../websocket/0.6.0/service.xml | 33 ++
.../hadoop/gateway/config/GatewayConfig.java | 57 +++
.../hadoop/gateway/GatewayTestConfig.java | 74 ++++
.../hadoop/gateway/GatewayTestConfig.java | 74 ++++
pom.xml | 32 ++
24 files changed, 2942 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-release/home/conf/gateway-site.xml
----------------------------------------------------------------------
diff --git a/gateway-release/home/conf/gateway-site.xml b/gateway-release/home/conf/gateway-site.xml
index d9ba16c..80cfacf 100644
--- a/gateway-release/home/conf/gateway-site.xml
+++ b/gateway-release/home/conf/gateway-site.xml
@@ -60,4 +60,11 @@ limitations under the License.
<description>Boolean flag indicating whether to enable debug messages for krb5 authentication</description>
</property>
+ <!-- @since 0.10 Websocket configs -->
+ <property>
+ <name>gateway.websocket.feature.enabled</name>
+ <value>false</value>
+ <description>Enable/Disable websocket feature.</description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/pom.xml
----------------------------------------------------------------------
diff --git a/gateway-server/pom.xml b/gateway-server/pom.xml
index 7501aaf..a9c5b70 100644
--- a/gateway-server/pom.xml
+++ b/gateway-server/pom.xml
@@ -213,6 +213,34 @@
<artifactId>apache-jstl</artifactId>
</dependency>
+ <!-- Websocket support -->
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-server</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-servlet</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.websocket</groupId>
+ <artifactId>javax.websocket-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>javax-websocket-server-impl</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>javax-websocket-client-impl</artifactId>
+ </dependency>
+
+
<!-- ********** ********** ********** ********** ********** ********** -->
<!-- ********** Test Dependencies ********** -->
<!-- ********** ********** ********** ********** ********** ********** -->
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
index 15baa56..abaa168 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.gateway.trace.ErrorHandler;
import org.apache.hadoop.gateway.trace.TraceHandler;
import org.apache.hadoop.gateway.util.Urls;
import org.apache.hadoop.gateway.util.XmlUtils;
+import org.apache.hadoop.gateway.websockets.GatewayWebsocketHandler;
import org.apache.log4j.PropertyConfigurator;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
@@ -361,13 +362,27 @@ public class GatewayServer {
DefaultTopologyHandler defaultTopoHandler = new DefaultTopologyHandler(
config, services, contexts);
- /*
- * Chaining the gzipHandler to correlationHandler. The expected flow here is
- * defaultTopoHandler -> logHandler -> gzipHandler -> correlationHandler ->
- * traceHandler
- */
- handlers.setHandlers(
- new Handler[] { defaultTopoHandler, logHandler, gzipHandler });
+ if (config.isWebsocketEnabled()) {
+ GatewayWebsocketHandler websockethandler = new GatewayWebsocketHandler(
+ config, services);
+ websockethandler.setHandler(gzipHandler);
+
+ /*
+ * Chaining the gzipHandler to correlationHandler. The websockethandler
+ * wraps gzipHandler, so the expected flow here is defaultTopoHandler ->
+ * logHandler -> websockethandler -> gzipHandler -> correlationHandler ->
+ * traceHandler
+ */
+ handlers.setHandlers(
+ new Handler[] { defaultTopoHandler, logHandler, websockethandler });
+ } else {
+ /*
+ * Chaining the gzipHandler to correlationHandler. The expected flow here
+ * is defaultTopoHandler -> logHandler -> gzipHandler ->
+ * correlationHandler -> traceHandler
+ */
+ handlers.setHandlers(
+ new Handler[] { defaultTopoHandler, logHandler, gzipHandler });
+ }
return handlers;
}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
index 0bfe82f..d8349d8 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
@@ -45,7 +45,7 @@ import java.util.Map;
*
* 1. gateway-default.xml - All the configuration variables that the
* Gateway needs. These are the defaults that ship with the app
- * and should only be changed be the app developers.
+ * and should only be changed by the app developers.
*
* 2. gateway-site.xml - The (possibly empty) configuration that the
* system administrator can set variables for their Hadoop cluster.
@@ -131,6 +131,16 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
public static final String DEPLOYMENTS_BACKUP_VERSION_LIMIT = GATEWAY_CONFIG_FILE_PREFIX + ".deployment.backup.versionLimit";
public static final String DEPLOYMENTS_BACKUP_AGE_LIMIT = GATEWAY_CONFIG_FILE_PREFIX + ".deployment.backup.ageLimit";
+ /* @since 0.10 Websocket config variables */
+ public static final String WEBSOCKET_FEATURE_ENABLED = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.feature.enabled";
+ public static final String WEBSOCKET_MAX_TEXT_MESSAGE_SIZE = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.text.size";
+ public static final String WEBSOCKET_MAX_BINARY_MESSAGE_SIZE = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.binary.size";
+ public static final String WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.text.buffer.size";
+ public static final String WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.binary.buffer.size";
+ public static final String WEBSOCKET_INPUT_BUFFER_SIZE = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.input.buffer.size";
+ public static final String WEBSOCKET_ASYNC_WRITE_TIMEOUT = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.async.write.timeout";
+ public static final String WEBSOCKET_IDLE_TIMEOUT = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.idle.timeout";
+
// These config property names are not inline with the convention of using the
// GATEWAY_CONFIG_FILE_PREFIX as is done by those above. These are left for
// backward compatibility.
@@ -146,6 +156,17 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
public static final String DEFAULT_DEPLOYMENT_DIR = "deployments";
public static final String DEFAULT_SECURITY_DIR = "security";
public static final String DEFAULT_DATA_DIR = "data";
+
+ /* Websocket defaults */
+ public static final boolean DEFAULT_WEBSOCKET_FEATURE_ENABLED = false;
+ public static final int DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE = Integer.MAX_VALUE;;
+ public static final int DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE = Integer.MAX_VALUE;;
+ public static final int DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 32768;
+ public static final int DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 32768;
+ public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE = 4096;
+ public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT = 60000;
+ public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT = 300000;
+
private static List<String> DEFAULT_GLOBAL_RULES_SERVICES;
@@ -630,6 +651,71 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
return DEFAULT_GLOBAL_RULES_SERVICES;
}
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#isWebsocketEnabled()
+ */
+ @Override
+ public boolean isWebsocketEnabled() {
+ final String result = get( WEBSOCKET_FEATURE_ENABLED, Boolean.toString(DEFAULT_WEBSOCKET_FEATURE_ENABLED));
+ return Boolean.parseBoolean(result);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxTextMessageSize()
+ */
+ @Override
+ public int getWebsocketMaxTextMessageSize() {
+ return getInt( WEBSOCKET_MAX_TEXT_MESSAGE_SIZE, DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxBinaryMessageSize()
+ */
+ @Override
+ public int getWebsocketMaxBinaryMessageSize() {
+ return getInt( WEBSOCKET_MAX_BINARY_MESSAGE_SIZE, DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxTextMessageBufferSize()
+ */
+ @Override
+ public int getWebsocketMaxTextMessageBufferSize() {
+ return getInt( WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE, DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxBinaryMessageBufferSize()
+ */
+ @Override
+ public int getWebsocketMaxBinaryMessageBufferSize() {
+ return getInt( WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE, DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketInputBufferSize()
+ */
+ @Override
+ public int getWebsocketInputBufferSize() {
+ return getInt( WEBSOCKET_INPUT_BUFFER_SIZE, DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketAsyncWriteTimeout()
+ */
+ @Override
+ public int getWebsocketAsyncWriteTimeout() {
+ return getInt( WEBSOCKET_ASYNC_WRITE_TIMEOUT, DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketIdleTimeout()
+ */
+ @Override
+ public int getWebsocketIdleTimeout() {
+ return getInt( WEBSOCKET_IDLE_TIMEOUT, DEFAULT_WEBSOCKET_IDLE_TIMEOUT);
+ }
+
private static long parseNetworkTimeout( String s ) {
PeriodFormatter f = new PeriodFormatterBuilder()
.appendMinutes().appendSuffix("m"," min")
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
new file mode 100644
index 0000000..fbae2f9
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.service.definition.ServiceDefinition;
+import org.apache.hadoop.gateway.services.GatewayServices;
+import org.apache.hadoop.gateway.services.registry.ServiceDefEntry;
+import org.apache.hadoop.gateway.services.registry.ServiceDefinitionRegistry;
+import org.apache.hadoop.gateway.services.registry.ServiceRegistry;
+import org.apache.hadoop.gateway.util.ServiceDefinitionsLoader;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * Websocket handler that will handle websocket connection request. This class
+ * is responsible for creating a proxy socket for inbound and outbound
+ * connections. This is also where the http to websocket handoff happens.
+ *
+ * @since 0.10
+ */
+public class GatewayWebsocketHandler extends WebSocketHandler
+ implements WebSocketCreator {
+
+ private static WebsocketLogMessages LOG = MessagesFactory
+ .get(WebsocketLogMessages.class);
+
+ public final static String WEBSOCKET_PROTOCOL_STRING = "ws://";
+
+ final static String REGEX_SPLIT_CLUSTER_NAME = "^((?:[^/]*/){1}[^/]*)";
+
+ final static String REGEX_SPLIT_CONTEXT = "^((?:[^/]*/){2}[^/]*)";
+
+ final GatewayConfig config;
+ final GatewayServices services;
+
+ /**
+ * Create an instance
+ *
+ * @param config
+ * @param services
+ */
+ public GatewayWebsocketHandler(final GatewayConfig config,
+ final GatewayServices services) {
+ super();
+
+ this.config = config;
+ this.services = services;
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.eclipse.jetty.websocket.server.WebSocketHandler#configure(org.eclipse.
+ * jetty.websocket.servlet.WebSocketServletFactory)
+ */
+ @Override
+ public void configure(final WebSocketServletFactory factory) {
+ factory.setCreator(this);
+ factory.getPolicy()
+ .setMaxTextMessageSize(config.getWebsocketMaxTextMessageSize());
+ factory.getPolicy()
+ .setMaxBinaryMessageSize(config.getWebsocketMaxBinaryMessageSize());
+
+ factory.getPolicy().setMaxBinaryMessageBufferSize(
+ config.getWebsocketMaxBinaryMessageBufferSize());
+ factory.getPolicy().setMaxTextMessageBufferSize(
+ config.getWebsocketMaxTextMessageBufferSize());
+
+ factory.getPolicy()
+ .setInputBufferSize(config.getWebsocketInputBufferSize());
+
+ factory.getPolicy()
+ .setAsyncWriteTimeout(config.getWebsocketAsyncWriteTimeout());
+ factory.getPolicy().setIdleTimeout(config.getWebsocketIdleTimeout());
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.eclipse.jetty.websocket.servlet.WebSocketCreator#createWebSocket(org.
+ * eclipse.jetty.websocket.servlet.ServletUpgradeRequest,
+ * org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse)
+ */
+ @Override
+ public Object createWebSocket(ServletUpgradeRequest req,
+ ServletUpgradeResponse resp) {
+
+ try {
+ final URI requestURI = req.getRequestURI();
+ final String path = requestURI.getPath();
+
+ /* URL used to connect to websocket backend */
+ final String backendURL = getMatchedBackendURL(path);
+
+ /* Upgrade happens here */
+ return new ProxyWebSocketAdapter(URI.create(backendURL));
+ } catch (final Exception e) {
+ LOG.failedCreatingWebSocket(e);
+ throw e;
+ }
+ }
+
+  /**
+   * Resolves the backend websocket URL for an inbound request path.
+   * <p>
+   * The leading two path segments matched by {@link #REGEX_SPLIT_CONTEXT} are
+   * stripped and the remainder is used to look up the matching service
+   * definition. The URL registered for that service is returned unchanged
+   * when it already contains a websocket scheme (ws:// or wss://); otherwise
+   * it is rewritten to ws://{host}[:{port}]{path}, which might or might not
+   * be right for the backend.
+   *
+   * @param path
+   *          the request path of the websocket upgrade request
+   * @return Websocket backend url
+   * @throws RuntimeException
+   *           if no service matches the path or the registered service URL is
+   *           malformed
+   */
+  private synchronized String getMatchedBackendURL(final String path) {
+
+    final ServiceRegistry serviceRegistryService = services
+        .getService(GatewayServices.SERVICE_REGISTRY_SERVICE);
+
+    final ServiceDefinitionRegistry serviceDefinitionService = services
+        .getService(GatewayServices.SERVICE_DEFINITION_REGISTRY);
+
+    /* Filter out the /cluster/topology to get the context we want */
+    final String[] pathInfo = path.split(REGEX_SPLIT_CONTEXT);
+
+    /*
+     * split() drops trailing empty strings, so a path with nothing after the
+     * stripped prefix yields fewer than two elements
+     */
+    if (pathInfo.length < 2) {
+      throw new RuntimeException(
+          String.format("Cannot find service for the given path: %s", path));
+    }
+
+    final ServiceDefEntry entry = serviceDefinitionService
+        .getMatchingService(pathInfo[1]);
+
+    if (entry == null) {
+      throw new RuntimeException(
+          String.format("Cannot find service for the given path: %s", path));
+    }
+
+    final File servicesDir = new File(config.getGatewayServicesDir());
+
+    /*
+     * NOTE(review): the definitions are reloaded on every call but
+     * urlFromServiceDefinition() does not read them - candidate for removal
+     */
+    final Set<ServiceDefinition> serviceDefs = ServiceDefinitionsLoader
+        .getServiceDefinitions(servicesDir);
+
+    /* URL used to connect to websocket backend */
+    final String backendURL = urlFromServiceDefinition(serviceDefs,
+        serviceRegistryService, entry, path);
+
+    /* A websocket URL (plain or secure) is used as is */
+    if (StringUtils.contains(backendURL, WEBSOCKET_PROTOCOL_STRING)
+        || StringUtils.contains(backendURL, "wss://")) {
+      return backendURL;
+    }
+
+    try {
+
+      /* if we do not find websocket URL we default to HTTP */
+      final URL serviceUrl = new URL(backendURL);
+
+      /* Use http host:port if ws url not configured */
+      final String scheme = serviceUrl.getProtocol();
+      /*
+       * NOTE(review): every non-websocket scheme, https included, is mapped
+       * to plain ws - confirm whether https should map to wss
+       */
+      final String protocol = ("ws".equals(scheme) || "wss".equals(scheme))
+          ? scheme : "ws";
+
+      final StringBuilder backend = new StringBuilder();
+      backend.append(protocol).append("://");
+      backend.append(serviceUrl.getHost());
+
+      /* getPort() is -1 when the URL carries no explicit port */
+      if (serviceUrl.getPort() != -1) {
+        backend.append(":").append(serviceUrl.getPort());
+      }
+
+      /* getPath() already starts with "/" unless it is empty */
+      final String servicePath = serviceUrl.getPath();
+      if (!servicePath.startsWith("/")) {
+        backend.append("/");
+      }
+      backend.append(servicePath);
+
+      return backend.toString();
+
+    } catch (MalformedURLException e) {
+      LOG.badUrlError(e);
+      /* keep the original exception as the cause */
+      throw new RuntimeException(e.toString(), e);
+    }
+  }
+
+ private static String urlFromServiceDefinition(
+ final Set<ServiceDefinition> serviceDefs,
+ final ServiceRegistry serviceRegistry, final ServiceDefEntry entry,
+ final String path) {
+
+ final String[] contexts = path.split("/");
+
+ final String serviceURL = serviceRegistry.lookupServiceURL(contexts[2],
+ entry.getName().toUpperCase());
+
+ /*
+ * we have a match, if ws:// is present it is returned else http:// is
+ * returned
+ */
+ return serviceURL;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/MessageEventCallback.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/MessageEventCallback.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/MessageEventCallback.java
new file mode 100644
index 0000000..66cd069
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/MessageEventCallback.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import javax.websocket.CloseReason;
+
+/**
+ * A simple callback interface used when events happen on the Websocket client socket.
+ *
+ */
+public interface MessageEventCallback {
+
+ /**
+ * A generic callback, can be left un-implemented
+ */
+ void doCallback(final String message);
+
+ /**
+ * Callback when connection is established.
+ * @param session
+ */
+ void onConnectionOpen(final Object session);
+
+ /**
+ * Callback when connection is closed.
+ * @param reason
+ */
+ void onConnectionClose(final CloseReason reason);
+
+ /**
+ * Callback when there is an error in connection.
+ * @param cause
+ */
+ void onError(final Throwable cause);
+
+ /**
+ * Callback when a text message is received.
+ * @param message
+ * @param session
+ */
+ void onMessageText(final String message, final Object session);
+
+ /**
+ * Callback when a binary message is received.
+ * @param message
+ * @param last
+ * @param session
+ */
+ void onMessageBinary(final byte[] message, final boolean last, final Object session);
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
new file mode 100644
index 0000000..39744e1
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.CloseReason;
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+
+/**
+ * A Websocket client with callback.
+ * @since 0.10
+ */
+@ClientEndpoint
+public class ProxyInboundSocket {
+
+ /**
+ * Callback to be called once we have events on our socket.
+ */
+ final MessageEventCallback callback;
+
+ /**
+ * Create an instance
+ */
+ public ProxyInboundSocket(final MessageEventCallback callback) {
+ super();
+ this.callback = callback;
+ }
+
+ /* Client methods */
+ @OnOpen
+ public void onClientOpen(final javax.websocket.Session backendSession) {
+
+ callback.onConnectionOpen(backendSession);
+
+ }
+
+ @OnClose
+ public void onClientClose(final CloseReason reason) {
+ callback.onConnectionClose(reason);
+ }
+
+ @OnError
+ public void onClientError(Throwable cause) {
+ cause.printStackTrace(System.err);
+ callback.onError(cause);
+ }
+
+ @OnMessage
+ public void onBackendMessage(final String message,
+ final javax.websocket.Session session) {
+
+ callback.onMessageText(message, session);
+
+ }
+
+ @OnMessage
+ public void onBackendMessageBinary(final byte[] message, final boolean last,
+ final javax.websocket.Session session) {
+
+ callback.onMessageBinary(message, last, session);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
new file mode 100644
index 0000000..c7a7649
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.net.URI;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.DeploymentException;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+/**
+ * Handles outbound/inbound Websocket connections and sessions.
+ *
+ * @since 0.10
+ */
+public class ProxyWebSocketAdapter extends WebSocketAdapter {
+
+ private static WebsocketLogMessages LOG = MessagesFactory
+ .get(WebsocketLogMessages.class);
+
+ /* URI for the backend */
+ private final URI backend;
+
+ /* Session between the frontend (browser) and Knox */
+ private Session frontendSession;
+
+ /* Session between the backend (outbound) and Knox */
+ private javax.websocket.Session backendSession;
+
+ private WebSocketContainer container;
+
+ /**
+ * Create an instance
+ */
+ public ProxyWebSocketAdapter(URI backend) {
+ super();
+ this.backend = backend;
+ }
+
+ @Override
+ public void onWebSocketConnect(final Session frontEndSession) {
+
+ /*
+ * Let's connect to the backend, this is where the Backend-to-frontend
+ * plumbing takes place
+ */
+ container = ContainerProvider.getWebSocketContainer();
+ final ProxyInboundSocket backendSocket = new ProxyInboundSocket(
+ getMessageCallback());
+
+ /* build the configuration */
+
+ /* Attempt Connect */
+ try {
+ backendSession = container.connectToServer(backendSocket, backend);
+ LOG.onConnectionOpen(backend.toString());
+
+ } catch (DeploymentException e) {
+ LOG.connectionFailed(e);
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ LOG.connectionFailed(e);
+ throw new RuntimeIOException(e);
+ }
+
+ super.onWebSocketConnect(frontEndSession);
+ this.frontendSession = frontEndSession;
+
+ }
+
+ @Override
+ public void onWebSocketBinary(final byte[] payload, final int offset,
+ final int length) {
+
+ if (isNotConnected()) {
+ return;
+ }
+
+ throw new UnsupportedOperationException(
+ "Websocket support for binary messages is not supported at this time.");
+ }
+
+ @Override
+ public void onWebSocketText(final String message) {
+
+ if (isNotConnected()) {
+ return;
+ }
+
+ LOG.logMessage("[From Frontend --->]" + message);
+
+ /* Proxy message to backend */
+ try {
+ backendSession.getBasicRemote().sendText(message);
+
+ } catch (IOException e) {
+ LOG.connectionFailed(e);
+ }
+
+ }
+
+ @Override
+ public void onWebSocketClose(int statusCode, String reason) {
+ super.onWebSocketClose(statusCode, reason);
+
+ closeQuietly();
+
+ LOG.onConnectionClose(backend.toString());
+
+ }
+
+ @Override
+ public void onWebSocketError(final Throwable t) {
+ passErrorToInboundConnection(t);
+ }
+
+  /**
+   * A helper function to pass errors to Inbound connection (browser/client).
+   * <p>
+   * The error is logged and the frontend session, if one has been
+   * established, is closed with MESSAGE_TOO_LARGE when the error text
+   * contains "exceeds maximum size" and with SERVER_ERROR otherwise. In the
+   * latter case both connections are also torn down and the error is
+   * rethrown wrapped in a RuntimeException.
+   *
+   * @param t
+   *          the error raised on either side of the proxy
+   */
+  private void passErrorToInboundConnection(final Throwable t) {
+
+    LOG.onError(t.toString());
+
+    final boolean tooLarge = t.toString().contains("exceeds maximum size");
+
+    /*
+     * frontendSession is only assigned at the end of onWebSocketConnect, so
+     * an error raised while connecting to the backend finds it still null
+     */
+    if (frontendSession != null) {
+      frontendSession.close(
+          tooLarge ? StatusCode.MESSAGE_TOO_LARGE : StatusCode.SERVER_ERROR,
+          t.getMessage());
+    }
+
+    if (!tooLarge) {
+      closeQuietly();
+      throw new RuntimeException(t);
+    }
+  }
+
+ private MessageEventCallback getMessageCallback() {
+
+ return new MessageEventCallback() {
+
+ @Override
+ public void doCallback(String message) {
+ /* do nothing */
+
+ }
+
+ @Override
+ public void onConnectionOpen(Object session) {
+ /* do nothing */
+
+ }
+
+ @Override
+ public void onConnectionClose(final CloseReason reason) {
+ try {
+ frontendSession.close(reason.getCloseCode().getCode(),
+ reason.getReasonPhrase());
+ } finally {
+ closeQuietly();
+ }
+
+ }
+
+ @Override
+ public void onError(Throwable cause) {
+ passErrorToInboundConnection(cause);
+ }
+
+ @Override
+ public void onMessageText(String message, Object session) {
+ final RemoteEndpoint remote = getRemote();
+
+ LOG.logMessage("[From Backend <---]" + message);
+
+ /* Proxy message to frontend */
+ try {
+ remote.sendString(message);
+ if (remote.getBatchMode() == BatchMode.ON) {
+ remote.flush();
+ }
+ } catch (IOException e) {
+ LOG.connectionFailed(e);
+ throw new RuntimeIOException(e);
+ }
+
+ }
+
+ @Override
+ public void onMessageBinary(byte[] message, boolean last,
+ Object session) {
+ throw new UnsupportedOperationException(
+ "Websocket support for binary messages is not supported at this time.");
+
+ }
+
+ };
+
+ }
+
+  /**
+   * Tears down both sides of the proxy: closes the backend session, stops
+   * the client container when it is a Jetty LifeCycle, and closes the
+   * frontend session. Failures while closing are logged, not propagated.
+   */
+  private void closeQuietly() {
+
+    /* backendSession stays null when connectToServer failed */
+    if (backendSession != null) {
+      try {
+        backendSession.close();
+      } catch (IOException e) {
+        LOG.connectionFailed(e);
+      }
+    }
+
+    if (container instanceof LifeCycle) {
+      try {
+        ((LifeCycle) container).stop();
+      } catch (Exception e) {
+        LOG.connectionFailed(e);
+      }
+    }
+
+    /* frontendSession is assigned only after a successful backend connect */
+    if (frontendSession != null) {
+      frontendSession.close();
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/WebsocketLogMessages.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/WebsocketLogMessages.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/WebsocketLogMessages.java
new file mode 100644
index 0000000..55997d5
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/WebsocketLogMessages.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import org.apache.hadoop.gateway.i18n.messages.Message;
+import org.apache.hadoop.gateway.i18n.messages.MessageLevel;
+import org.apache.hadoop.gateway.i18n.messages.Messages;
+import org.apache.hadoop.gateway.i18n.messages.StackTrace;
+
+/**
+ * Logging for Websocket
+ *
+ * @since 0.10
+ */
+
+@Messages(logger = "org.apache.hadoop.gateway.websockets")
+public interface WebsocketLogMessages {
+
+ @Message(level = MessageLevel.ERROR,
+ text = "Error creating websocket connection: {0}")
+ void failedCreatingWebSocket(
+ @StackTrace(level = MessageLevel.ERROR) Exception e);
+
+ @Message(level = MessageLevel.ERROR,
+ text = "Unable to connect to websocket server: {0}")
+ void connectionFailed(@StackTrace(level = MessageLevel.ERROR) Exception e);
+
+ @Message(level = MessageLevel.ERROR, text = "Error: {0}")
+ void onError(final String message);
+
+ @Message(level = MessageLevel.ERROR, text = "Bad or malformed url: {0}")
+ void badUrlError(@StackTrace(level = MessageLevel.ERROR) Exception e);
+
+ @Message(level = MessageLevel.DEBUG,
+ text = "Websocket connection to backend server {0} opened")
+ void onConnectionOpen(final String backend);
+
+ @Message(level = MessageLevel.DEBUG, text = "Message: {0}")
+ void logMessage(final String message);
+
+ @Message(level = MessageLevel.DEBUG,
+ text = "Websocket connection to backend server {0} closed")
+ void onConnectionClose(final String backend);
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
new file mode 100644
index 0000000..1e4b86f
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.hadoop.gateway.websockets.BigEchoSocketHandler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Exercises the websocket proxy against a backend that is down.
+ * <p>
+ * Only the proxy is started here; it is pointed at {@code BAD_BACKEND} and the
+ * test expects the client connection to be closed with
+ * {@code UNEXPECTED_CONDITION}.
+ */
+public class BadBackendTest {
+
+  /** Backend address handed to the proxy; this test starts no backend server. */
+  private static final String BAD_BACKEND = "ws://localhost:666";
+
+  /* Proxy */
+  private static Server proxy;
+  private static ServerConnector proxyConnector;
+  private static URI proxyUri;
+
+  public BadBackendTest() {
+    super();
+  }
+
+  /** Brings up the proxy once for the whole class. */
+  @BeforeClass
+  public static void startServer() throws Exception {
+    startProxy();
+  }
+
+  /** Shuts the proxy down after all tests have run. */
+  @AfterClass
+  public static void stopServer() throws Exception {
+    proxy.stop();
+  }
+
+  /**
+   * Sends a text message through the proxy and expects the connection to be
+   * closed with {@code UNEXPECTED_CONDITION}.
+   *
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test(timeout = 8000)
+  public void testBadBackEnd() throws IOException, Exception {
+    final int expectedCode = CloseReason.CloseCodes.UNEXPECTED_CONDITION.getCode();
+
+    final WebsocketClient wsClient = new WebsocketClient();
+    final WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer();
+    final javax.websocket.Session wsSession =
+        wsContainer.connectToServer(wsClient, proxyUri);
+
+    wsSession.getBasicRemote().sendText("Echo");
+
+    wsClient.awaitClose(expectedCode, 1000, TimeUnit.MILLISECONDS);
+
+    Assert.assertThat(wsClient.close.getCloseCode().getCode(),
+        CoreMatchers.is(expectedCode));
+  }
+
+  /**
+   * Starts a Jetty server whose root context serves a
+   * {@link ProxyWebSocketAdapter} targeting {@code BAD_BACKEND}, then records
+   * the URI clients should connect to.
+   */
+  private static void startProxy() throws Exception {
+    proxy = new Server();
+    proxyConnector = new ServerConnector(proxy);
+    proxy.addConnector(proxyConnector);
+
+    /* start Knox with WebsocketAdapter to test */
+    final ProxyWebSocketAdapter adapter =
+        new ProxyWebSocketAdapter(new URI(BAD_BACKEND));
+
+    final ContextHandler rootContext = new ContextHandler();
+    rootContext.setContextPath("/");
+    rootContext.setHandler(new BigEchoSocketHandler(adapter));
+    proxy.setHandler(rootContext);
+
+    proxy.start();
+
+    // The connector may report no host; fall back to localhost in that case.
+    final String boundHost = proxyConnector.getHost();
+    final int boundPort = proxyConnector.getLocalPort();
+    proxyUri = new URI(String.format("ws://%s:%d/",
+        boundHost == null ? "localhost" : boundHost, boundPort));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
new file mode 100644
index 0000000..4b9f836
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.config.impl.GatewayConfigImpl;
+import org.apache.hadoop.gateway.deploy.DeploymentFactory;
+import org.apache.hadoop.gateway.services.DefaultGatewayServices;
+import org.apache.hadoop.gateway.services.GatewayServices;
+import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.topology.TopologyService;
+import org.apache.hadoop.gateway.topology.TopologyEvent;
+import org.apache.hadoop.gateway.topology.TopologyListener;
+import org.apache.hadoop.test.TestUtils;
+import org.easymock.EasyMock;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.mycila.xmltool.XMLDoc;
+import com.mycila.xmltool.XMLTag;
+
+/**
+ * Test for bad URLs.
+ * <p>
+ * This test will set up a bad URL through the topology, so this test case will
+ * attempt to test the bad url case and also the plumbing around it.
+ * @since 0.10
+ */
+public class BadUrlTest {
+
+ /**
+ * Non-existant backend websocket server
+ */
+ private static String BACKEND = "http://localhost:9999";
+
+ /**
+ * Mock Gateway server
+ */
+ private static Server gatewayServer;
+
+ /**
+ * Mock gateway config
+ */
+ private static GatewayConfig gatewayConfig;
+
+ private static GatewayServices services;
+
+ /**
+ * URI for gateway server
+ */
+ private static URI serverUri;
+
+ private static File topoDir;
+
+ public BadUrlTest() {
+ super();
+ }
+
+ @BeforeClass
+ public static void startServers() throws Exception {
+
+ startGatewayServer();
+
+ }
+
+ @AfterClass
+ public static void stopServers() {
+ try {
+ gatewayServer.stop();
+ } catch (final Exception e) {
+ e.printStackTrace(System.err);
+ }
+
+ /* Cleanup the created files */
+ FileUtils.deleteQuietly(topoDir);
+
+ }
+
+ /**
+ * Test websocket proxying through gateway.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBadUrl() throws Exception {
+ WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+ WebsocketClient client = new WebsocketClient();
+
+ container.connectToServer(client,
+ new URI(serverUri.toString() + "gateway/websocket/ws"));
+
+ client.awaitClose(CloseReason.CloseCodes.UNEXPECTED_CONDITION.getCode(),
+ 1000, TimeUnit.MILLISECONDS);
+
+ Assert.assertThat(client.close.getCloseCode().getCode(),
+ CoreMatchers.is(CloseReason.CloseCodes.UNEXPECTED_CONDITION.getCode()));
+
+ }
+
+ /**
+ * Start Gateway Server.
+ *
+ * @throws Exception
+ */
+ private static void startGatewayServer() throws Exception {
+ gatewayServer = new Server();
+ final ServerConnector connector = new ServerConnector(gatewayServer);
+ gatewayServer.addConnector(connector);
+
+ /* workaround so we can add our handler later at runtime */
+ HandlerCollection handlers = new HandlerCollection(true);
+
+ /* add some initial handlers */
+ ContextHandler context = new ContextHandler();
+ context.setContextPath("/");
+ handlers.addHandler(context);
+
+ gatewayServer.setHandler(handlers);
+
+ // Start Server
+ gatewayServer.start();
+
+ String host = connector.getHost();
+ if (host == null) {
+ host = "localhost";
+ }
+ int port = connector.getLocalPort();
+ serverUri = new URI(String.format("ws://%s:%d/", host, port));
+
+ /* Setup websocket handler */
+ setupGatewayConfig(BACKEND);
+
+ final GatewayWebsocketHandler gatewayWebsocketHandler = new GatewayWebsocketHandler(
+ gatewayConfig, services);
+ handlers.addHandler(gatewayWebsocketHandler);
+ gatewayWebsocketHandler.start();
+ }
+
+ /**
+ * Initialize the configs and components required for this test.
+ *
+ * @param backend
+ * @throws IOException
+ */
+ private static void setupGatewayConfig(final String backend)
+ throws IOException {
+ services = new DefaultGatewayServices();
+
+ topoDir = createDir();
+ URL serviceUrl = ClassLoader.getSystemResource("websocket-services");
+
+ final File descriptor = new File(topoDir, "websocket.xml");
+ final FileOutputStream stream = new FileOutputStream(descriptor);
+ createKnoxTopology(backend).toStream(stream);
+ stream.close();
+
+ final TestTopologyListener topoListener = new TestTopologyListener();
+
+ final Map<String, String> options = new HashMap<String, String>();
+ options.put("persist-master", "false");
+ options.put("master", "password");
+
+ gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
+ EasyMock.expect(gatewayConfig.getGatewayTopologyDir())
+ .andReturn(topoDir.toString()).anyTimes();
+
+ EasyMock.expect(gatewayConfig.getGatewayServicesDir())
+ .andReturn(serviceUrl.getFile()).anyTimes();
+
+ EasyMock.expect(gatewayConfig.getEphemeralDHKeySize()).andReturn("2048")
+ .anyTimes();
+
+ EasyMock.expect(gatewayConfig.getGatewaySecurityDir())
+ .andReturn(topoDir.toString()).anyTimes();
+
+ /* Websocket configs */
+ EasyMock.expect(gatewayConfig.isWebsocketEnabled()).andReturn(true)
+ .anyTimes();
+
+ EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageSize())
+ .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE)
+ .anyTimes();
+
+ EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageSize())
+ .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE)
+ .anyTimes();
+
+ EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageBufferSize())
+ .andReturn(
+ GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE)
+ .anyTimes();
+
+ EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageBufferSize())
+ .andReturn(
+ GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE)
+ .anyTimes();
+
+ EasyMock.expect(gatewayConfig.getWebsocketInputBufferSize())
+ .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE)
+ .anyTimes();
+
+ EasyMock.expect(gatewayConfig.getWebsocketAsyncWriteTimeout())
+ .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT)
+ .anyTimes();
+
+ EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
+ .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
+
+ EasyMock.replay(gatewayConfig);
+
+ try {
+ services.init(gatewayConfig, options);
+ } catch (ServiceLifecycleException e) {
+ e.printStackTrace();
+ }
+
+ DeploymentFactory.setGatewayServices(services);
+ final TopologyService monitor = services
+ .getService(GatewayServices.TOPOLOGY_SERVICE);
+ monitor.addTopologyChangeListener(topoListener);
+ monitor.reloadTopologies();
+
+ }
+
+ private static File createDir() throws IOException {
+ return TestUtils
+ .createTempDir(WebsocketEchoTest.class.getSimpleName() + "-");
+ }
+
+ /**
+ * Intentionally add bad URL
+ *
+ * @param backend
+ * @return
+ */
+ private static XMLTag createKnoxTopology(final String backend) {
+ XMLTag xml = XMLDoc.newDocument(true).addRoot("topology").addTag("service")
+ .addTag("role").addText("WEBSOCKET").addTag("url").addText(backend)
+ .gotoParent().gotoRoot();
+ // System.out.println( "GATEWAY=" + xml.toString() );
+ return xml;
+ }
+
+ private static class TestTopologyListener implements TopologyListener {
+
+ public ArrayList<List<TopologyEvent>> events = new ArrayList<List<TopologyEvent>>();
+
+ @Override
+ public void handleTopologyEvent(List<TopologyEvent> events) {
+ this.events.add(events);
+
+ synchronized (this) {
+ for (TopologyEvent event : events) {
+ if (!event.getType().equals(TopologyEvent.Type.DELETED)) {
+
+ /* for this test we only care about this part */
+ DeploymentFactory.createDeployment(gatewayConfig,
+ event.getTopology());
+
+ }
+ }
+
+ }
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
new file mode 100644
index 0000000..6562e5c
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.hadoop.gateway.websockets.BigEchoSocketHandler;
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Simulates an unexpected connection drop: a connection is established through
+ * the proxy to a backend that misbehaves ({@link BadSocket}) instead of echoing.
+ *
+ * @since 0.10
+ */
+public class ConnectionDroppedTest {
+
+  /* Backend */
+  private static Server backend;
+  private static ServerConnector connector;
+  private static URI serverUri;
+
+  /* Proxy */
+  private static Server proxy;
+  private static ServerConnector proxyConnector;
+  private static URI proxyUri;
+
+  public ConnectionDroppedTest() {
+    super();
+  }
+
+  /** Starts the backend first, because the proxy is configured with its URI. */
+  @BeforeClass
+  public static void startServer() throws Exception {
+    startBackend();
+    startProxy();
+  }
+
+  @AfterClass
+  public static void stopServer() throws Exception {
+    /* ORDER MATTERS ! */
+    proxy.stop();
+    backend.stop();
+  }
+
+  /**
+   * The backend never echoes the text back, so waiting for a message on the
+   * client queue is expected to end in a timeout exception.
+   *
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test(expected = java.util.concurrent.TimeoutException.class)
+  public void testDroppedConnection() throws IOException, Exception {
+    final WebsocketClient wsClient = new WebsocketClient();
+    final WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer();
+    final javax.websocket.Session wsSession =
+        wsContainer.connectToServer(wsClient, proxyUri);
+
+    wsSession.getBasicRemote().sendText("Echo");
+
+    wsClient.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+  }
+
+  /** Starts the backend server, serving a {@link BadSocket} on the root context. */
+  private static void startBackend() throws Exception {
+    backend = new Server();
+    connector = new ServerConnector(backend);
+    backend.addConnector(connector);
+
+    final ContextHandler rootContext = new ContextHandler();
+    rootContext.setContextPath("/");
+    rootContext.setHandler(new BigEchoSocketHandler(new BadSocket()));
+    backend.setHandler(rootContext);
+
+    backend.start();
+
+    serverUri = wsUriOf(connector);
+  }
+
+  /** Starts the proxy, serving a {@link ProxyWebSocketAdapter} aimed at the backend. */
+  private static void startProxy() throws Exception {
+    proxy = new Server();
+    proxyConnector = new ServerConnector(proxy);
+    proxy.addConnector(proxyConnector);
+
+    /* start Knox with WebsocketAdapter to test */
+    final ContextHandler rootContext = new ContextHandler();
+    rootContext.setContextPath("/");
+    rootContext.setHandler(
+        new BigEchoSocketHandler(new ProxyWebSocketAdapter(serverUri)));
+    proxy.setHandler(rootContext);
+
+    proxy.start();
+
+    proxyUri = wsUriOf(proxyConnector);
+  }
+
+  /**
+   * Builds the {@code ws://host:port/} URI of a started connector, using
+   * {@code localhost} when the connector reports no host.
+   */
+  private static URI wsUriOf(final ServerConnector started) throws Exception {
+    final String boundHost = started.getHost();
+    final int boundPort = started.getLocalPort();
+    return new URI(String.format("ws://%s:%d/",
+        boundHost == null ? "localhost" : boundHost, boundPort));
+  }
+
+}
+
+/**
+ * Simulates a bad socket: binary frames are echoed back, but every text frame
+ * makes the handler throw on purpose.
+ *
+ * @since 0.10
+ */
+class BadSocket extends WebSocketAdapter {
+
+  /**
+   * Delegates to the adapter so it records the session. The guards below use
+   * {@link #isNotConnected()}, which relies on that recorded session; without
+   * the delegation they would always return early and the simulated failure in
+   * {@link #onWebSocketText(String)} could never be reached.
+   */
+  @Override
+  public void onWebSocketConnect(final Session session) {
+    super.onWebSocketConnect(session);
+  }
+
+  /**
+   * Echoes the binary payload back to the sender.
+   *
+   * @param payload buffer holding the received bytes
+   * @param offset index of the first received byte in {@code payload}
+   * @param len number of received bytes
+   */
+  @Override
+  public void onWebSocketBinary(byte[] payload, int offset, int len) {
+    if (isNotConnected()) {
+      return;
+    }
+
+    try {
+      RemoteEndpoint remote = getRemote();
+      remote.sendBytes(BufferUtil.toBuffer(payload, offset, len), null);
+      if (remote.getBatchMode() == BatchMode.ON) {
+        remote.flush();
+      }
+    } catch (IOException x) {
+      throw new RuntimeIOException(x);
+    }
+  }
+
+  /** Rethrows any reported error wrapped in a {@link RuntimeException}. */
+  @Override
+  public void onWebSocketError(Throwable cause) {
+    throw new RuntimeException(cause);
+  }
+
+  /** Never echoes text: throws to simulate a broken backend connection. */
+  @Override
+  public void onWebSocketText(String message) {
+    if (isNotConnected()) {
+      return;
+    }
+    // Throw an exception on purpose
+    throw new RuntimeException("Simulating bad connection ...");
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/EchoSocket.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/EchoSocket.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/EchoSocket.java
new file mode 100644
index 0000000..e46b5e2
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/EchoSocket.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+/**
+ * A simple echo socket: whatever text or binary message arrives is sent
+ * straight back to the remote endpoint.
+ */
+public class EchoSocket extends WebSocketAdapter {
+
+  /**
+   * Echoes a binary message.
+   *
+   * @param payload buffer holding the received bytes
+   * @param offset index of the first received byte in {@code payload}
+   * @param len number of received bytes
+   */
+  @Override
+  public void onWebSocketBinary(byte[] payload, int offset, int len) {
+    if (isNotConnected()) {
+      return;
+    }
+
+    final RemoteEndpoint peer = getRemote();
+    try {
+      peer.sendBytes(BufferUtil.toBuffer(payload, offset, len), null);
+      flushIfBatching(peer);
+    } catch (IOException x) {
+      throw new RuntimeIOException(x);
+    }
+  }
+
+  /** Rethrows any reported error wrapped in a {@link RuntimeException}. */
+  @Override
+  public void onWebSocketError(Throwable cause) {
+    throw new RuntimeException(cause);
+  }
+
+  /**
+   * Echoes a text message.
+   *
+   * @param message the received text
+   */
+  @Override
+  public void onWebSocketText(String message) {
+    if (isNotConnected()) {
+      return;
+    }
+
+    final RemoteEndpoint peer = getRemote();
+    try {
+      peer.sendString(message, null);
+      flushIfBatching(peer);
+    } catch (IOException x) {
+      throw new RuntimeIOException(x);
+    }
+  }
+
+  /** Flushes the endpoint only when its batch mode is {@code ON}. */
+  private static void flushIfBatching(final RemoteEndpoint peer)
+      throws IOException {
+    if (peer.getBatchMode() == BatchMode.ON) {
+      peer.flush();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
new file mode 100644
index 0000000..f98b7e1
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Dummy Test for max message size.
+ *
+ */
+public class MessageFailureTest {
+
+ private static Server backend;
+ private static ServerConnector connector;
+ private static URI serverUri;
+
+ /* Proxy */
+ private static Server proxy;
+ private static ServerConnector proxyConnector;
+ private static URI proxyUri;
+
+ public MessageFailureTest() {
+ super();
+ }
+
+ @BeforeClass
+ public static void startServer() throws Exception {
+ startBackend();
+ startProxy();
+
+ }
+
+ @AfterClass
+ public static void stopServer() throws Exception {
+ /* ORDER MATTERS ! */
+ proxy.stop();
+ backend.stop();
+
+ }
+
+
+ /**
+ * Test for a message that bigger than configured value
+ * @throws IOException
+ * @throws Exception
+ */
+ @Test(timeout = 8000)
+ public void testMessageTooBig() throws IOException, Exception {
+ final String bigMessage = "Echooooooooooooo";
+
+ WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+ WebsocketClient client = new WebsocketClient();
+ javax.websocket.Session session = container.connectToServer(client,
+ proxyUri);
+ session.getBasicRemote().sendText(bigMessage);
+
+ client.awaitClose(CloseReason.CloseCodes.TOO_BIG.getCode(), 1000,
+ TimeUnit.MILLISECONDS);
+
+ Assert.assertThat(client.close.getCloseCode().getCode(), CoreMatchers.is(CloseReason.CloseCodes.TOO_BIG.getCode()));
+
+ }
+
+
+ /**
+ * Test for a message within limit.
+ * @throws IOException
+ * @throws Exception
+ */
+ @Test(timeout = 8000)
+ public void testMessageOk() throws IOException, Exception {
+ final String message = "Echo";
+
+ WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+ WebsocketClient client = new WebsocketClient();
+ javax.websocket.Session session = container.connectToServer(client,
+ proxyUri);
+ session.getBasicRemote().sendText(message);
+
+ client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+
+ Assert.assertThat(client.messageQueue.get(0), CoreMatchers.is("Echo"));
+
+ }
+
+
+ private static void startBackend() throws Exception {
+ backend = new Server();
+ connector = new ServerConnector(backend);
+ backend.addConnector(connector);
+
+ /* start backend with Echo socket */
+ final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
+ new EchoSocket());
+
+ ContextHandler context = new ContextHandler();
+ context.setContextPath("/");
+ context.setHandler(wsHandler);
+ backend.setHandler(context);
+
+ // Start Server
+ backend.start();
+
+ String host = connector.getHost();
+ if (host == null) {
+ host = "localhost";
+ }
+ int port = connector.getLocalPort();
+ serverUri = new URI(String.format("ws://%s:%d/", host, port));
+
+ }
+
+ private static void startProxy() throws Exception {
+ proxy = new Server();
+ proxyConnector = new ServerConnector(proxy);
+ proxy.addConnector(proxyConnector);
+
+ /* start Knox with WebsocketAdapter to test */
+ final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
+ new ProxyWebSocketAdapter(serverUri));
+
+ ContextHandler context = new ContextHandler();
+ context.setContextPath("/");
+ context.setHandler(wsHandler);
+ proxy.setHandler(context);
+
+ // Start Server
+ proxy.start();
+
+ String host = proxyConnector.getHost();
+ if (host == null) {
+ host = "localhost";
+ }
+ int port = proxyConnector.getLocalPort();
+ proxyUri = new URI(String.format("ws://%s:%d/", host, port));
+
+ }
+
+}
+
+/**
+ * A mock websocket handler that serves the adapter it was constructed with and
+ * caps both text and binary messages at a very small size.
+ */
+class BigEchoSocketHandler extends WebSocketHandler
+    implements WebSocketCreator {
+
+  /** Adapter returned for every upgrade request. */
+  final WebSocketAdapter socket;
+
+  /**
+   * @param socket the adapter to hand out from {@link #createWebSocket}
+   */
+  public BigEchoSocketHandler(final WebSocketAdapter socket) {
+    this.socket = socket;
+  }
+
+  /** Applies the message size limits and registers this handler as creator. */
+  @Override
+  public void configure(WebSocketServletFactory factory) {
+    final int maxMessageSize = 10;
+    factory.setCreator(this);
+    factory.getPolicy().setMaxBinaryMessageSize(maxMessageSize);
+    factory.getPolicy().setMaxTextMessageSize(maxMessageSize);
+  }
+
+  /** Returns the configured adapter, whatever the request looks like. */
+  @Override
+  public Object createWebSocket(ServletUpgradeRequest req,
+      ServletUpgradeResponse resp) {
+    return socket;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketClient.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketClient.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketClient.java
new file mode 100644
index 0000000..92045f3
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketClient.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.CloseReason;
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+
+import org.eclipse.jetty.util.BlockingArrayQueue;
+
+/**
+ * A Test websocket client
+ *
+ */
+@ClientEndpoint
+public class WebsocketClient {
+
+ private Session session = null;
+ public CloseReason close = null;
+ public MessageQueue messageQueue = new MessageQueue();
+ public LinkedList<Throwable> errors = new LinkedList<>();
+ public CountDownLatch closeLatch = new CountDownLatch(1);
+
+ public boolean onError = false;
+ public String errorMessage = null;
+
+ @OnClose
+ public void onWebSocketClose(CloseReason reason) {
+ this.close = reason;
+ closeLatch.countDown();
+ }
+
+ @OnMessage
+ public void onMessage(String message) {
+ this.messageQueue.offer(message);
+ }
+
+ @OnOpen
+ public void onOpen(Session session) {
+ this.session = session;
+ }
+
+ @OnError
+ public void onError(Session session, Throwable thr) {
+ errors.add(thr);
+ this.onError = true;
+ this.errorMessage = thr.toString();
+ }
+
+ public void sendText(String text) throws IOException {
+ if (session != null) {
+ session.getBasicRemote().sendText(text);
+ }
+ }
+
+ /**
+ * Check whether we have expected close code
+ *
+ * @param expectedCloseCode
+ * @param timeoutDuration
+ * @param timeoutUnit
+ * @throws TimeoutException
+ */
+ public void awaitClose(int expectedCloseCode, int timeoutDuration,
+ TimeUnit timeoutUnit) throws TimeoutException {
+
+ long msDur = TimeUnit.MILLISECONDS.convert(timeoutDuration, timeoutUnit);
+ long now = System.currentTimeMillis();
+ long expireOn = now + msDur;
+
+ while (close == null) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(10);
+ } catch (InterruptedException ignore) {
+ /* ignore */
+ }
+ if ((System.currentTimeMillis() > expireOn)) {
+ throw new TimeoutException("Timed out reading message from queue");
+ }
+
+ }
+
+ }
+
+ public class MessageQueue extends BlockingArrayQueue<String> {
+
+ public void awaitMessages(int expectedMessageCount, int timeoutDuration,
+ TimeUnit timeoutUnit) throws TimeoutException {
+ long msDur = TimeUnit.MILLISECONDS.convert(timeoutDuration, timeoutUnit);
+ long now = System.currentTimeMillis();
+ long expireOn = now + msDur;
+
+ while (this.size() < expectedMessageCount) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(20);
+ } catch (InterruptedException ignore) {
+ /* ignore */
+ }
+ if ((System.currentTimeMillis() > expireOn)) {
+ throw new TimeoutException("Timed out reading message from queue");
+ }
+
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoHandler.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoHandler.java
new file mode 100644
index 0000000..dc5f2f1
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * A mock websocket handler that just echoes messages. It registers itself as
+ * the {@link WebSocketCreator} and answers every upgrade request with the
+ * same {@link EchoSocket} instance.
+ * <p>
+ * NOTE(review): the socket instance is shared by all connections - confirm
+ * that {@code EchoSocket} keeps no per-connection state before using this
+ * handler with several simultaneous clients.
+ */
+public class WebsocketEchoHandler extends WebSocketHandler implements WebSocketCreator {
+
+  /** Maximum text message size set on the factory policy. */
+  private static final int MAX_TEXT_MESSAGE_SIZE = 2 * 1024 * 1024;
+
+  final EchoSocket socket = new EchoSocket();
+
+  /**
+   * Raises the maximum text message size and installs this handler as the
+   * socket creator.
+   */
+  @Override
+  public void configure(WebSocketServletFactory factory) {
+    factory.getPolicy().setMaxTextMessageSize(MAX_TEXT_MESSAGE_SIZE);
+    factory.setCreator(this);
+  }
+
+  /**
+   * Returns the shared echo socket regardless of the request.
+   */
+  @Override
+  public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
+    return socket;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoTest.java
new file mode 100644
index 0000000..d6d60a4
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoTest.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.ContainerProvider;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.config.impl.GatewayConfigImpl;
+import org.apache.hadoop.gateway.deploy.DeploymentFactory;
+import org.apache.hadoop.gateway.services.DefaultGatewayServices;
+import org.apache.hadoop.gateway.services.GatewayServices;
+import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.topology.TopologyService;
+import org.apache.hadoop.gateway.topology.TopologyEvent;
+import org.apache.hadoop.gateway.topology.TopologyListener;
+import org.apache.hadoop.test.TestUtils;
+import org.easymock.EasyMock;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.mycila.xmltool.XMLDoc;
+import com.mycila.xmltool.XMLTag;
+
+/**
+ * A basic test that attempts to proxy websocket connections through Knox
+ * gateway.
+ * <p>
+ * The way the test is set up is as follows: <br/>
+ * <ul>
+ * <li>A mock Websocket server is set up which simply echoes the messages sent
+ * by the client.
+ * <li>Knox Gateway is set up with websocket handler
+ * {@link GatewayWebsocketHandler} that can proxy the requests.
+ * <li>Appropriate Topology and service definition files are set up with the
+ * address of the Websocket server.
+ * <li>A mock client is setup to connect to gateway.
+ * </ul>
+ *
+ * The test is to confirm whether the message is sent all the way to the backend
+ * Websocket server through Knox and back.
+ *
+ *
+ * @since 0.10
+ */
+public class WebsocketEchoTest {
+
+ /**
+ * Simulate backend websocket
+ */
+ private static Server backendServer;
+ /**
+ * URI for backend websocket server
+ */
+ private static URI backendServerUri;
+
+ /**
+ * Mock Gateway server
+ */
+ private static Server gatewayServer;
+
+ /**
+ * Mock gateway config
+ */
+ private static GatewayConfig gatewayConfig;
+
+ private static GatewayServices services;
+
+ /**
+ * URI for gateway server
+ */
+ private static URI serverUri;
+
+ private static File topoDir;
+
+ /**
+  * Creates the test instance; all fixtures are static and are set up in
+  * {@link #startServers()}.
+  */
+ public WebsocketEchoTest() {
+ }
+
+ /**
+  * Starts the mock backend websocket server and then the gateway server.
+  * The backend goes first because the gateway setup reads
+  * {@code backendServerUri}.
+  *
+  * @throws Exception
+  *           if either server fails to start
+  */
+ @BeforeClass
+ public static void startServers() throws Exception {
+   startWebsocketServer();
+   startGatewayServer();
+ }
+
+ /**
+  * Stops the gateway and backend servers and removes the temporary topology
+  * directory. Each server is stopped independently, so a failure while
+  * stopping one (or a server that was never created) does not prevent the
+  * other from being shut down.
+  */
+ @AfterClass
+ public static void stopServers() {
+   stopQuietly(gatewayServer);
+   stopQuietly(backendServer);
+
+   /* Cleanup the created files */
+   FileUtils.deleteQuietly(topoDir);
+
+ }
+
+ /**
+  * Stops the given server if it exists, printing (not rethrowing) any
+  * failure.
+  */
+ private static void stopQuietly(final Server server) {
+   if (server == null) {
+     return;
+   }
+   try {
+     server.stop();
+   } catch (final Exception e) {
+     e.printStackTrace(System.err);
+   }
+ }
+
+ /**
+  * Test direct connection to websocket server without gateway: sends one
+  * text message straight to the backend and verifies that the same text
+  * comes back.
+  *
+  * @throws Exception
+  *           if connecting, sending or waiting for the reply fails
+  */
+ @Test
+ public void testDirectEcho() throws Exception {
+
+   WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+   WebsocketClient client = new WebsocketClient();
+
+   Session session = container.connectToServer(client, backendServerUri);
+
+   session.getBasicRemote().sendText("Echo");
+   client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+
+   /* verify the payload as well, not just that something arrived */
+   assertThat(client.messageQueue.get(0), is("Echo"));
+
+ }
+
+ /**
+  * Test websocket proxying through gateway: connects to the gateway's
+  * {@code gateway/websocket/ws} path, sends one text message and expects the
+  * identical text back.
+  *
+  * @throws Exception
+  *           if connecting, sending or waiting for the reply fails
+  */
+ @Test
+ public void testGatewayEcho() throws Exception {
+   final URI gatewayEndpoint = new URI(
+       serverUri.toString() + "gateway/websocket/ws");
+
+   final WebSocketContainer wsContainer = ContainerProvider
+       .getWebSocketContainer();
+   final WebsocketClient echoClient = new WebsocketClient();
+   final Session gatewaySession = wsContainer.connectToServer(echoClient,
+       gatewayEndpoint);
+
+   gatewaySession.getBasicRemote().sendText("Echo");
+   echoClient.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+
+   final String reply = echoClient.messageQueue.get(0);
+   assertThat(reply, is("Echo"));
+ }
+
+ /**
+ * Start Mock Websocket server that acts as backend.
+ *
+ * @throws Exception
+ */
+ private static void startWebsocketServer() throws Exception {
+
+ backendServer = new Server();
+ ServerConnector connector = new ServerConnector(backendServer);
+ backendServer.addConnector(connector);
+
+ final WebsocketEchoHandler handler = new WebsocketEchoHandler();
+
+ ContextHandler context = new ContextHandler();
+ context.setContextPath("/");
+ context.setHandler(handler);
+ backendServer.setHandler(context);
+
+ // Start Server
+ backendServer.start();
+
+ String host = connector.getHost();
+ if (host == null) {
+ host = "localhost";
+ }
+ int port = connector.getLocalPort();
+ backendServerUri = new URI(String.format("ws://%s:%d/ws", host, port));
+
+ }
+
+ /**
+ * Start Gateway Server.
+ *
+ * @throws Exception
+ */
+ private static void startGatewayServer() throws Exception {
+ gatewayServer = new Server();
+ final ServerConnector connector = new ServerConnector(gatewayServer);
+ gatewayServer.addConnector(connector);
+
+ /* workaround so we can add our handler later at runtime */
+ HandlerCollection handlers = new HandlerCollection(true);
+
+ /* add some initial handlers */
+ ContextHandler context = new ContextHandler();
+ context.setContextPath("/");
+ handlers.addHandler(context);
+
+ gatewayServer.setHandler(handlers);
+
+ // Start Server
+ gatewayServer.start();
+
+ String host = connector.getHost();
+ if (host == null) {
+ host = "localhost";
+ }
+ int port = connector.getLocalPort();
+ serverUri = new URI(String.format("ws://%s:%d/", host, port));
+
+ /* Setup websocket handler */
+ setupGatewayConfig(backendServerUri.toString());
+
+ final GatewayWebsocketHandler gatewayWebsocketHandler = new GatewayWebsocketHandler(
+ gatewayConfig, services);
+ handlers.addHandler(gatewayWebsocketHandler);
+ gatewayWebsocketHandler.start();
+ }
+
+ /**
+  * Initialize the configs and components required for this test: writes a
+  * topology descriptor pointing at the backend into a fresh temporary
+  * directory, builds a mocked {@link GatewayConfig}, initializes the gateway
+  * services, registers a topology listener and reloads the topologies.
+  *
+  * @param backend
+  *          address of the backend websocket server; written into the
+  *          topology descriptor as the service url
+  * @throws IOException
+  *           if the temporary directory or the topology descriptor cannot be
+  *           created
+  */
+ private static void setupGatewayConfig(final String backend)
+     throws IOException {
+   services = new DefaultGatewayServices();
+
+   topoDir = createDir();
+   URL serviceUrl = ClassLoader.getSystemResource("websocket-services");
+
+   final File descriptor = new File(topoDir, "websocket.xml");
+   final FileOutputStream stream = new FileOutputStream(descriptor);
+   try {
+     createKnoxTopology(backend).toStream(stream);
+   } finally {
+     /* always release the file handle, even when writing the topology fails */
+     stream.close();
+   }
+
+   final TestTopologyListener topoListener = new TestTopologyListener();
+
+   final Map<String, String> options = new HashMap<String, String>();
+   options.put("persist-master", "false");
+   options.put("master", "password");
+
+   gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
+   EasyMock.expect(gatewayConfig.getGatewayTopologyDir())
+       .andReturn(topoDir.toString()).anyTimes();
+
+   EasyMock.expect(gatewayConfig.getGatewayServicesDir())
+       .andReturn(serviceUrl.getFile()).anyTimes();
+
+   EasyMock.expect(gatewayConfig.getEphemeralDHKeySize()).andReturn("2048")
+       .anyTimes();
+
+   EasyMock.expect(gatewayConfig.getGatewaySecurityDir())
+       .andReturn(topoDir.toString()).anyTimes();
+
+   /* Websocket configs */
+   EasyMock.expect(gatewayConfig.isWebsocketEnabled()).andReturn(true)
+       .anyTimes();
+
+   EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageSize())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE)
+       .anyTimes();
+
+   EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageSize())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE)
+       .anyTimes();
+
+   EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageBufferSize())
+       .andReturn(
+           GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE)
+       .anyTimes();
+
+   EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageBufferSize())
+       .andReturn(
+           GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE)
+       .anyTimes();
+
+   EasyMock.expect(gatewayConfig.getWebsocketInputBufferSize())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE)
+       .anyTimes();
+
+   EasyMock.expect(gatewayConfig.getWebsocketAsyncWriteTimeout())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT)
+       .anyTimes();
+
+   EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
+       .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
+
+   EasyMock.replay(gatewayConfig);
+
+   try {
+     services.init(gatewayConfig, options);
+   } catch (ServiceLifecycleException e) {
+     /*
+      * NOTE(review): an init failure is only printed and setup continues;
+      * later calls on the services may then fail - confirm this best-effort
+      * behaviour is intended.
+      */
+     e.printStackTrace();
+   }
+
+   DeploymentFactory.setGatewayServices(services);
+   final TopologyService monitor = services
+       .getService(GatewayServices.TOPOLOGY_SERVICE);
+   monitor.addTopologyChangeListener(topoListener);
+   monitor.reloadTopologies();
+
+ }
+
+ private static File createDir() throws IOException {
+ return TestUtils
+ .createTempDir(WebsocketEchoTest.class.getSimpleName() + "-");
+ }
+
+ /**
+  * Builds the topology document for this test: a {@code topology} root with
+  * one {@code service} whose role is {@code WEBSOCKET} and whose url is the
+  * given backend address.
+  *
+  * @param backend
+  *          text placed in the service's {@code url} tag
+  * @return the document, positioned at its root tag
+  */
+ private static XMLTag createKnoxTopology(final String backend) {
+   return XMLDoc.newDocument(true).addRoot("topology").addTag("service")
+       .addTag("role").addText("WEBSOCKET").addTag("url").addText(backend)
+       .gotoParent().gotoRoot();
+ }
+
+ /**
+  * Topology listener for this test: records every batch of events it
+  * receives and creates a deployment for each topology that was not deleted.
+  */
+ private static class TestTopologyListener implements TopologyListener {
+
+   /** Every event batch received so far, in arrival order. */
+   public ArrayList<List<TopologyEvent>> events = new ArrayList<List<TopologyEvent>>();
+
+   /**
+    * Records the batch and deploys each non-deleted topology. The whole
+    * callback runs under this listener's monitor, so the unsynchronized
+    * event list is never modified outside the lock.
+    *
+    * @param events
+    *          the topology events to handle
+    */
+   @Override
+   public void handleTopologyEvent(List<TopologyEvent> events) {
+     synchronized (this) {
+       this.events.add(events);
+
+       for (TopologyEvent event : events) {
+         if (!event.getType().equals(TopologyEvent.Type.DELETED)) {
+
+           /* for this test we only care about this part */
+           DeploymentFactory.createDeployment(gatewayConfig,
+               event.getTopology());
+
+         }
+       }
+
+     }
+
+   }
+
+ }
+}