You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by ku...@apache.org on 2022/05/20 13:45:59 UTC

[submarine] branch master updated: SUBMARINE-1069. Websocket in Submarine Server

This is an automated email from the ASF dual-hosted git repository.

kuanhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f6eb4d6 SUBMARINE-1069. Websocket in Submarine Server
1f6eb4d6 is described below

commit 1f6eb4d6b581eb64fa827b0ff02902a10fab3884
Author: featherchen <ga...@gmail.com>
AuthorDate: Fri May 20 21:37:40 2022 +0800

    SUBMARINE-1069. Websocket in Submarine Server
    
    ### What is this PR for?
    <!-- A few sentences describing the overall goals of the pull request's commits.
    First time? Check out the contributing guide - https://submarine.apache.org/contribution/contributions.html
    -->
    Considering future developing, extend websocket in Sebmarine Server.
    In this PR, I used the source code of old notebook websocket server(which I didn't remove in this PR, but should remove it in the future refactor process) to build three websocket server and related test.
    
    I name the url of each server is /ws/notebook/, /ws/experiment/, /ws/environment/
    And I rename the old websocket url as /wss
    
    ### What type of PR is it?
    Feature
    ### Todos
    * [x] - automatic test
    
    ### What is the Jira issue?
    <!-- * Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE/
    * Put link here, and add [SUBMARINE-*Jira number*] in PR title, eg. `SUBMARINE-23. PR title`
    -->
    https://issues.apache.org/jira/browse/SUBMARINE-1069
    ### How should this be tested?
    <!--
    * First time? Setup Travis CI as described on https://submarine.apache.org/contribution/contributions.html#continuous-integration
    * Strongly recommended: add automated unit tests for any new or changed behavior
    * Outline any manual steps to test the PR here.
    -->
    By test in submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket
    ### Screenshots (if appropriate)
    ![Screenshot from 2022-04-19 20-06-41](https://user-images.githubusercontent.com/57944334/165305850-64638f40-8088-40c1-9b0c-8ac85df6e525.png)
    ![Screenshot from 2022-04-19 20-07-57](https://user-images.githubusercontent.com/57944334/165305859-f4533154-6f1c-4f5b-ab0d-0d5bd36f17b9.png)
    ![Screenshot from 2022-04-19 20-08-19](https://user-images.githubusercontent.com/57944334/165305867-64f6b22b-a3e7-4d0d-9786-772006e771a8.png)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need new documentation? Maybe
    
    Author: featherchen <ga...@gmail.com>
    
    Signed-off-by: kuanhsun <ku...@apache.org>
    
    Closes #852 from featherchen/SUBMARINE-1069 and squashes the following commits:
    
    0afab660 [featherchen] fix comment
    c55f250e [featherchen] fix bugs
    2c544392 [featherchen] fix bugs
    589626f3 [featherchen] add test
    bfddc094 [featherchen] add test
    ef0dfb87 [featherchen] add test
    f5e7d352 [featherchen] modify test
    d08c8892 [featherchen] modify test
    cdac6a21 [featherchen] delete redudent import
    f72b19e4 [featherchen] fix bug
    cc611b05 [featherchen] set up three websocket
    1ecc577d [featherchen] extend websocket component
    9011c0d8 [featherchen] test
    9c218d83 [featherchen] trivial
    84412201 [featherchen] change
    57a0a333 [featherchen] LOG problem
---
 dev-support/maven-config/checkstyle.xml            |   0
 submarine-server/server-core/pom.xml               |   1 +
 .../apache/submarine/server/SubmarineServer.java   |  47 +++++--
 .../server/websocket/BasicWebSocketCreator.java    |  39 ++++++
 .../server/websocket/ConnectionManager.java        | 102 +++++++++++++++
 .../server/websocket/DateJsonDeserializer.java     |  54 ++++++++
 .../apache/submarine/server/websocket/Message.java |  82 ++++++++++++
 .../server/websocket/WebSocketHandler.java         |  85 ++++++++++++
 .../server/websocket/WebSocketListener.java        |  26 ++++
 .../server/websocket/WebSocketServer.java          | 142 +++++++++++++++++++++
 .../server/AbstractSubmarineServerTest.java        |  21 ++-
 .../EnvironmentWebsocketTest.java}                 |  24 +++-
 .../ExperimentWebsocketTest.java}                  |  26 ++--
 .../NotebookWebsocketTest.java}                    |  23 +++-
 .../workbench/websocket/NotebookServerTest.java    |   2 +-
 15 files changed, 638 insertions(+), 36 deletions(-)

diff --git a/dev-support/maven-config/checkstyle.xml b/dev-support/maven-config/checkstyle.xml
old mode 100644
new mode 100755
diff --git a/submarine-server/server-core/pom.xml b/submarine-server/server-core/pom.xml
index 6a9e6461..a1a956e6 100644
--- a/submarine-server/server-core/pom.xml
+++ b/submarine-server/server-core/pom.xml
@@ -437,6 +437,7 @@
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>
+
   </dependencies>
 
   <build>
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
index afb1f34a..92ce425a 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
@@ -21,6 +21,7 @@ package org.apache.submarine.server;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.submarine.server.rest.provider.YamlEntityProvider;
 import org.apache.submarine.server.workbench.websocket.NotebookServer;
+import org.apache.submarine.server.websocket.WebSocketServer;
 import org.apache.submarine.commons.cluster.ClusterServer;
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.server.Handler;
@@ -105,6 +106,9 @@ public class SubmarineServer extends ResourceConfig {
             bindAsContract(NotebookServer.class)
                 .to(WebSocketServlet.class)
                 .in(Singleton.class);
+            bindAsContract(WebSocketServer.class)
+                .to(WebSocketServlet.class)
+                .in(Singleton.class);
           }
         });
 
@@ -117,13 +121,15 @@ public class SubmarineServer extends ResourceConfig {
     // Cluster Server is useless for submarine now. Shield it to improve performance.
     // setupClusterServer();
 
+    setupWebSocketServer(webApp, conf, sharedServiceLocator);
     startServer();
+
   }
 
   @Inject
   public SubmarineServer() {
     packages("org.apache.submarine.server.workbench.rest",
-             "org.apache.submarine.server.rest"
+        "org.apache.submarine.server.rest"
     );
     register(YamlEntityProvider.class);
   }
@@ -170,7 +176,7 @@ public class SubmarineServer extends ResourceConfig {
   }
 
   private static WebAppContext setupWebAppContext(HandlerList handlers,
-      SubmarineConfiguration conf) {
+                                                  SubmarineConfiguration conf) {
     WebAppContext webApp = new WebAppContext();
     webApp.setContextPath("/");
     File warPath = new File(conf.getString(SubmarineConfVars.ConfVars.WORKBENCH_WEB_WAR));
@@ -196,7 +202,7 @@ public class SubmarineServer extends ResourceConfig {
     webApp.addServlet(new ServletHolder(RefreshServlet.class), "/user/*");
     webApp.addServlet(new ServletHolder(RefreshServlet.class), "/workbench/*");
 
-    handlers.setHandlers(new Handler[] { webApp });
+    handlers.setHandlers(new Handler[]{webApp});
 
     return webApp;
   }
@@ -223,9 +229,9 @@ public class SubmarineServer extends ResourceConfig {
       httpsConfig.addCustomizer(src);
 
       connector = new ServerConnector(
-              server,
-              new SslConnectionFactory(getSslContextFactory(conf), HttpVersion.HTTP_1_1.asString()),
-              new HttpConnectionFactory(httpsConfig));
+          server,
+          new SslConnectionFactory(getSslContextFactory(conf), HttpVersion.HTTP_1_1.asString()),
+          new HttpConnectionFactory(httpsConfig));
     } else {
       connector = new ServerConnector(server);
     }
@@ -246,14 +252,37 @@ public class SubmarineServer extends ResourceConfig {
   }
 
   private static void setupNotebookServer(WebAppContext webapp,
-      SubmarineConfiguration conf, ServiceLocator serviceLocator) {
+                                          SubmarineConfiguration conf, ServiceLocator serviceLocator) {
     String maxTextMessageSize = conf.getWebsocketMaxTextMessageSize();
     final ServletHolder servletHolder =
         new ServletHolder(serviceLocator.getService(NotebookServer.class));
     servletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize);
 
     final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-    webapp.addServlet(servletHolder, "/ws/*");
+    webapp.addServlet(servletHolder, "/wss/*");
+  }
+
+  private static void setupWebSocketServer(WebAppContext webapp,
+                                           SubmarineConfiguration conf, ServiceLocator serviceLocator) {
+    String maxTextMessageSize = conf.getWebsocketMaxTextMessageSize();
+    final ServletHolder notebookServletHolder =
+        new ServletHolder(serviceLocator.getService(WebSocketServer.class));
+    notebookServletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize);
+
+    final ServletHolder experimentServletHolder =
+        new ServletHolder(serviceLocator.getService(WebSocketServer.class));
+    experimentServletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize);
+
+    final ServletHolder environmentServletHolder =
+        new ServletHolder(serviceLocator.getService(WebSocketServer.class));
+    environmentServletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize);
+
+
+
+    final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+    webapp.addServlet(notebookServletHolder, "/ws/notebook/*");
+    webapp.addServlet(experimentServletHolder, "/ws/experiment/*");
+    webapp.addServlet(environmentServletHolder, "/ws/environment/*");
   }
 
   private static void setupClusterServer() {
@@ -331,7 +360,7 @@ public class SubmarineServer extends ResourceConfig {
 
       StringBuilder sbIndexBuf = new StringBuilder();
       try (InputStreamReader reader =
-                   new InputStreamReader(new FileInputStream(indexFile), "GBK");
+               new InputStreamReader(new FileInputStream(indexFile), "GBK");
            BufferedReader bufferedReader = new BufferedReader(reader);) {
         String lineTxt = null;
         while ((lineTxt = bufferedReader.readLine()) != null) {
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/BasicWebSocketCreator.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/BasicWebSocketCreator.java
new file mode 100644
index 00000000..cfe5be8d
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/BasicWebSocketCreator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible to create the WebSockets for WebSocketServer.
+ */
+public class BasicWebSocketCreator implements WebSocketCreator {
+  private static final Logger LOG = LoggerFactory.getLogger(BasicWebSocketCreator.class);
+  private WebSocketServer webSocketServer;
+
+  public BasicWebSocketCreator(WebSocketServer webSocketServer) {
+    this.webSocketServer = webSocketServer;
+  }
+  public Object createWebSocket(ServletUpgradeRequest request, ServletUpgradeResponse response) {
+    return new WebSocketHandler(request.getHttpServletRequest(), "", webSocketServer);
+  }
+
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/ConnectionManager.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/ConnectionManager.java
new file mode 100644
index 00000000..dec72775
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/ConnectionManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.server.websocket;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.eclipse.jetty.websocket.api.WebSocketException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Manager class for managing websocket connections.
+ */
+public class ConnectionManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class);
+  private static final Gson gson = new GsonBuilder()
+      .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
+      .registerTypeAdapter(Date.class, new DateJsonDeserializer())
+      .setPrettyPrinting()
+      .create();
+
+  final Queue<WebSocketHandler> connectedSockets = new ConcurrentLinkedQueue<>();
+  // user -> connection
+  final Map<String, Queue<WebSocketHandler>> userSocketMap = new ConcurrentHashMap<>();
+
+  public void addConnection(WebSocketHandler conn) {
+    connectedSockets.add(conn);
+  }
+
+  public void removeConnection(WebSocketHandler conn) {
+    connectedSockets.remove(conn);
+  }
+
+  public void addUserConnection(String user, WebSocketHandler conn) {
+    LOG.info("Add user connection {} for user: {}", conn, user);
+    conn.setUser(user);
+    if (userSocketMap.containsKey(user)) {
+      userSocketMap.get(user).add(conn);
+    } else {
+      Queue<WebSocketHandler> socketQueue = new ConcurrentLinkedQueue<>();
+      socketQueue.add(conn);
+      userSocketMap.put(user, socketQueue);
+    }
+  }
+
+  public void removeUserConnection(String user, WebSocketHandler conn) {
+    LOG.info("Remove user connection {} for user: {}", conn, user);
+    if (userSocketMap.containsKey(user)) {
+      userSocketMap.get(user).remove(conn);
+    } else {
+      LOG.warn("Closing connection that is absent in user connections");
+    }
+  }
+
+  protected String serializeMessage(Message m) {
+    return gson.toJson(m);
+  }
+
+  public void broadcast(Message m) {
+    synchronized (connectedSockets) {
+      for (WebSocketHandler ns : connectedSockets) {
+        try {
+          ns.send(serializeMessage(m));
+        } catch (IOException | WebSocketException e) {
+          LOG.error("Send error: " + m, e);
+        }
+      }
+    }
+  }
+
+  public Set<String> getConnectedUsers() {
+    Set<String> connectedUsers = Sets.newHashSet();
+    for (WebSocketHandler notebookSocket : connectedSockets) {
+      connectedUsers.add(notebookSocket.getUser());
+    }
+    return connectedUsers;
+  }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/DateJsonDeserializer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/DateJsonDeserializer.java
new file mode 100644
index 00000000..6fe30e07
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/DateJsonDeserializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+
+import java.lang.reflect.Type;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Locale;
+
+public class DateJsonDeserializer implements JsonDeserializer{
+  private final String[] DATE_FORMATS = new String[] {
+      "yyyy-MM-dd'T'HH:mm:ssZ",
+      "MMM d, yyyy h:mm:ss a",
+      "MMM dd, yyyy HH:mm:ss",
+      "yyyy-MM-dd HH:mm:ss.SSS"
+  };
+
+  @Override
+  public Date deserialize(JsonElement jsonElement, Type typeOF,
+      JsonDeserializationContext context) throws JsonParseException {
+    for (String format : DATE_FORMATS) {
+      try {
+        return new SimpleDateFormat(format, Locale.US).parse(jsonElement.getAsString());
+      } catch (ParseException e) {
+        throw new JsonParseException("Unparsable date: \"" + jsonElement.getAsString()
+            + "\". Supported formats: " + Arrays.toString(DATE_FORMATS));
+      } catch (Exception e){
+        e.printStackTrace();
+      }
+    }
+    throw new RuntimeException("Unexpected Error in Deserialize Date");
+  }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/Message.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/Message.java
new file mode 100644
index 00000000..96de8a96
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/Message.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Message {
+  /**
+   * Representation of event type.
+   */
+  public enum OP {
+    ERROR_INFO,                   // [s-c] error information to be sent
+    NOTICE                        // [s-c] Notice
+  }
+
+  private static final Gson gson = new Gson();
+  public static final Message EMPTY = new Message(null);
+
+  public OP op;
+  public Map<String, Object> data = new HashMap<>();
+
+  public Message(OP op) {
+    this.op = op;
+  }
+
+  public Message put(String k, Object v) {
+    data.put(k, v);
+    return this;
+  }
+
+  public Object get(String k) {
+    return data.get(k);
+  }
+
+  public <T> T getType(String key) {
+    return (T) data.get(key);
+  }
+
+  public <T> T getType(String key, Logger LOG) {
+    try {
+      return getType(key);
+    } catch (ClassCastException e) {
+      LOG.error("Failed to get " + key + " from message (Invalid type). " , e);
+      return null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("Message{");
+    sb.append("data=").append(data);
+    sb.append(", op=").append(op);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  public String toJson() {
+    return gson.toJson(this);
+  }
+
+  public static Message fromJson(String json) {
+    return gson.fromJson(json, Message.class);
+  }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketHandler.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketHandler.java
new file mode 100644
index 00000000..c8d2f0a8
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+
+/**
+ * Websocket handler.
+ */
+public class WebSocketHandler extends WebSocketAdapter {
+  private Session connection;
+  private WebSocketListener listener;
+  private HttpServletRequest request;
+  private String protocol;
+  private String user;
+
+  public WebSocketHandler(HttpServletRequest req, String protocol,
+      WebSocketListener listener) {
+    this.listener = listener;
+    this.request = req;
+    this.protocol = protocol;
+    this.user = StringUtils.EMPTY;
+  }
+
+  @Override
+  public void onWebSocketClose(int closeCode, String message) {
+    listener.onClose(this, closeCode, message);
+  }
+
+  @Override
+  public void onWebSocketConnect(Session connection) {
+    this.connection = connection;
+    listener.onOpen(this);
+  }
+
+  @Override
+  public void onWebSocketText(String message) {
+    listener.onMessage(this, message);
+  }
+
+  public HttpServletRequest getRequest() {
+    return request;
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public synchronized void send(String serializeMessage) throws IOException {
+    connection.getRemote().sendString(serializeMessage);
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  @Override
+  public String toString() {
+    return request.getRemoteHost() + ":" + request.getRemotePort();
+  }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketListener.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketListener.java
new file mode 100644
index 00000000..79e319f5
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+/**
+ * WebSocket listener.
+ */
+public interface WebSocketListener {
+  void onClose(WebSocketHandler socket, int code, String message);
+  void onOpen(WebSocketHandler socket);
+  void onMessage(WebSocketHandler socket, String message);
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketServer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketServer.java
new file mode 100644
index 00000000..2c1f536d
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketServer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.util.annotation.ManagedOperation;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Submarine websocket service. This class used setter injection because all servlet should have
+ * no-parameter constructor
+ */
+@ManagedObject
+public class WebSocketServer extends WebSocketServlet
+    implements org.apache.submarine.server.websocket.WebSocketListener {
+
+  /**
+   * Job manager service type.
+   */
+  protected enum JobManagerServiceType {
+    JOB_MANAGER_PAGE("JOB_MANAGER_PAGE");
+    private String serviceTypeKey;
+
+    JobManagerServiceType(String serviceType) {
+      this.serviceTypeKey = serviceType;
+    }
+
+    String getKey() {
+      return this.serviceTypeKey;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
+  private static Gson gson = new GsonBuilder()
+      .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
+      .registerTypeAdapter(Date.class, new DateJsonDeserializer())
+      .setPrettyPrinting()
+      .create();
+
+  private static AtomicReference<WebSocketServer> self = new AtomicReference<>();
+
+  private ConnectionManager connectionManager;
+
+  private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+  public WebSocketServer() {
+    this.connectionManager = new ConnectionManager();
+    WebSocketServer.self.set(this);
+    LOG.info("WebSocketServer instantiated: {}", this);
+  }
+
+  @Override
+  public void configure(WebSocketServletFactory factory) {
+    factory.setCreator(new BasicWebSocketCreator(this));
+  }
+
+  @Override
+  public void onOpen(WebSocketHandler conn) {
+    LOG.info("New connection from {}", conn);
+    connectionManager.addConnection(conn);
+  }
+
+  @Override
+  public void onMessage(WebSocketHandler conn, String msg) {
+    try {
+      LOG.info("Got Message: " + msg);
+      if (StringUtils.isEmpty(conn.getUser())) {
+        connectionManager.addUserConnection("FakeUser1", conn);
+      }
+    } catch (Exception e) {
+      LOG.error("Can't handle message: " + msg, e);
+      try {
+        conn.send(serializeMessage(new Message(Message.OP.ERROR_INFO).put(
+            "info", e.getMessage())));
+      } catch (IOException iox) {
+        LOG.error("Fail to send error info", iox);
+      }
+    }
+  }
+
+  @Override
+  public void onClose(WebSocketHandler conn, int code, String reason) {
+    LOG.info("Closed connection to {} ({}) {}", conn, code, reason);
+    connectionManager.removeConnection(conn);
+    connectionManager.removeUserConnection(conn.getUser(), conn);
+  }
+
+  public ConnectionManager getConnectionManager() {
+    return connectionManager;
+  }
+
+  protected Message deserializeMessage(String msg) {
+    return gson.fromJson(msg, Message.class);
+  }
+
+  protected String serializeMessage(Message m) {
+    return gson.toJson(m);
+  }
+
+  public void broadcast(Message m) {
+    connectionManager.broadcast(m);
+  }
+
+  @ManagedAttribute
+  public Set<String> getConnectedUsers() {
+    return connectionManager.getConnectedUsers();
+  }
+
+  @ManagedOperation
+  public void sendMessage(String message) {
+    Message m = new Message(Message.OP.NOTICE);
+    m.data.put("notice", message);
+    connectionManager.broadcast(m);
+  }
+}
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
index 7cd4fe51..eab31af7 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
@@ -75,7 +75,10 @@ public abstract class AbstractSubmarineServerTest {
   protected static final Logger LOG =
       LoggerFactory.getLogger(AbstractSubmarineServerTest.class);
 
-  static final String WEBSOCKET_API_URL = "/ws";
+  static final String WEBSOCKET_API_URL = "/wss";
+  static final String WEBSOCKET_NOTEBOOK_API_URL = "/ws/notebook";
+  static final String WEBSOCKET_EXPERIMENT_API_URL = "/ws/experiment";
+  static final String WEBSOCKET_ENVIRONMENT_API_URL = "/ws/environment";
   static final String URL = getUrlToTest();
   protected static final boolean WAS_RUNNING = checkIfServerIsRunning();
 
@@ -86,8 +89,20 @@ public abstract class AbstractSubmarineServerTest {
       "/api/" + RestConstants.V1 + "/" + RestConstants.ENVIRONMENT;
   protected static String ENV_NAME = "my-submarine-env";
 
-  public static String getWebsocketApiUrlToTest() {
-    String websocketUrl = "ws://localhost:8080" + WEBSOCKET_API_URL;
+  public static String getWebsocketApiUrlToTest(String serverName) {
+
+    String websocketUrl = "ws://localhost:8080";
+    if (serverName.equals("wss")) {
+      websocketUrl = "ws://localhost:8080" + WEBSOCKET_API_URL;
+    } else if (serverName.equals("notebook")) {
+      websocketUrl = "ws://localhost:8080" + WEBSOCKET_NOTEBOOK_API_URL;
+    }
+    else if (serverName.equals("environment")) {
+      websocketUrl = "ws://localhost:8080" + WEBSOCKET_ENVIRONMENT_API_URL;
+    }
+    else if (serverName.equals("experiment")) {
+      websocketUrl = "ws://localhost:8080" + WEBSOCKET_EXPERIMENT_API_URL;
+    }
     if (System.getProperty("websocketUrl") != null) {
       websocketUrl = System.getProperty("websocketurl");
     }
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/EnvironmentWebsocketTest.java
similarity index 78%
copy from submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
copy to submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/EnvironmentWebsocketTest.java
index a18e0cdd..af8149a5 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/EnvironmentWebsocketTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.submarine.server.workbench.websocket;
+package org.apache.submarine.server.websocket;
 
 import org.apache.submarine.server.AbstractSubmarineServerTest;
 import org.eclipse.jetty.websocket.api.Session;
@@ -24,16 +24,22 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.concurrent.Future;
 
-public class NotebookServerTest {
+import static junit.framework.TestCase.assertEquals;
 
+
+public class EnvironmentWebsocketTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EnvironmentWebsocketTest.class);
   @BeforeClass
   public static void init() throws Exception {
     AbstractSubmarineServerTest.startUp(
-        NotebookServerTest.class.getSimpleName());
+        EnvironmentWebsocketTest.class.getSimpleName());
   }
 
   @AfterClass
@@ -44,8 +50,9 @@ public class NotebookServerTest {
   @Test
   public void testWebsocketConnection() throws Exception{
     URI uri = URI.create(
-        AbstractSubmarineServerTest.getWebsocketApiUrlToTest());
+        AbstractSubmarineServerTest.getWebsocketApiUrlToTest("environment"));
     WebSocketClient client = new WebSocketClient();
+
     try {
       client.start();
       // The socket that receives events
@@ -70,21 +77,24 @@ public class NotebookServerTest {
     public void onWebSocketConnect(Session sess)
     {
       super.onWebSocketConnect(sess);
-      System.out.println("Socket Connected: " + sess);
+      LOG.info("Socket Connected: " + sess);
     }
 
     @Override
     public void onWebSocketText(String message)
     {
       super.onWebSocketText(message);
-      System.out.println("Received TEXT message: " + message);
+      LOG.info("Received TEXT message: " + message);
+      assertEquals(message, "Hello");
     }
 
     @Override
     public void onWebSocketClose(int statusCode, String reason)
     {
       super.onWebSocketClose(statusCode, reason);
-      System.out.println("Socket Closed: [" + statusCode + "] " + reason);
+      LOG.info("Socket Closed: [" + statusCode + "] " + reason);
+      assertEquals(statusCode, StatusCode.NORMAL);
+      assertEquals(reason, "I'm done");
     }
 
     @Override
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/ExperimentWebsocketTest.java
similarity index 78%
copy from submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
copy to submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/ExperimentWebsocketTest.java
index a18e0cdd..edd79099 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/ExperimentWebsocketTest.java
@@ -14,9 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.submarine.server.workbench.websocket;
+package org.apache.submarine.server.websocket;
 
-import org.apache.submarine.server.AbstractSubmarineServerTest;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.StatusCode;
 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -24,16 +23,21 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.net.URI;
 import java.util.concurrent.Future;
+import org.apache.submarine.server.AbstractSubmarineServerTest;
 
-public class NotebookServerTest {
+import static junit.framework.TestCase.assertEquals;
 
+public class ExperimentWebsocketTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExperimentWebsocketTest.class);
   @BeforeClass
   public static void init() throws Exception {
     AbstractSubmarineServerTest.startUp(
-        NotebookServerTest.class.getSimpleName());
+        ExperimentWebsocketTest.class.getSimpleName());
   }
 
   @AfterClass
@@ -44,8 +48,9 @@ public class NotebookServerTest {
   @Test
   public void testWebsocketConnection() throws Exception{
     URI uri = URI.create(
-        AbstractSubmarineServerTest.getWebsocketApiUrlToTest());
+        AbstractSubmarineServerTest.getWebsocketApiUrlToTest("experiment"));
     WebSocketClient client = new WebSocketClient();
+
     try {
       client.start();
       // The socket that receives events
@@ -70,21 +75,24 @@ public class NotebookServerTest {
     public void onWebSocketConnect(Session sess)
     {
       super.onWebSocketConnect(sess);
-      System.out.println("Socket Connected: " + sess);
+      LOG.info("Socket Connected: " + sess);
     }
 
     @Override
     public void onWebSocketText(String message)
     {
       super.onWebSocketText(message);
-      System.out.println("Received TEXT message: " + message);
+      LOG.info("Received TEXT message: " + message);
+      assertEquals(message, "Hello");
     }
 
     @Override
     public void onWebSocketClose(int statusCode, String reason)
     {
       super.onWebSocketClose(statusCode, reason);
-      System.out.println("Socket Closed: [" + statusCode + "] " + reason);
+      LOG.info("Socket Closed: [" + statusCode + "] " + reason);
+      assertEquals(statusCode, StatusCode.NORMAL);
+      assertEquals(reason, "I'm done");
     }
 
     @Override
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/NotebookWebsocketTest.java
similarity index 78%
copy from submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
copy to submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/NotebookWebsocketTest.java
index a18e0cdd..d62e79ae 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/NotebookWebsocketTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.submarine.server.workbench.websocket;
+package org.apache.submarine.server.websocket;
 
 import org.apache.submarine.server.AbstractSubmarineServerTest;
 import org.eclipse.jetty.websocket.api.Session;
@@ -24,16 +24,21 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.concurrent.Future;
 
-public class NotebookServerTest {
+import static junit.framework.TestCase.assertEquals;
 
+public class NotebookWebsocketTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NotebookWebsocketTest.class);
   @BeforeClass
   public static void init() throws Exception {
     AbstractSubmarineServerTest.startUp(
-        NotebookServerTest.class.getSimpleName());
+        NotebookWebsocketTest.class.getSimpleName());
   }
 
   @AfterClass
@@ -44,8 +49,9 @@ public class NotebookServerTest {
   @Test
   public void testWebsocketConnection() throws Exception{
     URI uri = URI.create(
-        AbstractSubmarineServerTest.getWebsocketApiUrlToTest());
+        AbstractSubmarineServerTest.getWebsocketApiUrlToTest("notebook"));
     WebSocketClient client = new WebSocketClient();
+
     try {
       client.start();
       // The socket that receives events
@@ -70,21 +76,24 @@ public class NotebookServerTest {
     public void onWebSocketConnect(Session sess)
     {
       super.onWebSocketConnect(sess);
-      System.out.println("Socket Connected: " + sess);
+      LOG.info("Socket Connected: " + sess);
     }
 
     @Override
     public void onWebSocketText(String message)
     {
       super.onWebSocketText(message);
-      System.out.println("Received TEXT message: " + message);
+      LOG.info("Received TEXT message: " + message);
+      assertEquals(message, "Hello");
     }
 
     @Override
     public void onWebSocketClose(int statusCode, String reason)
     {
       super.onWebSocketClose(statusCode, reason);
-      System.out.println("Socket Closed: [" + statusCode + "] " + reason);
+      LOG.info("Socket Closed: [" + statusCode + "] " + reason);
+      assertEquals(statusCode, StatusCode.NORMAL);
+      assertEquals(reason, "I'm done");
     }
 
     @Override
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
index a18e0cdd..77aaa19d 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
@@ -44,7 +44,7 @@ public class NotebookServerTest {
   @Test
   public void testWebsocketConnection() throws Exception{
     URI uri = URI.create(
-        AbstractSubmarineServerTest.getWebsocketApiUrlToTest());
+        AbstractSubmarineServerTest.getWebsocketApiUrlToTest("wss"));
     WebSocketClient client = new WebSocketClient();
     try {
       client.start();


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org