You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by ku...@apache.org on 2022/05/20 13:45:59 UTC
[submarine] branch master updated: SUBMARINE-1069. Websocket in Submarine Server
This is an automated email from the ASF dual-hosted git repository.
kuanhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 1f6eb4d6 SUBMARINE-1069. Websocket in Submarine Server
1f6eb4d6 is described below
commit 1f6eb4d6b581eb64fa827b0ff02902a10fab3884
Author: featherchen <ga...@gmail.com>
AuthorDate: Fri May 20 21:37:40 2022 +0800
SUBMARINE-1069. Websocket in Submarine Server
### What is this PR for?
<!-- A few sentences describing the overall goals of the pull request's commits.
First time? Check out the contributing guide - https://submarine.apache.org/contribution/contributions.html
-->
Considering future developing, extend websocket in Sebmarine Server.
In this PR, I used the source code of old notebook websocket server(which I didn't remove in this PR, but should remove it in the future refactor process) to build three websocket server and related test.
I name the url of each server is /ws/notebook/, /ws/experiment/, /ws/environment/
And I rename the old websocket url as /wss
### What type of PR is it?
Feature
### Todos
* [x] - automatic test
### What is the Jira issue?
<!-- * Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE/
* Put link here, and add [SUBMARINE-*Jira number*] in PR title, eg. `SUBMARINE-23. PR title`
-->
https://issues.apache.org/jira/browse/SUBMARINE-1069
### How should this be tested?
<!--
* First time? Setup Travis CI as described on https://submarine.apache.org/contribution/contributions.html#continuous-integration
* Strongly recommended: add automated unit tests for any new or changed behavior
* Outline any manual steps to test the PR here.
-->
By test in submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket
### Screenshots (if appropriate)
![Screenshot from 2022-04-19 20-06-41](https://user-images.githubusercontent.com/57944334/165305850-64638f40-8088-40c1-9b0c-8ac85df6e525.png)
![Screenshot from 2022-04-19 20-07-57](https://user-images.githubusercontent.com/57944334/165305859-f4533154-6f1c-4f5b-ab0d-0d5bd36f17b9.png)
![Screenshot from 2022-04-19 20-08-19](https://user-images.githubusercontent.com/57944334/165305867-64f6b22b-a3e7-4d0d-9786-772006e771a8.png)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? Maybe
Author: featherchen <ga...@gmail.com>
Signed-off-by: kuanhsun <ku...@apache.org>
Closes #852 from featherchen/SUBMARINE-1069 and squashes the following commits:
0afab660 [featherchen] fix comment
c55f250e [featherchen] fix bugs
2c544392 [featherchen] fix bugs
589626f3 [featherchen] add test
bfddc094 [featherchen] add test
ef0dfb87 [featherchen] add test
f5e7d352 [featherchen] modify test
d08c8892 [featherchen] modify test
cdac6a21 [featherchen] delete redudent import
f72b19e4 [featherchen] fix bug
cc611b05 [featherchen] set up three websocket
1ecc577d [featherchen] extend websocket component
9011c0d8 [featherchen] test
9c218d83 [featherchen] trivial
84412201 [featherchen] change
57a0a333 [featherchen] LOG problem
---
dev-support/maven-config/checkstyle.xml | 0
submarine-server/server-core/pom.xml | 1 +
.../apache/submarine/server/SubmarineServer.java | 47 +++++--
.../server/websocket/BasicWebSocketCreator.java | 39 ++++++
.../server/websocket/ConnectionManager.java | 102 +++++++++++++++
.../server/websocket/DateJsonDeserializer.java | 54 ++++++++
.../apache/submarine/server/websocket/Message.java | 82 ++++++++++++
.../server/websocket/WebSocketHandler.java | 85 ++++++++++++
.../server/websocket/WebSocketListener.java | 26 ++++
.../server/websocket/WebSocketServer.java | 142 +++++++++++++++++++++
.../server/AbstractSubmarineServerTest.java | 21 ++-
.../EnvironmentWebsocketTest.java} | 24 +++-
.../ExperimentWebsocketTest.java} | 26 ++--
.../NotebookWebsocketTest.java} | 23 +++-
.../workbench/websocket/NotebookServerTest.java | 2 +-
15 files changed, 638 insertions(+), 36 deletions(-)
diff --git a/dev-support/maven-config/checkstyle.xml b/dev-support/maven-config/checkstyle.xml
old mode 100644
new mode 100755
diff --git a/submarine-server/server-core/pom.xml b/submarine-server/server-core/pom.xml
index 6a9e6461..a1a956e6 100644
--- a/submarine-server/server-core/pom.xml
+++ b/submarine-server/server-core/pom.xml
@@ -437,6 +437,7 @@
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
+
</dependencies>
<build>
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
index afb1f34a..92ce425a 100644
--- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
@@ -21,6 +21,7 @@ package org.apache.submarine.server;
import org.apache.log4j.PropertyConfigurator;
import org.apache.submarine.server.rest.provider.YamlEntityProvider;
import org.apache.submarine.server.workbench.websocket.NotebookServer;
+import org.apache.submarine.server.websocket.WebSocketServer;
import org.apache.submarine.commons.cluster.ClusterServer;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.Handler;
@@ -105,6 +106,9 @@ public class SubmarineServer extends ResourceConfig {
bindAsContract(NotebookServer.class)
.to(WebSocketServlet.class)
.in(Singleton.class);
+ bindAsContract(WebSocketServer.class)
+ .to(WebSocketServlet.class)
+ .in(Singleton.class);
}
});
@@ -117,13 +121,15 @@ public class SubmarineServer extends ResourceConfig {
// Cluster Server is useless for submarine now. Shield it to improve performance.
// setupClusterServer();
+ setupWebSocketServer(webApp, conf, sharedServiceLocator);
startServer();
+
}
@Inject
public SubmarineServer() {
packages("org.apache.submarine.server.workbench.rest",
- "org.apache.submarine.server.rest"
+ "org.apache.submarine.server.rest"
);
register(YamlEntityProvider.class);
}
@@ -170,7 +176,7 @@ public class SubmarineServer extends ResourceConfig {
}
private static WebAppContext setupWebAppContext(HandlerList handlers,
- SubmarineConfiguration conf) {
+ SubmarineConfiguration conf) {
WebAppContext webApp = new WebAppContext();
webApp.setContextPath("/");
File warPath = new File(conf.getString(SubmarineConfVars.ConfVars.WORKBENCH_WEB_WAR));
@@ -196,7 +202,7 @@ public class SubmarineServer extends ResourceConfig {
webApp.addServlet(new ServletHolder(RefreshServlet.class), "/user/*");
webApp.addServlet(new ServletHolder(RefreshServlet.class), "/workbench/*");
- handlers.setHandlers(new Handler[] { webApp });
+ handlers.setHandlers(new Handler[]{webApp});
return webApp;
}
@@ -223,9 +229,9 @@ public class SubmarineServer extends ResourceConfig {
httpsConfig.addCustomizer(src);
connector = new ServerConnector(
- server,
- new SslConnectionFactory(getSslContextFactory(conf), HttpVersion.HTTP_1_1.asString()),
- new HttpConnectionFactory(httpsConfig));
+ server,
+ new SslConnectionFactory(getSslContextFactory(conf), HttpVersion.HTTP_1_1.asString()),
+ new HttpConnectionFactory(httpsConfig));
} else {
connector = new ServerConnector(server);
}
@@ -246,14 +252,37 @@ public class SubmarineServer extends ResourceConfig {
}
private static void setupNotebookServer(WebAppContext webapp,
- SubmarineConfiguration conf, ServiceLocator serviceLocator) {
+ SubmarineConfiguration conf, ServiceLocator serviceLocator) {
String maxTextMessageSize = conf.getWebsocketMaxTextMessageSize();
final ServletHolder servletHolder =
new ServletHolder(serviceLocator.getService(NotebookServer.class));
servletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize);
final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- webapp.addServlet(servletHolder, "/ws/*");
+ webapp.addServlet(servletHolder, "/wss/*");
+ }
+
+ private static void setupWebSocketServer(WebAppContext webapp,
+ SubmarineConfiguration conf, ServiceLocator serviceLocator) {
+ String maxTextMessageSize = conf.getWebsocketMaxTextMessageSize();
+ final ServletHolder notebookServletHolder =
+ new ServletHolder(serviceLocator.getService(WebSocketServer.class));
+ notebookServletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize);
+
+ final ServletHolder experimentServletHolder =
+ new ServletHolder(serviceLocator.getService(WebSocketServer.class));
+ experimentServletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize);
+
+ final ServletHolder environmentServletHolder =
+ new ServletHolder(serviceLocator.getService(WebSocketServer.class));
+ environmentServletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize);
+
+
+
+ final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ webapp.addServlet(notebookServletHolder, "/ws/notebook/*");
+ webapp.addServlet(experimentServletHolder, "/ws/experiment/*");
+ webapp.addServlet(environmentServletHolder, "/ws/environment/*");
}
private static void setupClusterServer() {
@@ -331,7 +360,7 @@ public class SubmarineServer extends ResourceConfig {
StringBuilder sbIndexBuf = new StringBuilder();
try (InputStreamReader reader =
- new InputStreamReader(new FileInputStream(indexFile), "GBK");
+ new InputStreamReader(new FileInputStream(indexFile), "GBK");
BufferedReader bufferedReader = new BufferedReader(reader);) {
String lineTxt = null;
while ((lineTxt = bufferedReader.readLine()) != null) {
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/BasicWebSocketCreator.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/BasicWebSocketCreator.java
new file mode 100644
index 00000000..cfe5be8d
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/BasicWebSocketCreator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible to create the WebSockets for WebSocketServer.
+ */
+public class BasicWebSocketCreator implements WebSocketCreator {
+ private static final Logger LOG = LoggerFactory.getLogger(BasicWebSocketCreator.class);
+ private WebSocketServer webSocketServer;
+
+ public BasicWebSocketCreator(WebSocketServer webSocketServer) {
+ this.webSocketServer = webSocketServer;
+ }
+ public Object createWebSocket(ServletUpgradeRequest request, ServletUpgradeResponse response) {
+ return new WebSocketHandler(request.getHttpServletRequest(), "", webSocketServer);
+ }
+
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/ConnectionManager.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/ConnectionManager.java
new file mode 100644
index 00000000..dec72775
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/ConnectionManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.server.websocket;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.eclipse.jetty.websocket.api.WebSocketException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Manager class for managing websocket connections.
+ */
+public class ConnectionManager {
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class);
+ private static final Gson gson = new GsonBuilder()
+ .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
+ .registerTypeAdapter(Date.class, new DateJsonDeserializer())
+ .setPrettyPrinting()
+ .create();
+
+ final Queue<WebSocketHandler> connectedSockets = new ConcurrentLinkedQueue<>();
+ // user -> connection
+ final Map<String, Queue<WebSocketHandler>> userSocketMap = new ConcurrentHashMap<>();
+
+ public void addConnection(WebSocketHandler conn) {
+ connectedSockets.add(conn);
+ }
+
+ public void removeConnection(WebSocketHandler conn) {
+ connectedSockets.remove(conn);
+ }
+
+ public void addUserConnection(String user, WebSocketHandler conn) {
+ LOG.info("Add user connection {} for user: {}", conn, user);
+ conn.setUser(user);
+ if (userSocketMap.containsKey(user)) {
+ userSocketMap.get(user).add(conn);
+ } else {
+ Queue<WebSocketHandler> socketQueue = new ConcurrentLinkedQueue<>();
+ socketQueue.add(conn);
+ userSocketMap.put(user, socketQueue);
+ }
+ }
+
+ public void removeUserConnection(String user, WebSocketHandler conn) {
+ LOG.info("Remove user connection {} for user: {}", conn, user);
+ if (userSocketMap.containsKey(user)) {
+ userSocketMap.get(user).remove(conn);
+ } else {
+ LOG.warn("Closing connection that is absent in user connections");
+ }
+ }
+
+ protected String serializeMessage(Message m) {
+ return gson.toJson(m);
+ }
+
+ public void broadcast(Message m) {
+ synchronized (connectedSockets) {
+ for (WebSocketHandler ns : connectedSockets) {
+ try {
+ ns.send(serializeMessage(m));
+ } catch (IOException | WebSocketException e) {
+ LOG.error("Send error: " + m, e);
+ }
+ }
+ }
+ }
+
+ public Set<String> getConnectedUsers() {
+ Set<String> connectedUsers = Sets.newHashSet();
+ for (WebSocketHandler notebookSocket : connectedSockets) {
+ connectedUsers.add(notebookSocket.getUser());
+ }
+ return connectedUsers;
+ }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/DateJsonDeserializer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/DateJsonDeserializer.java
new file mode 100644
index 00000000..6fe30e07
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/DateJsonDeserializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+
+import java.lang.reflect.Type;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Locale;
+
+public class DateJsonDeserializer implements JsonDeserializer{
+ private final String[] DATE_FORMATS = new String[] {
+ "yyyy-MM-dd'T'HH:mm:ssZ",
+ "MMM d, yyyy h:mm:ss a",
+ "MMM dd, yyyy HH:mm:ss",
+ "yyyy-MM-dd HH:mm:ss.SSS"
+ };
+
+ @Override
+ public Date deserialize(JsonElement jsonElement, Type typeOF,
+ JsonDeserializationContext context) throws JsonParseException {
+ for (String format : DATE_FORMATS) {
+ try {
+ return new SimpleDateFormat(format, Locale.US).parse(jsonElement.getAsString());
+ } catch (ParseException e) {
+ throw new JsonParseException("Unparsable date: \"" + jsonElement.getAsString()
+ + "\". Supported formats: " + Arrays.toString(DATE_FORMATS));
+ } catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+ throw new RuntimeException("Unexpected Error in Deserialize Date");
+ }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/Message.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/Message.java
new file mode 100644
index 00000000..96de8a96
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/Message.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Message {
+ /**
+ * Representation of event type.
+ */
+ public enum OP {
+ ERROR_INFO, // [s-c] error information to be sent
+ NOTICE // [s-c] Notice
+ }
+
+ private static final Gson gson = new Gson();
+ public static final Message EMPTY = new Message(null);
+
+ public OP op;
+ public Map<String, Object> data = new HashMap<>();
+
+ public Message(OP op) {
+ this.op = op;
+ }
+
+ public Message put(String k, Object v) {
+ data.put(k, v);
+ return this;
+ }
+
+ public Object get(String k) {
+ return data.get(k);
+ }
+
+ public <T> T getType(String key) {
+ return (T) data.get(key);
+ }
+
+ public <T> T getType(String key, Logger LOG) {
+ try {
+ return getType(key);
+ } catch (ClassCastException e) {
+ LOG.error("Failed to get " + key + " from message (Invalid type). " , e);
+ return null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("Message{");
+ sb.append("data=").append(data);
+ sb.append(", op=").append(op);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ public static Message fromJson(String json) {
+ return gson.fromJson(json, Message.class);
+ }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketHandler.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketHandler.java
new file mode 100644
index 00000000..c8d2f0a8
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+
+/**
+ * Websocket handler.
+ */
+public class WebSocketHandler extends WebSocketAdapter {
+ private Session connection;
+ private WebSocketListener listener;
+ private HttpServletRequest request;
+ private String protocol;
+ private String user;
+
+ public WebSocketHandler(HttpServletRequest req, String protocol,
+ WebSocketListener listener) {
+ this.listener = listener;
+ this.request = req;
+ this.protocol = protocol;
+ this.user = StringUtils.EMPTY;
+ }
+
+ @Override
+ public void onWebSocketClose(int closeCode, String message) {
+ listener.onClose(this, closeCode, message);
+ }
+
+ @Override
+ public void onWebSocketConnect(Session connection) {
+ this.connection = connection;
+ listener.onOpen(this);
+ }
+
+ @Override
+ public void onWebSocketText(String message) {
+ listener.onMessage(this, message);
+ }
+
+ public HttpServletRequest getRequest() {
+ return request;
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public synchronized void send(String serializeMessage) throws IOException {
+ connection.getRemote().sendString(serializeMessage);
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ @Override
+ public String toString() {
+ return request.getRemoteHost() + ":" + request.getRemotePort();
+ }
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketListener.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketListener.java
new file mode 100644
index 00000000..79e319f5
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+/**
+ * WebSocket listener.
+ */
+public interface WebSocketListener {
+ void onClose(WebSocketHandler socket, int code, String message);
+ void onOpen(WebSocketHandler socket);
+ void onMessage(WebSocketHandler socket, String message);
+}
diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketServer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketServer.java
new file mode 100644
index 00000000..2c1f536d
--- /dev/null
+++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketServer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.server.websocket;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.util.annotation.ManagedOperation;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Submarine websocket service. This class used setter injection because all servlet should have
+ * no-parameter constructor
+ */
+@ManagedObject
+public class WebSocketServer extends WebSocketServlet
+ implements org.apache.submarine.server.websocket.WebSocketListener {
+
+ /**
+ * Job manager service type.
+ */
+ protected enum JobManagerServiceType {
+ JOB_MANAGER_PAGE("JOB_MANAGER_PAGE");
+ private String serviceTypeKey;
+
+ JobManagerServiceType(String serviceType) {
+ this.serviceTypeKey = serviceType;
+ }
+
+ String getKey() {
+ return this.serviceTypeKey;
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
+ private static Gson gson = new GsonBuilder()
+ .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
+ .registerTypeAdapter(Date.class, new DateJsonDeserializer())
+ .setPrettyPrinting()
+ .create();
+
+ private static AtomicReference<WebSocketServer> self = new AtomicReference<>();
+
+ private ConnectionManager connectionManager;
+
+ private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+ public WebSocketServer() {
+ this.connectionManager = new ConnectionManager();
+ WebSocketServer.self.set(this);
+ LOG.info("WebSocketServer instantiated: {}", this);
+ }
+
+ @Override
+ public void configure(WebSocketServletFactory factory) {
+ factory.setCreator(new BasicWebSocketCreator(this));
+ }
+
+ @Override
+ public void onOpen(WebSocketHandler conn) {
+ LOG.info("New connection from {}", conn);
+ connectionManager.addConnection(conn);
+ }
+
+ @Override
+ public void onMessage(WebSocketHandler conn, String msg) {
+ try {
+ LOG.info("Got Message: " + msg);
+ if (StringUtils.isEmpty(conn.getUser())) {
+ connectionManager.addUserConnection("FakeUser1", conn);
+ }
+ } catch (Exception e) {
+ LOG.error("Can't handle message: " + msg, e);
+ try {
+ conn.send(serializeMessage(new Message(Message.OP.ERROR_INFO).put(
+ "info", e.getMessage())));
+ } catch (IOException iox) {
+ LOG.error("Fail to send error info", iox);
+ }
+ }
+ }
+
+ @Override
+ public void onClose(WebSocketHandler conn, int code, String reason) {
+ LOG.info("Closed connection to {} ({}) {}", conn, code, reason);
+ connectionManager.removeConnection(conn);
+ connectionManager.removeUserConnection(conn.getUser(), conn);
+ }
+
+ public ConnectionManager getConnectionManager() {
+ return connectionManager;
+ }
+
+ protected Message deserializeMessage(String msg) {
+ return gson.fromJson(msg, Message.class);
+ }
+
+ protected String serializeMessage(Message m) {
+ return gson.toJson(m);
+ }
+
+ public void broadcast(Message m) {
+ connectionManager.broadcast(m);
+ }
+
+ @ManagedAttribute
+ public Set<String> getConnectedUsers() {
+ return connectionManager.getConnectedUsers();
+ }
+
+ @ManagedOperation
+ public void sendMessage(String message) {
+ Message m = new Message(Message.OP.NOTICE);
+ m.data.put("notice", message);
+ connectionManager.broadcast(m);
+ }
+}
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
index 7cd4fe51..eab31af7 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
@@ -75,7 +75,10 @@ public abstract class AbstractSubmarineServerTest {
protected static final Logger LOG =
LoggerFactory.getLogger(AbstractSubmarineServerTest.class);
- static final String WEBSOCKET_API_URL = "/ws";
+ static final String WEBSOCKET_API_URL = "/wss";
+ static final String WEBSOCKET_NOTEBOOK_API_URL = "/ws/notebook";
+ static final String WEBSOCKET_EXPERIMENT_API_URL = "/ws/experiment";
+ static final String WEBSOCKET_ENVIRONMENT_API_URL = "/ws/environment";
static final String URL = getUrlToTest();
protected static final boolean WAS_RUNNING = checkIfServerIsRunning();
@@ -86,8 +89,20 @@ public abstract class AbstractSubmarineServerTest {
"/api/" + RestConstants.V1 + "/" + RestConstants.ENVIRONMENT;
protected static String ENV_NAME = "my-submarine-env";
- public static String getWebsocketApiUrlToTest() {
- String websocketUrl = "ws://localhost:8080" + WEBSOCKET_API_URL;
+ public static String getWebsocketApiUrlToTest(String serverName) {
+
+ String websocketUrl = "ws://localhost:8080";
+ if (serverName.equals("wss")) {
+ websocketUrl = "ws://localhost:8080" + WEBSOCKET_API_URL;
+ } else if (serverName.equals("notebook")) {
+ websocketUrl = "ws://localhost:8080" + WEBSOCKET_NOTEBOOK_API_URL;
+ }
+ else if (serverName.equals("environment")) {
+ websocketUrl = "ws://localhost:8080" + WEBSOCKET_ENVIRONMENT_API_URL;
+ }
+ else if (serverName.equals("experiment")) {
+ websocketUrl = "ws://localhost:8080" + WEBSOCKET_EXPERIMENT_API_URL;
+ }
if (System.getProperty("websocketUrl") != null) {
websocketUrl = System.getProperty("websocketurl");
}
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/EnvironmentWebsocketTest.java
similarity index 78%
copy from submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
copy to submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/EnvironmentWebsocketTest.java
index a18e0cdd..af8149a5 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/EnvironmentWebsocketTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.submarine.server.workbench.websocket;
+package org.apache.submarine.server.websocket;
import org.apache.submarine.server.AbstractSubmarineServerTest;
import org.eclipse.jetty.websocket.api.Session;
@@ -24,16 +24,22 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.concurrent.Future;
-public class NotebookServerTest {
+import static junit.framework.TestCase.assertEquals;
+
+public class EnvironmentWebsocketTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EnvironmentWebsocketTest.class);
@BeforeClass
public static void init() throws Exception {
AbstractSubmarineServerTest.startUp(
- NotebookServerTest.class.getSimpleName());
+ EnvironmentWebsocketTest.class.getSimpleName());
}
@AfterClass
@@ -44,8 +50,9 @@ public class NotebookServerTest {
@Test
public void testWebsocketConnection() throws Exception{
URI uri = URI.create(
- AbstractSubmarineServerTest.getWebsocketApiUrlToTest());
+ AbstractSubmarineServerTest.getWebsocketApiUrlToTest("environment"));
WebSocketClient client = new WebSocketClient();
+
try {
client.start();
// The socket that receives events
@@ -70,21 +77,24 @@ public class NotebookServerTest {
public void onWebSocketConnect(Session sess)
{
super.onWebSocketConnect(sess);
- System.out.println("Socket Connected: " + sess);
+ LOG.info("Socket Connected: " + sess);
}
@Override
public void onWebSocketText(String message)
{
super.onWebSocketText(message);
- System.out.println("Received TEXT message: " + message);
+ LOG.info("Received TEXT message: " + message);
+ assertEquals(message, "Hello");
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
super.onWebSocketClose(statusCode, reason);
- System.out.println("Socket Closed: [" + statusCode + "] " + reason);
+ LOG.info("Socket Closed: [" + statusCode + "] " + reason);
+ assertEquals(statusCode, StatusCode.NORMAL);
+ assertEquals(reason, "I'm done");
}
@Override
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/ExperimentWebsocketTest.java
similarity index 78%
copy from submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
copy to submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/ExperimentWebsocketTest.java
index a18e0cdd..edd79099 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/ExperimentWebsocketTest.java
@@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.submarine.server.workbench.websocket;
+package org.apache.submarine.server.websocket;
-import org.apache.submarine.server.AbstractSubmarineServerTest;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -24,16 +23,21 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.concurrent.Future;
+import org.apache.submarine.server.AbstractSubmarineServerTest;
-public class NotebookServerTest {
+import static junit.framework.TestCase.assertEquals;
+public class ExperimentWebsocketTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExperimentWebsocketTest.class);
@BeforeClass
public static void init() throws Exception {
AbstractSubmarineServerTest.startUp(
- NotebookServerTest.class.getSimpleName());
+ ExperimentWebsocketTest.class.getSimpleName());
}
@AfterClass
@@ -44,8 +48,9 @@ public class NotebookServerTest {
@Test
public void testWebsocketConnection() throws Exception{
URI uri = URI.create(
- AbstractSubmarineServerTest.getWebsocketApiUrlToTest());
+ AbstractSubmarineServerTest.getWebsocketApiUrlToTest("experiment"));
WebSocketClient client = new WebSocketClient();
+
try {
client.start();
// The socket that receives events
@@ -70,21 +75,24 @@ public class NotebookServerTest {
public void onWebSocketConnect(Session sess)
{
super.onWebSocketConnect(sess);
- System.out.println("Socket Connected: " + sess);
+ LOG.info("Socket Connected: " + sess);
}
@Override
public void onWebSocketText(String message)
{
super.onWebSocketText(message);
- System.out.println("Received TEXT message: " + message);
+ LOG.info("Received TEXT message: " + message);
+ assertEquals(message, "Hello");
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
super.onWebSocketClose(statusCode, reason);
- System.out.println("Socket Closed: [" + statusCode + "] " + reason);
+ LOG.info("Socket Closed: [" + statusCode + "] " + reason);
+ assertEquals(statusCode, StatusCode.NORMAL);
+ assertEquals(reason, "I'm done");
}
@Override
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/NotebookWebsocketTest.java
similarity index 78%
copy from submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
copy to submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/NotebookWebsocketTest.java
index a18e0cdd..d62e79ae 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/NotebookWebsocketTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.submarine.server.workbench.websocket;
+package org.apache.submarine.server.websocket;
import org.apache.submarine.server.AbstractSubmarineServerTest;
import org.eclipse.jetty.websocket.api.Session;
@@ -24,16 +24,21 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.concurrent.Future;
-public class NotebookServerTest {
+import static junit.framework.TestCase.assertEquals;
+public class NotebookWebsocketTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NotebookWebsocketTest.class);
@BeforeClass
public static void init() throws Exception {
AbstractSubmarineServerTest.startUp(
- NotebookServerTest.class.getSimpleName());
+ NotebookWebsocketTest.class.getSimpleName());
}
@AfterClass
@@ -44,8 +49,9 @@ public class NotebookServerTest {
@Test
public void testWebsocketConnection() throws Exception{
URI uri = URI.create(
- AbstractSubmarineServerTest.getWebsocketApiUrlToTest());
+ AbstractSubmarineServerTest.getWebsocketApiUrlToTest("notebook"));
WebSocketClient client = new WebSocketClient();
+
try {
client.start();
// The socket that receives events
@@ -70,21 +76,24 @@ public class NotebookServerTest {
public void onWebSocketConnect(Session sess)
{
super.onWebSocketConnect(sess);
- System.out.println("Socket Connected: " + sess);
+ LOG.info("Socket Connected: " + sess);
}
@Override
public void onWebSocketText(String message)
{
super.onWebSocketText(message);
- System.out.println("Received TEXT message: " + message);
+ LOG.info("Received TEXT message: " + message);
+ assertEquals(message, "Hello");
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
super.onWebSocketClose(statusCode, reason);
- System.out.println("Socket Closed: [" + statusCode + "] " + reason);
+ LOG.info("Socket Closed: [" + statusCode + "] " + reason);
+ assertEquals(statusCode, StatusCode.NORMAL);
+ assertEquals(reason, "I'm done");
}
@Override
diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
index a18e0cdd..77aaa19d 100644
--- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
+++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java
@@ -44,7 +44,7 @@ public class NotebookServerTest {
@Test
public void testWebsocketConnection() throws Exception{
URI uri = URI.create(
- AbstractSubmarineServerTest.getWebsocketApiUrlToTest());
+ AbstractSubmarineServerTest.getWebsocketApiUrlToTest("wss"));
WebSocketClient client = new WebSocketClient();
try {
client.start();
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org