You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by mo...@apache.org on 2017/05/09 13:51:08 UTC

knox git commit: KNOX-936 - On websocket error properly close all the sessions and containers.

Repository: knox
Updated Branches:
  refs/heads/master 1bcfe47aa -> 0f207409c


KNOX-936 - On websocket error properly close all the sessions and containers.


Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/0f207409
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/0f207409
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/0f207409

Branch: refs/heads/master
Commit: 0f207409c02fff54e3fb3a5a1c0b3865ef91e7db
Parents: 1bcfe47
Author: Sandeep More <mo...@apache.org>
Authored: Tue May 9 09:50:58 2017 -0400
Committer: Sandeep More <mo...@apache.org>
Committed: Tue May 9 09:50:58 2017 -0400

----------------------------------------------------------------------
 .../websockets/GatewayWebsocketHandler.java     | 16 +++++-
 .../gateway/websockets/ProxyInboundSocket.java  |  2 -
 .../websockets/ProxyWebSocketAdapter.java       | 60 +++++++++++++++-----
 .../gateway/websockets/BadBackendTest.java      |  3 +-
 .../hadoop/gateway/websockets/BadUrlTest.java   |  2 +
 .../websockets/ConnectionDroppedTest.java       |  3 +-
 .../gateway/websockets/MessageFailureTest.java  |  3 +-
 7 files changed, 68 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
index 8faa4f4..a0c7f5f 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
@@ -22,6 +22,8 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.gateway.config.GatewayConfig;
@@ -57,6 +59,14 @@ public class GatewayWebsocketHandler extends WebSocketHandler
   
   static final String REGEX_SPLIT_CONTEXT = "^((?:[^/]*/){2}[^/]*)";
 
+  private static final int POOL_SIZE = 10;
+
+  /**
+   * Manage the threads that are spawned
+   * @since 0.13
+   */
+  private final ExecutorService pool;
+
   final GatewayConfig config;
   final GatewayServices services;
 
@@ -72,6 +82,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
 
     this.config = config;
     this.services = services;
+    pool = Executors.newFixedThreadPool(POOL_SIZE);
 
   }
 
@@ -124,7 +135,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
       final String backendURL = getMatchedBackendURL(path);
 
       /* Upgrade happens here */
-      return new ProxyWebSocketAdapter(URI.create(backendURL));
+      return new ProxyWebSocketAdapter(URI.create(backendURL), pool);
     } catch (final Exception e) {
       LOG.failedCreatingWebSocket(e);
       throw e;
@@ -136,8 +147,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler
    * url. If websocket url is found it is used as is, or we default to
    * ws://{host}:{port} which might or might not be right.
    * 
-   * @param The
-   *          context path
+   * @param path The context path
    * @return Websocket backend url
    */
   private synchronized String getMatchedBackendURL(final String path) {

http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
index 3c0edaf..6de3f9c 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
@@ -59,14 +59,12 @@ public class ProxyInboundSocket {
 
   @OnError
   public void onClientError(Throwable cause) {
-    cause.printStackTrace(System.err);
     callback.onError(cause);
   }
 
   @OnMessage(maxMessageSize = Integer.MAX_VALUE)
   public void onBackendMessage(final String message,
       final javax.websocket.Session session) {
-
     callback.onMessageText(message, session);
 
   }

http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
index 629f63d..1e7f583 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.gateway.websockets;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.websocket.CloseReason;
 import javax.websocket.ContainerProvider;
@@ -55,12 +57,15 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
 
   private WebSocketContainer container;
 
+  private ExecutorService pool;
+
   /**
    * Create an instance
    */
-  public ProxyWebSocketAdapter(URI backend) {
+  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool) {
     super();
     this.backend = backend;
+    this.pool = pool;
   }
 
   @Override
@@ -129,7 +134,13 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
   public void onWebSocketClose(int statusCode, String reason) {
     super.onWebSocketClose(statusCode, reason);
 
-    closeQuietly();
+    /* do the cleaning business in a separate thread so we don't block */
+    pool.execute(new Runnable() {
+      @Override
+      public void run() {
+        closeQuietly();
+      }
+    });
 
     LOG.onConnectionClose(backend.toString());
 
@@ -137,23 +148,34 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
 
   @Override
   public void onWebSocketError(final Throwable t) {
-    passErrorToInboundConnection(t);
+    cleanupOnError(t);
   }
 
   /**
-   * A helper function to pass errors to Inbound connection (browser/client)
+   * Cleanup sessions
    */
-  private void passErrorToInboundConnection(final Throwable t) {
+  private void cleanupOnError(final Throwable t) {
 
     LOG.onError(t.toString());
     if (t.toString().contains("exceeds maximum size")) {
-      frontendSession.close(StatusCode.MESSAGE_TOO_LARGE, t.getMessage());
+      if(frontendSession != null && !frontendSession.isOpen()) {
+        frontendSession.close(StatusCode.MESSAGE_TOO_LARGE, t.getMessage());
+      }
     }
 
     else {
-      frontendSession.close(StatusCode.SERVER_ERROR, t.getMessage());
-      closeQuietly();
-      throw new RuntimeException(t);
+      if(frontendSession != null && !frontendSession.isOpen()) {
+        frontendSession.close(StatusCode.SERVER_ERROR, t.getMessage());
+      }
+
+      /* do the cleaning business in a separate thread so we don't block */
+      pool.execute(new Runnable() {
+        @Override
+        public void run() {
+          closeQuietly();
+        }
+      });
+
     }
   }
 
@@ -179,14 +201,22 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
           frontendSession.close(reason.getCloseCode().getCode(),
               reason.getReasonPhrase());
         } finally {
-          closeQuietly();
+
+          /* do the cleaning business in a separate thread so we don't block */
+          pool.execute(new Runnable() {
+            @Override
+            public void run() {
+              closeQuietly();
+            }
+          });
+
         }
 
       }
 
       @Override
       public void onError(Throwable cause) {
-        passErrorToInboundConnection(cause);
+        cleanupOnError(cause);
       }
 
       @Override
@@ -223,7 +253,9 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
   private void closeQuietly() {
 
     try {
-      backendSession.close();
+      if(backendSession != null && !backendSession.isOpen()) {
+        backendSession.close();
+      }
     } catch (IOException e) {
       LOG.connectionFailed(e);
     }
@@ -236,7 +268,9 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
       }
     }
 
-    frontendSession.close();
+    if(frontendSession != null && !frontendSession.isOpen()) {
+      frontendSession.close();
+    }
 
   }
 

http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
index 1e4b86f..88d4f5b 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.gateway.websockets;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.websocket.CloseReason;
@@ -94,7 +95,7 @@ public class BadBackendTest {
 
     /* start Knox with WebsocketAdapter to test */
     final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
-        new ProxyWebSocketAdapter(new URI(BAD_BACKEND)));
+        new ProxyWebSocketAdapter(new URI(BAD_BACKEND), Executors.newFixedThreadPool(10)));
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");

http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
index 4b9f836..135b385 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
@@ -119,6 +119,7 @@ public class BadUrlTest {
    *
    * @throws Exception
    */
+
   @Test
   public void testBadUrl() throws Exception {
     WebSocketContainer container = ContainerProvider.getWebSocketContainer();
@@ -136,6 +137,7 @@ public class BadUrlTest {
 
   }
 
+
   /**
    * Start Gateway Server.
    *

http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
index 6562e5c..3c3f40d 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.gateway.websockets;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.websocket.CloseReason;
@@ -135,7 +136,7 @@ public class ConnectionDroppedTest {
 
     /* start Knox with WebsocketAdapter to test */
     final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
-        new ProxyWebSocketAdapter(serverUri));
+        new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10)));
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");

http://git-wip-us.apache.org/repos/asf/knox/blob/0f207409/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
index f98b7e1..bbb6f13 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.gateway.websockets;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.websocket.CloseReason;
@@ -155,7 +156,7 @@ public class MessageFailureTest {
 
     /* start Knox with WebsocketAdapter to test */
     final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
-        new ProxyWebSocketAdapter(serverUri));
+        new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10)));
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");