You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by mo...@apache.org on 2017/05/09 13:51:08 UTC
knox git commit: KNOX-936 - On websocket error properly close all the
sessions and containers.
Repository: knox
Updated Branches:
refs/heads/master 1bcfe47aa -> 0f207409c
KNOX-936 - On websocket error properly close all the sessions and containers.
Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/0f207409
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/0f207409
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/0f207409
Branch: refs/heads/master
Commit: 0f207409c02fff54e3fb3a5a1c0b3865ef91e7db
Parents: 1bcfe47
Author: Sandeep More <mo...@apache.org>
Authored: Tue May 9 09:50:58 2017 -0400
Committer: Sandeep More <mo...@apache.org>
Committed: Tue May 9 09:50:58 2017 -0400
----------------------------------------------------------------------
.../websockets/GatewayWebsocketHandler.java | 16 +++++-
.../gateway/websockets/ProxyInboundSocket.java | 2 -
.../websockets/ProxyWebSocketAdapter.java | 60 +++++++++++++++-----
.../gateway/websockets/BadBackendTest.java | 3 +-
.../hadoop/gateway/websockets/BadUrlTest.java | 2 +
.../websockets/ConnectionDroppedTest.java | 3 +-
.../gateway/websockets/MessageFailureTest.java | 3 +-
7 files changed, 68 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
index 8faa4f4..a0c7f5f 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
@@ -22,6 +22,8 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.gateway.config.GatewayConfig;
@@ -57,6 +59,14 @@ public class GatewayWebsocketHandler extends WebSocketHandler
static final String REGEX_SPLIT_CONTEXT = "^((?:[^/]*/){2}[^/]*)";
+ private static final int POOL_SIZE = 10;
+
+ /**
+ * Manage the threads that are spawned
+ * @since 0.13
+ */
+ private final ExecutorService pool;
+
final GatewayConfig config;
final GatewayServices services;
@@ -72,6 +82,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
this.config = config;
this.services = services;
+ pool = Executors.newFixedThreadPool(POOL_SIZE);
}
@@ -124,7 +135,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
final String backendURL = getMatchedBackendURL(path);
/* Upgrade happens here */
- return new ProxyWebSocketAdapter(URI.create(backendURL));
+ return new ProxyWebSocketAdapter(URI.create(backendURL), pool);
} catch (final Exception e) {
LOG.failedCreatingWebSocket(e);
throw e;
@@ -136,8 +147,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
* url. If websocket url is found it is used as is, or we default to
* ws://{host}:{port} which might or might not be right.
*
- * @param The
- * context path
+ * @param The context path
* @return Websocket backend url
*/
private synchronized String getMatchedBackendURL(final String path) {
http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
index 3c0edaf..6de3f9c 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
@@ -59,14 +59,12 @@ public class ProxyInboundSocket {
@OnError
public void onClientError(Throwable cause) {
- cause.printStackTrace(System.err);
callback.onError(cause);
}
@OnMessage(maxMessageSize = Integer.MAX_VALUE)
public void onBackendMessage(final String message,
final javax.websocket.Session session) {
-
callback.onMessageText(message, session);
}
http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
index 629f63d..1e7f583 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.gateway.websockets;
import java.io.IOException;
import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
@@ -55,12 +57,15 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
private WebSocketContainer container;
+ private ExecutorService pool;
+
/**
* Create an instance
*/
- public ProxyWebSocketAdapter(URI backend) {
+ public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool) {
super();
this.backend = backend;
+ this.pool = pool;
}
@Override
@@ -129,7 +134,13 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
public void onWebSocketClose(int statusCode, String reason) {
super.onWebSocketClose(statusCode, reason);
- closeQuietly();
+ /* do the cleaning business in seperate thread so we don't block */
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ closeQuietly();
+ }
+ });
LOG.onConnectionClose(backend.toString());
@@ -137,23 +148,34 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
@Override
public void onWebSocketError(final Throwable t) {
- passErrorToInboundConnection(t);
+ cleanupOnError(t);
}
/**
- * A helper function to pass errors to Inbound connection (browser/client)
+ * Cleanup sessions
*/
- private void passErrorToInboundConnection(final Throwable t) {
+ private void cleanupOnError(final Throwable t) {
LOG.onError(t.toString());
if (t.toString().contains("exceeds maximum size")) {
- frontendSession.close(StatusCode.MESSAGE_TOO_LARGE, t.getMessage());
+ if(frontendSession != null && !frontendSession.isOpen()) {
+ frontendSession.close(StatusCode.MESSAGE_TOO_LARGE, t.getMessage());
+ }
}
else {
- frontendSession.close(StatusCode.SERVER_ERROR, t.getMessage());
- closeQuietly();
- throw new RuntimeException(t);
+ if(frontendSession != null && !frontendSession.isOpen()) {
+ frontendSession.close(StatusCode.SERVER_ERROR, t.getMessage());
+ }
+
+ /* do the cleaning business in seperate thread so we don't block */
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ closeQuietly();
+ }
+ });
+
}
}
@@ -179,14 +201,22 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
frontendSession.close(reason.getCloseCode().getCode(),
reason.getReasonPhrase());
} finally {
- closeQuietly();
+
+ /* do the cleaning business in seperate thread so we don't block */
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ closeQuietly();
+ }
+ });
+
}
}
@Override
public void onError(Throwable cause) {
- passErrorToInboundConnection(cause);
+ cleanupOnError(cause);
}
@Override
@@ -223,7 +253,9 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
private void closeQuietly() {
try {
- backendSession.close();
+ if(backendSession != null && !backendSession.isOpen()) {
+ backendSession.close();
+ }
} catch (IOException e) {
LOG.connectionFailed(e);
}
@@ -236,7 +268,9 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
}
}
- frontendSession.close();
+ if(frontendSession != null && !frontendSession.isOpen()) {
+ frontendSession.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
index 1e4b86f..88d4f5b 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.gateway.websockets;
import java.io.IOException;
import java.net.URI;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
@@ -94,7 +95,7 @@ public class BadBackendTest {
/* start Knox with WebsocketAdapter to test */
final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
- new ProxyWebSocketAdapter(new URI(BAD_BACKEND)));
+ new ProxyWebSocketAdapter(new URI(BAD_BACKEND), Executors.newFixedThreadPool(10)));
ContextHandler context = new ContextHandler();
context.setContextPath("/");
http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
index 4b9f836..135b385 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
@@ -119,6 +119,7 @@ public class BadUrlTest {
*
* @throws Exception
*/
+
@Test
public void testBadUrl() throws Exception {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
@@ -136,6 +137,7 @@ public class BadUrlTest {
}
+
/**
* Start Gateway Server.
*
http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
index 6562e5c..3c3f40d 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.gateway.websockets;
import java.io.IOException;
import java.net.URI;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
@@ -135,7 +136,7 @@ public class ConnectionDroppedTest {
/* start Knox with WebsocketAdapter to test */
final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
- new ProxyWebSocketAdapter(serverUri));
+ new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10)));
ContextHandler context = new ContextHandler();
context.setContextPath("/");
http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
index f98b7e1..bbb6f13 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.gateway.websockets;
import java.io.IOException;
import java.net.URI;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
@@ -155,7 +156,7 @@ public class MessageFailureTest {
/* start Knox with WebsocketAdapter to test */
final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
- new ProxyWebSocketAdapter(serverUri));
+ new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10)));
ContextHandler context = new ContextHandler();
context.setContextPath("/");