You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/08/22 07:21:24 UTC
zeppelin git commit: ZEPPELIN-3735. Wrap WebSocket connection into
ConnectionManager
Repository: zeppelin
Updated Branches:
refs/heads/master 001c621c7 -> e10332c93
ZEPPELIN-3735. Wrap WebSocket connection into ConnectionManager
### What is this PR for?
This is a refactoring PR which moves all websocket-connection-related code into the class `ConnectionManager`.
### What type of PR is it?
[Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3735
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3151 from zjffdu/ZEPPELIN-3735 and squashes the following commits:
ca00ad803 [Jeff Zhang] ZEPPELIN-3735. Wrap WebSocket connection into ConnectionManager
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e10332c9
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e10332c9
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e10332c9
Branch: refs/heads/master
Commit: e10332c9366a4c712d3c063cc1323590c463ee05
Parents: 001c621
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Aug 20 14:35:07 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Aug 22 15:21:20 2018 +0800
----------------------------------------------------------------------
.../zeppelin/socket/ConnectionManager.java | 442 ++++++++++++++++++
.../apache/zeppelin/socket/NotebookServer.java | 464 ++++---------------
.../zeppelin/socket/NotebookServerTest.java | 8 +-
3 files changed, 530 insertions(+), 384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e10332c9/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
new file mode 100644
index 0000000..5d02d9f
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.socket;
+
+
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NotebookAuthorization;
+import org.apache.zeppelin.notebook.NotebookImportDeserializer;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.WatcherMessage;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.util.WatcherSecurityKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Manager class for managing websocket connections
+ */
+public class ConnectionManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);
+ private static Gson gson = new GsonBuilder()
+ .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
+ .registerTypeAdapter(Date.class, new NotebookImportDeserializer())
+ .setPrettyPrinting()
+ .registerTypeAdapterFactory(Input.TypeAdapterFactory).create();
+
+ final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
+ // noteId -> connection
+ final Map<String, List<NotebookSocket>> noteSocketMap = new ConcurrentHashMap<>();
+ // user -> connection
+ final Map<String, Queue<NotebookSocket>> userSocketMap = new ConcurrentHashMap<>();
+
+ /**
+ * This is a special endpoint in the notebook websocket. Every connection in this queue
+ * is able to watch every websocket event; it doesn't need to be listed in the
+ * noteSocketMap. This can be used to get information about websocket traffic and watch what
+ * is going on.
+ */
+ final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
+
+ private HashSet<String> collaborativeModeList = new HashSet<>();
+ private Boolean collaborativeModeEnable = ZeppelinConfiguration
+ .create()
+ .isZeppelinNotebookCollaborativeModeEnable();
+
+
+ /**
+  * Starts tracking an open websocket in {@code connectedSockets}, the collection
+  * iterated by the server-wide broadcast methods of this class.
+  *
+  * @param conn the socket to track
+  */
+ public void addConnection(NotebookSocket conn) {
+   connectedSockets.add(conn);
+ }
+
+ /**
+  * Stops tracking a websocket in {@code connectedSockets}. Note and user
+  * associations are not touched by this method.
+  *
+  * @param conn the socket to forget
+  */
+ public void removeConnection(NotebookSocket conn) {
+   connectedSockets.remove(conn);
+ }
+
+ public void addNoteConnection(String noteId, NotebookSocket socket) {
+ LOGGER.debug("Add connection {} to note: {}", socket, noteId);
+ synchronized (noteSocketMap) {
+ // make sure a socket relates only an single note.
+ removeConnectionFromAllNote(socket);
+ List<NotebookSocket> socketList = noteSocketMap.get(noteId);
+ if (socketList == null) {
+ socketList = new LinkedList<>();
+ noteSocketMap.put(noteId, socketList);
+ }
+ if (!socketList.contains(socket)) {
+ socketList.add(socket);
+ }
+ checkCollaborativeStatus(noteId, socketList);
+ }
+ }
+
+ /**
+  * Drops the whole socket list registered for a note.
+  *
+  * @param noteId id of the note whose socket list is removed from {@code noteSocketMap}
+  */
+ public void removeNoteConnection(String noteId) {
+   synchronized (noteSocketMap) {
+     noteSocketMap.remove(noteId);
+   }
+ }
+
+ /**
+  * Detaches a single socket from a note and re-evaluates the note's
+  * collaborative status.
+  *
+  * <p>If no socket list is registered for {@code noteId} there is nothing to
+  * detach and no status to recompute, so the method returns without calling
+  * {@code checkCollaborativeStatus} (which would otherwise receive {@code null}).
+  *
+  * @param noteId id of the note the socket is detached from
+  * @param socket the socket to detach
+  */
+ public void removeNoteConnection(String noteId, NotebookSocket socket) {
+   LOGGER.debug("Remove connection {} from note: {}", socket, noteId);
+   synchronized (noteSocketMap) {
+     List<NotebookSocket> socketList = noteSocketMap.get(noteId);
+     if (socketList == null) {
+       // Unknown note: nothing was attached, so nothing changes.
+       return;
+     }
+     socketList.remove(socket);
+     checkCollaborativeStatus(noteId, socketList);
+   }
+ }
+
+ /**
+  * Records that a socket belongs to a user: the user name is stored on the
+  * socket and the socket is appended to that user's queue in {@code userSocketMap}.
+  *
+  * <p>The lookup and the creation of a missing queue happen under one lock so
+  * that two first connections of the same user arriving concurrently cannot
+  * each install their own queue and overwrite one another.
+  *
+  * @param user name of the user owning the socket
+  * @param conn the socket to register for the user
+  */
+ public void addUserConnection(String user, NotebookSocket conn) {
+   LOGGER.debug("Add user connection {} for user: {}", conn, user);
+   conn.setUser(user);
+   synchronized (userSocketMap) {
+     Queue<NotebookSocket> userSockets = userSocketMap.get(user);
+     if (userSockets == null) {
+       userSockets = new ConcurrentLinkedQueue<>();
+       userSocketMap.put(user, userSockets);
+     }
+     userSockets.add(conn);
+   }
+ }
+
+ /**
+  * Removes a socket from the queue of sockets registered for a user.
+  *
+  * <p>A {@code null} user is treated like an unknown user (a warning is logged)
+  * because {@code userSocketMap} is a {@code ConcurrentHashMap}, which rejects
+  * null keys with a {@code NullPointerException}.
+  *
+  * @param user name of the user the socket was registered for; may be null
+  * @param conn the socket to remove
+  */
+ public void removeUserConnection(String user, NotebookSocket conn) {
+   LOGGER.debug("Remove user connection {} for user: {}", conn, user);
+   // Single lookup instead of containsKey + get.
+   Queue<NotebookSocket> userSockets = (user == null) ? null : userSocketMap.get(user);
+   if (userSockets != null) {
+     userSockets.remove(conn);
+   } else {
+     LOGGER.warn("Closing connection that is absent in user connections");
+   }
+ }
+
+ /**
+  * Looks up the note a socket is currently attached to.
+  *
+  * @param socket the socket to look for in the per-note socket lists
+  * @return the id of a note whose socket list contains {@code socket} (the last
+  *     one encountered if several match), or {@code null} if none does
+  */
+ public String getAssociatedNoteId(NotebookSocket socket) {
+   synchronized (noteSocketMap) {
+     String matchedNoteId = null;
+     for (Map.Entry<String, List<NotebookSocket>> entry : noteSocketMap.entrySet()) {
+       // No early exit: a later match overrides an earlier one.
+       if (entry.getValue().contains(socket)) {
+         matchedNoteId = entry.getKey();
+       }
+     }
+     return matchedNoteId;
+   }
+ }
+
+ /**
+  * Detaches a socket from every note known to {@code noteSocketMap}. Each note
+  * is processed through {@code removeConnectionFromNote}, whether or not the
+  * socket was actually attached to it.
+  *
+  * @param socket the socket to detach
+  */
+ public void removeConnectionFromAllNote(NotebookSocket socket) {
+   synchronized (noteSocketMap) {
+     for (String knownNoteId : noteSocketMap.keySet()) {
+       removeConnectionFromNote(knownNoteId, socket);
+     }
+   }
+ }
+
+ /**
+  * Detaches a socket from one note and re-evaluates that note's collaborative
+  * status.
+  *
+  * <p>When no socket list exists for {@code noteId} the method returns early
+  * instead of handing {@code null} to {@code checkCollaborativeStatus}.
+  *
+  * @param noteId id of the note the socket is detached from
+  * @param socket the socket to detach
+  */
+ private void removeConnectionFromNote(String noteId, NotebookSocket socket) {
+   LOGGER.debug("Remove connection {} from note: {}", socket, noteId);
+   synchronized (noteSocketMap) {
+     List<NotebookSocket> socketList = noteSocketMap.get(noteId);
+     if (socketList == null) {
+       return;
+     }
+     socketList.remove(socket);
+     checkCollaborativeStatus(noteId, socketList);
+   }
+ }
+
+ /**
+  * Recomputes whether a note is in collaborative mode (more than one attached
+  * socket), updates {@code collaborativeModeList} accordingly and broadcasts a
+  * {@code COLLABORATIVE_MODE_STATUS} message to the note. When the note is
+  * collaborative the message also carries the set of attached user names.
+  *
+  * <p>Does nothing when collaborative mode is disabled in the configuration, or
+  * when {@code socketList} is {@code null}; callers in this class can pass the
+  * result of a failed map lookup.
+  *
+  * @param noteId id of the note being evaluated
+  * @param socketList sockets currently attached to the note; may be null
+  */
+ private void checkCollaborativeStatus(String noteId, List<NotebookSocket> socketList) {
+   if (!collaborativeModeEnable || socketList == null) {
+     return;
+   }
+   boolean collaborativeStatusNew = socketList.size() > 1;
+
+   Message message = new Message(Message.OP.COLLABORATIVE_MODE_STATUS);
+   message.put("status", collaborativeStatusNew);
+   if (collaborativeStatusNew) {
+     collaborativeModeList.add(noteId);
+     HashSet<String> userList = new HashSet<>();
+     for (NotebookSocket noteSocket : socketList) {
+       userList.add(noteSocket.getUser());
+     }
+     message.put("users", userList);
+   } else {
+     collaborativeModeList.remove(noteId);
+   }
+   broadcast(noteId, message);
+ }
+
+
+ /**
+  * Converts a message to its JSON text using this class's shared {@code gson}
+  * instance.
+  *
+  * @param m the message to serialize
+  * @return the JSON representation of {@code m}
+  */
+ protected String serializeMessage(Message m) {
+   String json = gson.toJson(m);
+   return json;
+ }
+
+ public void broadcast(Message m) {
+ synchronized (connectedSockets) {
+ for (NotebookSocket ns : connectedSockets) {
+ try {
+ ns.send(serializeMessage(m));
+ } catch (IOException e) {
+ LOGGER.error("Send error: " + m, e);
+ }
+ }
+ }
+ }
+
+ public void broadcast(String noteId, Message m) {
+ List<NotebookSocket> socketsToBroadcast = Collections.emptyList();
+ synchronized (noteSocketMap) {
+ broadcastToWatchers(noteId, StringUtils.EMPTY, m);
+ List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
+ if (socketLists == null || socketLists.size() == 0) {
+ return;
+ }
+ socketsToBroadcast = new ArrayList<>(socketLists);
+ }
+ LOGGER.debug("SEND >> " + m);
+ for (NotebookSocket conn : socketsToBroadcast) {
+ try {
+ conn.send(serializeMessage(m));
+ } catch (IOException e) {
+ LOGGER.error("socket error", e);
+ }
+ }
+ }
+
+ /**
+  * Forwards a message, wrapped in a {@code WatcherMessage}, to every socket in
+  * {@code watcherSockets}. A failure on one watcher is logged and does not stop
+  * delivery to the others.
+  *
+  * <p>The wrapper JSON is built once per call, and not at all when there are no
+  * watchers.
+  *
+  * @param noteId note id recorded in the watcher message
+  * @param subject subject recorded in the watcher message
+  * @param message the message being observed
+  */
+ private void broadcastToWatchers(String noteId, String subject, Message message) {
+   synchronized (watcherSockets) {
+     if (watcherSockets.isEmpty()) {
+       return;
+     }
+     String watcherPayload = WatcherMessage.builder(noteId)
+         .subject(subject)
+         .message(serializeMessage(message))
+         .build()
+         .toJson();
+     for (NotebookSocket watcher : watcherSockets) {
+       try {
+         watcher.send(watcherPayload);
+       } catch (IOException e) {
+         LOGGER.error("Cannot broadcast message to watcher", e);
+       }
+     }
+   }
+ }
+
+ public void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
+ List<NotebookSocket> socketsToBroadcast = Collections.emptyList();
+ synchronized (noteSocketMap) {
+ broadcastToWatchers(noteId, StringUtils.EMPTY, m);
+ List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
+ if (socketLists == null || socketLists.size() == 0) {
+ return;
+ }
+ socketsToBroadcast = new ArrayList<>(socketLists);
+ }
+
+ LOGGER.debug("SEND >> " + m);
+ for (NotebookSocket conn : socketsToBroadcast) {
+ if (exclude.equals(conn)) {
+ continue;
+ }
+ try {
+ conn.send(serializeMessage(m));
+ } catch (IOException e) {
+ LOGGER.error("socket error", e);
+ }
+ }
+ }
+
+ /**
+ * Send websocket message to all connections regardless of notebook id.
+ */
+ public void broadcastToAllConnections(String serialized) {
+   // No socket is excluded.
+   final NotebookSocket nobody = null;
+   broadcastToAllConnectionsExcept(nobody, serialized);
+ }
+
+ public void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serializedMsg) {
+ synchronized (connectedSockets) {
+ for (NotebookSocket conn : connectedSockets) {
+ if (exclude != null && exclude.equals(conn)) {
+ continue;
+ }
+
+ try {
+ conn.send(serializedMsg);
+ } catch (IOException e) {
+ LOGGER.error("Cannot broadcast message to conn", e);
+ }
+ }
+ }
+ }
+
+ public Set<String> getConnectedUsers() {
+ Set<String> connectedUsers = Sets.newHashSet();
+ for (NotebookSocket notebookSocket : connectedSockets) {
+ connectedUsers.add(notebookSocket.getUser());
+ }
+ return connectedUsers;
+ }
+
+
+ /**
+  * Sends a message to every socket registered for a user. Logs a warning and
+  * sends nothing when the user has no entry in {@code userSocketMap}.
+  *
+  * @param user name of the receiving user
+  * @param m the message to send
+  */
+ public void multicastToUser(String user, Message m) {
+   Queue<NotebookSocket> userSockets = userSocketMap.get(user);
+   if (userSockets == null) {
+     LOGGER.warn("Multicasting to user {} that is not in connections map", user);
+     return;
+   }
+
+   for (NotebookSocket userSocket : userSockets) {
+     unicast(m, userSocket);
+   }
+ }
+
+ public void unicast(Message m, NotebookSocket conn) {
+ try {
+ conn.send(serializeMessage(m));
+ } catch (IOException e) {
+ LOGGER.error("socket error", e);
+ }
+ broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
+ }
+
+ public void unicastParagraph(Note note, Paragraph p, String user) {
+ if (!note.isPersonalizedMode() || p == null || user == null) {
+ return;
+ }
+
+ if (!userSocketMap.containsKey(user)) {
+ LOGGER.warn("Failed to send unicast. user {} that is not in connections map", user);
+ return;
+ }
+
+ for (NotebookSocket conn : userSocketMap.get(user)) {
+ Message m = new Message(Message.OP.PARAGRAPH).put("paragraph", p);
+ unicast(m, conn);
+ }
+ }
+
+ /**
+  * Sends a {@code NOTES_INFO} message containing {@code notesInfo} to every
+  * user in {@code userSocketMap} other than the user of {@code subject}.
+  *
+  * <p>NOTE(review): the per-user regeneration of the note list is commented
+  * out below, so every recipient gets the same {@code notesInfo} that was
+  * passed in, and the roles looked up for each user are not used. Confirm that
+  * this is acceptable for notes the recipient may not be allowed to see.
+  *
+  * @param notesInfo the note list to send
+  * @param subject identifies the user that is skipped
+  */
+ public void broadcastNoteListExcept(List<Map<String, String>> notesInfo,
+     AuthenticationInfo subject) {
+   NotebookAuthorization authorization = NotebookAuthorization.getInstance();
+   for (String user : userSocketMap.keySet()) {
+     if (!subject.getUser().equals(user)) {
+       //reloaded already above; parameter - false
+       Set<String> userAndRoles = authorization.getRoles(user);
+       userAndRoles.add(user);
+       // TODO(zjffdu) is it ok for comment the following line ?
+       // notesInfo = generateNotesInfo(false, new AuthenticationInfo(user), userAndRoles);
+       multicastToUser(user, new Message(Message.OP.NOTES_INFO).put("notes", notesInfo));
+     }
+   }
+ }
+
+ public void broadcastNote(Note note) {
+ broadcast(note.getId(), new Message(Message.OP.NOTE).put("note", note));
+ }
+
+ public void broadcastParagraph(Note note, Paragraph p) {
+ broadcastNoteForms(note);
+
+ if (note.isPersonalizedMode()) {
+ broadcastParagraphs(p.getUserParagraphMap(), p);
+ } else {
+ broadcast(note.getId(), new Message(Message.OP.PARAGRAPH).put("paragraph", p));
+ }
+ }
+
+ public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
+ Paragraph defaultParagraph) {
+ if (null != userParagraphMap) {
+ for (String user : userParagraphMap.keySet()) {
+ multicastToUser(user,
+ new Message(Message.OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)));
+ }
+ }
+ }
+
+ /**
+  * Broadcasts a {@code PARAGRAPH_ADDED} message with the paragraph and its
+  * index within the note to the sockets attached to the note.
+  *
+  * @param note the note the paragraph was added to
+  * @param para the added paragraph
+  */
+ private void broadcastNewParagraph(Note note, Paragraph para) {
+   LOGGER.info("Broadcasting paragraph on run call instead of note.");
+   int paraIndex = note.getParagraphs().indexOf(para);
+   Message added = new Message(Message.OP.PARAGRAPH_ADDED)
+       .put("paragraph", para)
+       .put("index", paraIndex);
+   broadcast(note.getId(), added);
+ }
+
+ // public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
+ // if (subject == null) {
+ // subject = new AuthenticationInfo(StringUtils.EMPTY);
+ // }
+ // //send first to requesting user
+ // List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles);
+ // multicastToUser(subject.getUser(), new Message(Message.OP.NOTES_INFO)
+ // .put("notes", notesInfo));
+ // //to others afterwards
+ // broadcastNoteListExcept(notesInfo, subject);
+ // }
+
+
+ /**
+  * Broadcasts a {@code SAVE_NOTE_FORMS} message containing the note's forms
+  * and params, wrapped in a {@code GUI} object, to the sockets attached to the
+  * note.
+  *
+  * @param note the note whose forms are sent
+  */
+ private void broadcastNoteForms(Note note) {
+   GUI noteGui = new GUI();
+   noteGui.setForms(note.getNoteForms());
+   noteGui.setParams(note.getNoteParams());
+   Message formsMessage = new Message(Message.OP.SAVE_NOTE_FORMS).put("formsData", noteGui);
+   broadcast(note.getId(), formsMessage);
+ }
+
+ public void switchConnectionToWatcher(NotebookSocket conn) {
+ if (!isSessionAllowedToSwitchToWatcher(conn)) {
+ LOGGER.error("Cannot switch this client to watcher, invalid security key");
+ return;
+ }
+ LOGGER.info("Going to add {} to watcher socket", conn);
+ // add the connection to the watcher.
+ if (watcherSockets.contains(conn)) {
+ LOGGER.info("connection alrerady present in the watcher");
+ return;
+ }
+ watcherSockets.add(conn);
+
+ // remove this connection from regular zeppelin ws usage.
+ removeConnection(conn);
+ removeConnectionFromAllNote(conn);
+ removeUserConnection(conn.getUser(), conn);
+ }
+
+ /**
+  * Checks the watcher security key sent with the connection's HTTP request.
+  *
+  * @param session the connection asking to become a watcher
+  * @return {@code true} only if the {@code WatcherSecurityKey.HTTP_HEADER}
+  *     header is non-blank and equals {@code WatcherSecurityKey.getKey()}
+  */
+ private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
+   String providedKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
+   if (StringUtils.isBlank(providedKey)) {
+     return false;
+   }
+   return providedKey.equals(WatcherSecurityKey.getKey());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e10332c9/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 16719f3..a3fce8f 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -17,8 +17,6 @@
package org.apache.zeppelin.socket;
import com.google.common.base.Strings;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@@ -52,7 +50,6 @@ import org.apache.zeppelin.notebook.ParagraphWithRuntimeInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
-import org.apache.zeppelin.notebook.socket.WatcherMessage;
import org.apache.zeppelin.rest.exception.ForbiddenException;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
@@ -64,7 +61,6 @@ import org.apache.zeppelin.service.SimpleServiceCallback;
import org.apache.zeppelin.ticket.TicketContainer;
import org.apache.zeppelin.types.InterpreterSettingsList;
import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.util.WatcherSecurityKey;
import org.apache.zeppelin.utils.InterpreterBindingUtils;
import org.apache.zeppelin.utils.SecurityUtils;
import org.bitbucket.cowwoc.diffmatchpatch.DiffMatchPatch;
@@ -86,17 +82,13 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
@@ -130,7 +122,7 @@ public class NotebookServer extends WebSocketServlet
}
- private HashSet<String> collaborativeModeList = new HashSet<>();
+ // private HashSet<String> collaborativeModeList = new HashSet<>();
private Boolean collaborativeModeEnable = ZeppelinConfiguration
.create()
.isZeppelinNotebookCollaborativeModeEnable();
@@ -141,23 +133,17 @@ public class NotebookServer extends WebSocketServlet
.setPrettyPrinting()
.registerTypeAdapterFactory(Input.TypeAdapterFactory).create();
- final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
- final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
- final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
-
+ private ConnectionManager connectionManager;
private NotebookService notebookService;
private ConfigurationService configurationService;
private JobManagerService jobManagerService;
private ExecutorService executorService = Executors.newFixedThreadPool(10);
- /**
- * This is a special endpoint in the notebook websoket, Every connection in this Queue
- * will be able to watch every websocket event, it doesnt need to be listed into the map of
- * noteSocketMap. This can be used to get information about websocket traffic and watch what
- * is going on.
- */
- final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
+
+ public NotebookServer() {
+ this.connectionManager = new ConnectionManager();
+ }
private Notebook notebook() {
return ZeppelinServer.notebook;
@@ -198,14 +184,10 @@ public class NotebookServer extends WebSocketServlet
return false;
}
- public NotebookSocket doWebSocketConnect(HttpServletRequest req, String protocol) {
- return new NotebookSocket(req, protocol, this);
- }
-
@Override
public void onOpen(NotebookSocket conn) {
LOG.info("New connection from {}", conn);
- connectedSockets.add(conn);
+ connectionManager.addConnection(conn);
}
@Override
@@ -258,7 +240,7 @@ public class NotebookServer extends WebSocketServlet
}
}
if (StringUtils.isEmpty(conn.getUser())) {
- addUserConnection(messagereceived.principal, conn);
+ connectionManager.addUserConnection(messagereceived.principal, conn);
}
AuthenticationInfo subject =
new AuthenticationInfo(messagereceived.principal, messagereceived.roles,
@@ -404,7 +386,7 @@ public class NotebookServer extends WebSocketServlet
getInterpreterSettings(conn);
break;
case WATCHER:
- switchConnectionToWatcher(conn, messagereceived);
+ connectionManager.switchConnectionToWatcher(conn);
break;
case SAVE_NOTE_FORMS:
saveNoteForms(conn, messagereceived);
@@ -426,30 +408,13 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onClose(NotebookSocket conn, int code, String reason) {
LOG.info("Closed connection to {} ({}) {}", conn, code, reason);
- removeConnectionFromAllNote(conn);
- connectedSockets.remove(conn);
- removeUserConnection(conn.getUser(), conn);
- }
-
- private void removeUserConnection(String user, NotebookSocket conn) {
- LOG.debug("Remove user connection {} for user: {}", conn, user);
- if (userConnectedSockets.containsKey(user)) {
- userConnectedSockets.get(user).remove(conn);
- } else {
- LOG.warn("Closing connection that is absent in user connections");
- }
+ connectionManager.removeConnection(conn);
+ connectionManager.removeConnectionFromAllNote(conn);
+ connectionManager.removeUserConnection(conn.getUser(), conn);
}
- private void addUserConnection(String user, NotebookSocket conn) {
- LOG.debug("Add user connection {} for user: {}", conn, user);
- conn.setUser(user);
- if (userConnectedSockets.containsKey(user)) {
- userConnectedSockets.get(user).add(conn);
- } else {
- Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>();
- socketQueue.add(conn);
- userConnectedSockets.put(user, socketQueue);
- }
+ public ConnectionManager getConnectionManager() {
+ return connectionManager;
}
protected Message deserializeMessage(String msg) {
@@ -460,165 +425,13 @@ public class NotebookServer extends WebSocketServlet
return gson.toJson(m);
}
- private void addConnectionToNote(String noteId, NotebookSocket socket) {
- LOG.debug("Add connection {} to note: {}", socket, noteId);
- synchronized (noteSocketMap) {
- removeConnectionFromAllNote(socket); // make sure a socket relates only a
- // single note.
- List<NotebookSocket> socketList = noteSocketMap.get(noteId);
- if (socketList == null) {
- socketList = new LinkedList<>();
- noteSocketMap.put(noteId, socketList);
- }
- if (!socketList.contains(socket)) {
- socketList.add(socket);
- }
- checkCollaborativeStatus(noteId, socketList);
- }
- }
-
- private void removeConnectionFromNote(String noteId, NotebookSocket socket) {
- LOG.debug("Remove connection {} from note: {}", socket, noteId);
- synchronized (noteSocketMap) {
- List<NotebookSocket> socketList = noteSocketMap.get(noteId);
- if (socketList != null) {
- socketList.remove(socket);
- }
- checkCollaborativeStatus(noteId, socketList);
- }
- }
-
- private void removeConn(String noteId) {
- synchronized (noteSocketMap) {
- List<NotebookSocket> socketList = noteSocketMap.remove(noteId);
- }
- }
-
- private void removeConnectionFromAllNote(NotebookSocket socket) {
- synchronized (noteSocketMap) {
- Set<String> keys = noteSocketMap.keySet();
- for (String noteId : keys) {
- removeConnectionFromNote(noteId, socket);
- }
- }
- }
-
- private void checkCollaborativeStatus(String noteId, List<NotebookSocket> socketList) {
- if (!collaborativeModeEnable) {
- return;
- }
- boolean collaborativeStatusNew = socketList.size() > 1;
- if (collaborativeStatusNew) {
- collaborativeModeList.add(noteId);
- } else {
- collaborativeModeList.remove(noteId);
- }
-
- Message message = new Message(OP.COLLABORATIVE_MODE_STATUS);
- message.put("status", collaborativeStatusNew);
- if (collaborativeStatusNew) {
- HashSet<String> userList = new HashSet<>();
- for (NotebookSocket noteSocket : socketList) {
- userList.add(noteSocket.getUser());
- }
- message.put("users", userList);
- }
- broadcast(noteId, message);
- }
-
- private String getOpenNoteId(NotebookSocket socket) {
- String id = null;
- synchronized (noteSocketMap) {
- Set<String> keys = noteSocketMap.keySet();
- for (String noteId : keys) {
- List<NotebookSocket> sockets = noteSocketMap.get(noteId);
- if (sockets.contains(socket)) {
- id = noteId;
- }
- }
- }
-
- return id;
- }
-
public void broadcast(Message m) {
- synchronized (connectedSockets) {
- for (NotebookSocket ns : connectedSockets) {
- try {
- ns.send(serializeMessage(m));
- } catch (IOException e) {
- LOG.error("Send error: " + m, e);
- }
- }
- }
- }
-
- private void broadcast(String noteId, Message m) {
- List<NotebookSocket> socketsToBroadcast = Collections.emptyList();
- synchronized (noteSocketMap) {
- broadcastToWatchers(noteId, StringUtils.EMPTY, m);
- List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
- if (socketLists == null || socketLists.size() == 0) {
- return;
- }
- socketsToBroadcast = new ArrayList<>(socketLists);
- }
- LOG.debug("SEND >> " + m);
- for (NotebookSocket conn : socketsToBroadcast) {
- try {
- conn.send(serializeMessage(m));
- } catch (IOException e) {
- LOG.error("socket error", e);
- }
- }
- }
-
- private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
- List<NotebookSocket> socketsToBroadcast = Collections.emptyList();
- synchronized (noteSocketMap) {
- broadcastToWatchers(noteId, StringUtils.EMPTY, m);
- List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
- if (socketLists == null || socketLists.size() == 0) {
- return;
- }
- socketsToBroadcast = new ArrayList<>(socketLists);
- }
-
- LOG.debug("SEND >> " + m);
- for (NotebookSocket conn : socketsToBroadcast) {
- if (exclude.equals(conn)) {
- continue;
- }
- try {
- conn.send(serializeMessage(m));
- } catch (IOException e) {
- LOG.error("socket error", e);
- }
- }
- }
-
- private void multicastToUser(String user, Message m) {
- if (!userConnectedSockets.containsKey(user)) {
- LOG.warn("Multicasting to user {} that is not in connections map", user);
- return;
- }
-
- for (NotebookSocket conn : userConnectedSockets.get(user)) {
- unicast(m, conn);
- }
- }
-
- private void unicast(Message m, NotebookSocket conn) {
- try {
- conn.send(serializeMessage(m));
- } catch (IOException e) {
- LOG.error("socket error", e);
- }
- broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
+ connectionManager.broadcast(m);
}
public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
- addConnectionToNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
+
+ connectionManager.addNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
getJobManagerService().getNoteJobInfoByUnixTime(0, getServiceContext(fromMessage),
new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) {
@Override
@@ -648,7 +461,7 @@ public class NotebookServer extends WebSocketServlet
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notesJobInfo);
- broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+ connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
}
@@ -660,7 +473,7 @@ public class NotebookServer extends WebSocketServlet
}
public void unsubscribeNoteJobInfo(NotebookSocket conn) {
- removeConnectionFromNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
+ connectionManager.removeNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
}
public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) throws IOException {
@@ -707,23 +520,7 @@ public class NotebookServer extends WebSocketServlet
}
public void broadcastNote(Note note) {
- broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
- }
-
- public void unicastParagraph(Note note, Paragraph p, String user) {
- if (!note.isPersonalizedMode() || p == null || user == null) {
- return;
- }
-
- if (!userConnectedSockets.containsKey(user)) {
- LOG.warn("Failed to send unicast. user {} that is not in connections map", user);
- return;
- }
-
- for (NotebookSocket conn : userConnectedSockets.get(user)) {
- Message m = new Message(OP.PARAGRAPH).put("paragraph", p);
- unicast(m, conn);
- }
+ connectionManager.broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
}
public void broadcastParagraph(Note note, Paragraph p) {
@@ -732,8 +529,8 @@ public class NotebookServer extends WebSocketServlet
if (note.isPersonalizedMode()) {
broadcastParagraphs(p.getUserParagraphMap(), p);
} else {
- broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph",
- new ParagraphWithRuntimeInfo(p)));
+ connectionManager.broadcast(note.getId(),
+ new Message(OP.PARAGRAPH).put("paragraph", new ParagraphWithRuntimeInfo(p)));
}
}
@@ -741,7 +538,7 @@ public class NotebookServer extends WebSocketServlet
Paragraph defaultParagraph) {
if (null != userParagraphMap) {
for (String user : userParagraphMap.keySet()) {
- multicastToUser(user,
+ connectionManager.multicastToUser(user,
new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)));
}
}
@@ -750,7 +547,7 @@ public class NotebookServer extends WebSocketServlet
private void broadcastNewParagraph(Note note, Paragraph para) {
LOG.info("Broadcasting paragraph on run call instead of note.");
int paraIndex = note.getParagraphs().indexOf(para);
- broadcast(note.getId(),
+ connectionManager.broadcast(note.getId(),
new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex));
}
@@ -760,9 +557,10 @@ public class NotebookServer extends WebSocketServlet
}
//send first to requesting user
List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles);
- multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
+ connectionManager.multicastToUser(subject.getUser(),
+ new Message(OP.NOTES_INFO).put("notes", notesInfo));
//to others afterwards
- broadcastNoteListExcept(notesInfo, subject);
+ connectionManager.broadcastNoteListExcept(notesInfo, subject);
}
public void listNotes(NotebookSocket conn, Message message) throws IOException {
@@ -772,7 +570,7 @@ public class NotebookServer extends WebSocketServlet
public void onSuccess(List<Map<String, String>> notesInfo,
ServiceContext context) throws IOException {
super.onSuccess(notesInfo, context);
- unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
+ connectionManager.unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
}
});
}
@@ -785,30 +583,14 @@ public class NotebookServer extends WebSocketServlet
public void onSuccess(List<Map<String, String>> notesInfo,
ServiceContext context) throws IOException {
super.onSuccess(notesInfo, context);
- multicastToUser(context.getAutheInfo().getUser(),
+ connectionManager.multicastToUser(context.getAutheInfo().getUser(),
new Message(OP.NOTES_INFO).put("notes", notesInfo));
//to others afterwards
- broadcastNoteListExcept(notesInfo, context.getAutheInfo());
+ connectionManager.broadcastNoteListExcept(notesInfo, context.getAutheInfo());
}
});
}
- private void broadcastNoteListExcept(List<Map<String, String>> notesInfo,
- AuthenticationInfo subject) {
- Set<String> userAndRoles;
- NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
- for (String user : userConnectedSockets.keySet()) {
- if (subject.getUser().equals(user)) {
- continue;
- }
- //reloaded already above; parameter - false
- userAndRoles = authInfo.getRoles(user);
- userAndRoles.add(user);
- notesInfo = generateNotesInfo(false, new AuthenticationInfo(user), userAndRoles);
- multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
- }
- }
-
void permissionError(NotebookSocket conn, String op, String userName, Set<String> userAndRoles,
Set<String> allowed) throws IOException {
LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed);
@@ -821,23 +603,6 @@ public class NotebookServer extends WebSocketServlet
/**
- * @return false if user doesn't have runner permission for this paragraph
- */
- private boolean hasParagraphRunnerPermission(NotebookSocket conn, Notebook notebook,
- String noteId, HashSet<String> userAndRoles,
- String principal, String op)
- throws IOException {
- NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
- if (!notebookAuthorization.isRunner(noteId, userAndRoles)) {
- permissionError(conn, op, principal, userAndRoles,
- notebookAuthorization.getOwners(noteId));
- return false;
- }
-
- return true;
- }
-
- /**
* @return false if user doesn't have writer permission for this paragraph
*/
private boolean hasParagraphWriterPermission(NotebookSocket conn, Notebook notebook,
@@ -880,7 +645,7 @@ public class NotebookServer extends WebSocketServlet
new WebSocketServiceCallback<Note>(conn) {
@Override
public void onSuccess(Note note, ServiceContext context) throws IOException {
- addConnectionToNote(note.getId(), conn);
+ connectionManager.addNoteConnection(note.getId(), conn);
conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
}
@@ -896,11 +661,11 @@ public class NotebookServer extends WebSocketServlet
public void onSuccess(Note note, ServiceContext context) throws IOException {
super.onSuccess(note, context);
if (note != null) {
- addConnectionToNote(note.getId(), conn);
+ connectionManager.addNoteConnection(note.getId(), conn);
conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
} else {
- removeConnectionFromAllNote(conn);
+ connectionManager.removeConnectionFromAllNote(conn);
conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
}
}
@@ -923,7 +688,7 @@ public class NotebookServer extends WebSocketServlet
new WebSocketServiceCallback<Note>(conn) {
@Override
public void onSuccess(Note note, ServiceContext context) throws IOException {
- broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name)
+ connectionManager.broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name)
.put("config", config)
.put("info", note.getInfo()));
broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
@@ -1021,7 +786,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onSuccess(Note note, ServiceContext context) throws IOException {
super.onSuccess(note, context);
- addConnectionToNote(note.getId(), conn);
+ connectionManager.addNoteConnection(note.getId(), conn);
conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
}
@@ -1043,7 +808,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onSuccess(String message, ServiceContext context) throws IOException {
super.onSuccess(message, context);
- removeConn(noteId);
+ connectionManager.removeNoteConnection(noteId);
broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
}
});
@@ -1069,7 +834,7 @@ public class NotebookServer extends WebSocketServlet
AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
for (Note note : notes) {
notebook.removeNote(note.getId(), subject);
- removeConn(note.getId());
+ connectionManager.removeNoteConnection(note.getId());
}
broadcastNoteList(subject, userAndRoles);
}
@@ -1186,7 +951,7 @@ public class NotebookServer extends WebSocketServlet
private void updateParagraph(NotebookSocket conn,
Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("id");
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
if (noteId == null) {
noteId = (String) fromMessage.get("noteId");
}
@@ -1222,7 +987,7 @@ public class NotebookServer extends WebSocketServlet
return;
}
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
if (noteId == null) {
noteId = fromMessage.getType("noteId", LOG);
if (noteId == null) {
@@ -1265,19 +1030,19 @@ public class NotebookServer extends WebSocketServlet
p.setText(paragraphText);
Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", patchText)
.put("paragraphId", p.getId());
- broadcastExcept(note.getId(), message, conn);
+ connectionManager.broadcastExcept(note.getId(), message, conn);
}
private void cloneNote(NotebookSocket conn,
Message fromMessage) throws IOException {
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
String name = (String) fromMessage.get("name");
getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage),
new WebSocketServiceCallback<Note>(conn) {
@Override
public void onSuccess(Note newNote, ServiceContext context) throws IOException {
super.onSuccess(newNote, context);
- addConnectionToNote(newNote.getId(), conn);
+ connectionManager.addNoteConnection(newNote.getId(), conn);
conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
}
@@ -1321,13 +1086,13 @@ public class NotebookServer extends WebSocketServlet
private void removeParagraph(NotebookSocket conn,
Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
getNotebookService().removeParagraph(noteId, paragraphId,
getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn){
@Override
public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
super.onSuccess(p, context);
- broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).
+ connectionManager.broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).
put("id", p.getId()));
}
});
@@ -1336,14 +1101,14 @@ public class NotebookServer extends WebSocketServlet
private void clearParagraphOutput(NotebookSocket conn,
Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage),
new WebSocketServiceCallback<Paragraph>(conn) {
@Override
public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
super.onSuccess(p, context);
if (p.getNote().isPersonalizedMode()) {
- unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
+ connectionManager.unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
} else {
broadcastParagraph(p.getNote(), p);
}
@@ -1353,7 +1118,7 @@ public class NotebookServer extends WebSocketServlet
private void completion(NotebookSocket conn,
Message fromMessage) throws IOException {
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
String paragraphId = (String) fromMessage.get("id");
String buffer = (String) fromMessage.get("buf");
int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString());
@@ -1452,7 +1217,7 @@ public class NotebookServer extends WebSocketServlet
.getId())) {
AngularObjectRegistry angularObjectRegistry =
setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
- this.broadcastExcept(n.getId(),
+ connectionManager.broadcastExcept(n.getId(),
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId())
.put("paragraphId", ao.getParagraphId()), conn);
@@ -1460,7 +1225,7 @@ public class NotebookServer extends WebSocketServlet
}
}
} else { // broadcast to all web session for the note
- this.broadcastExcept(note.getId(),
+ connectionManager.broadcastExcept(note.getId(),
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId())
.put("paragraphId", ao.getParagraphId()), conn);
@@ -1551,7 +1316,8 @@ public class NotebookServer extends WebSocketServlet
final AngularObject ao =
remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId);
- this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
+ connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE)
+ .put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
.put("paragraphId", paragraphId), conn);
}
@@ -1562,7 +1328,8 @@ public class NotebookServer extends WebSocketServlet
NotebookSocket conn) {
final AngularObject ao =
remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId);
- this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao)
+ connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE)
+ .put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
.put("paragraphId", paragraphId), conn);
}
@@ -1578,7 +1345,7 @@ public class NotebookServer extends WebSocketServlet
angularObject.set(varValue, true);
}
- this.broadcastExcept(noteId,
+ connectionManager.broadcastExcept(noteId,
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject)
.put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
.put("paragraphId", paragraphId), conn);
@@ -1589,7 +1356,7 @@ public class NotebookServer extends WebSocketServlet
String interpreterGroupId, NotebookSocket conn) {
final AngularObject removed = registry.remove(varName, noteId, paragraphId);
if (removed != null) {
- this.broadcastExcept(noteId,
+ connectionManager.broadcastExcept(noteId,
new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", removed)
.put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
.put("paragraphId", paragraphId), conn);
@@ -1600,14 +1367,14 @@ public class NotebookServer extends WebSocketServlet
Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString());
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
getNotebookService().moveParagraph(noteId, paragraphId, newIndex,
getServiceContext(fromMessage),
new WebSocketServiceCallback<Paragraph>(conn) {
@Override
public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
super.onSuccess(result, context);
- broadcast(result.getNote().getId(),
+ connectionManager.broadcast(result.getNote().getId(),
new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex));
}
});
@@ -1616,7 +1383,7 @@ public class NotebookServer extends WebSocketServlet
private String insertParagraph(NotebookSocket conn,
Message fromMessage) throws IOException {
final int index = (int) Double.parseDouble(fromMessage.get("index").toString());
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
Map<String, Object> config;
if (fromMessage.get("config") != null) {
config = (Map<String, Object>) fromMessage.get("config");
@@ -1651,7 +1418,7 @@ public class NotebookServer extends WebSocketServlet
private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage),
new WebSocketServiceCallback<>(conn));
}
@@ -1675,7 +1442,7 @@ public class NotebookServer extends WebSocketServlet
return;
}
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
if (!hasParagraphWriterPermission(conn, notebook, noteId,
userAndRoles, fromMessage.principal, "write")) {
@@ -1718,14 +1485,14 @@ public class NotebookServer extends WebSocketServlet
}
// broadcast to other clients only
- broadcastExcept(note.getId(),
+ connectionManager.broadcastExcept(note.getId(),
new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn);
}
private void runParagraph(NotebookSocket conn,
Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("id");
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
String text = (String) fromMessage.get("paragraph");
String title = (String) fromMessage.get("title");
Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
@@ -1739,7 +1506,7 @@ public class NotebookServer extends WebSocketServlet
if (p.getNote().isPersonalizedMode()) {
Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId,
context.getAutheInfo().getUser());
- unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
+ connectionManager.unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
}
// if it's the last paragraph and not empty, let's add a new one
@@ -1922,7 +1689,7 @@ public class NotebookServer extends WebSocketServlet
public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", noteId)
.put("paragraphId", paragraphId).put("index", index).put("data", output);
- broadcast(noteId, msg);
+ connectionManager.broadcast(noteId, msg);
}
/**
@@ -1939,10 +1706,10 @@ public class NotebookServer extends WebSocketServlet
if (note.isPersonalizedMode()) {
String user = note.getParagraph(paragraphId).getUser();
if (null != user) {
- multicastToUser(user, msg);
+ connectionManager.multicastToUser(user, msg);
}
} else {
- broadcast(noteId, msg);
+ connectionManager.broadcast(noteId, msg);
}
}
@@ -1967,7 +1734,7 @@ public class NotebookServer extends WebSocketServlet
Message msg =
new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
.put("index", index).put("appId", appId).put("data", output);
- broadcast(noteId, msg);
+ connectionManager.broadcast(noteId, msg);
}
/**
@@ -1979,14 +1746,14 @@ public class NotebookServer extends WebSocketServlet
Message msg =
new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
.put("index", index).put("type", type).put("appId", appId).put("data", output);
- broadcast(noteId, msg);
+ connectionManager.broadcast(noteId, msg);
}
@Override
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) {
Message msg = new Message(OP.APP_LOAD).put("noteId", noteId).put("paragraphId", paragraphId)
.put("appId", appId).put("pkg", pkg);
- broadcast(noteId, msg);
+ connectionManager.broadcast(noteId, msg);
}
@Override
@@ -1994,7 +1761,7 @@ public class NotebookServer extends WebSocketServlet
Message msg =
new Message(OP.APP_STATUS_CHANGE).put("noteId", noteId).put("paragraphId", paragraphId)
.put("appId", appId).put("status", status);
- broadcast(noteId, msg);
+ connectionManager.broadcast(noteId, msg);
}
@@ -2086,6 +1853,7 @@ public class NotebookServer extends WebSocketServlet
} catch (IOException e) {
LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
}
+
}
@Override
@@ -2127,7 +1895,7 @@ public class NotebookServer extends WebSocketServlet
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notesJobInfo);
- broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+ connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
}
}
@@ -2135,7 +1903,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onProgressUpdate(Paragraph p, int progress) {
- broadcast(p.getNote().getId(),
+ connectionManager.broadcast(p.getNote().getId(),
new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress));
}
@@ -2184,7 +1952,7 @@ public class NotebookServer extends WebSocketServlet
Message msg =
new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", paragraph.getNote().getId())
.put("paragraphId", paragraph.getId()).put("data", output);
- broadcast(paragraph.getNote().getId(), msg);
+ connectionManager.broadcast(paragraph.getNote().getId(), msg);
}
/**
@@ -2195,7 +1963,7 @@ public class NotebookServer extends WebSocketServlet
Message msg =
new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", paragraph.getNote().getId())
.put("paragraphId", paragraph.getId()).put("data", result.getData());
- broadcast(paragraph.getNote().getId(), msg);
+ connectionManager.broadcast(paragraph.getNote().getId(), msg);
}
@Override
@@ -2257,7 +2025,8 @@ public class NotebookServer extends WebSocketServlet
continue;
}
- broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
+ connectionManager.broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE)
+ .put("angularObject", object)
.put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId())
.put("paragraphId", object.getParagraphId()));
}
@@ -2276,7 +2045,7 @@ public class NotebookServer extends WebSocketServlet
notebook.getInterpreterSettingManager().getSettingIds();
for (String id : settingIds) {
if (interpreterGroupId.contains(id)) {
- broadcast(note.getId(),
+ connectionManager.broadcast(note.getId(),
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put("noteId", noteId)
.put("paragraphId", paragraphId));
break;
@@ -2288,7 +2057,7 @@ public class NotebookServer extends WebSocketServlet
private void getEditorSetting(NotebookSocket conn, Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("paragraphId");
String replName = (String) fromMessage.get("magic");
- String noteId = getOpenNoteId(conn);
+ String noteId = connectionManager.getAssociatedNoteId(conn);
getNotebookService().getEditorSetting(noteId, replName,
getServiceContext(fromMessage),
@@ -2317,68 +2086,6 @@ public class NotebookServer extends WebSocketServlet
new Message(OP.INTERPRETER_SETTINGS).put("interpreterSettings", availableSettings)));
}
- private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived) {
- if (!isSessionAllowedToSwitchToWatcher(conn)) {
- LOG.error("Cannot switch this client to watcher, invalid security key");
- return;
- }
- LOG.info("Going to add {} to watcher socket", conn);
- // add the connection to the watcher.
- if (watcherSockets.contains(conn)) {
- LOG.info("connection alrerady present in the watcher");
- return;
- }
- watcherSockets.add(conn);
-
- // remove this connection from regular zeppelin ws usage.
- removeConnectionFromAllNote(conn);
- connectedSockets.remove(conn);
- removeUserConnection(conn.getUser(), conn);
- }
-
- private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
- String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
- return !(StringUtils.isBlank(watcherSecurityKey) || !watcherSecurityKey
- .equals(WatcherSecurityKey.getKey()));
- }
-
- /**
- * Send websocket message to all connections regardless of notebook id.
- */
- private void broadcastToAllConnections(String serialized) {
- broadcastToAllConnectionsExcept(null, serialized);
- }
-
- private void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serialized) {
- synchronized (connectedSockets) {
- for (NotebookSocket conn : connectedSockets) {
- if (exclude != null && exclude.equals(conn)) {
- continue;
- }
-
- try {
- conn.send(serialized);
- } catch (IOException e) {
- LOG.error("Cannot broadcast message to watcher", e);
- }
- }
- }
- }
-
- private void broadcastToWatchers(String noteId, String subject, Message message) {
- synchronized (watcherSockets) {
- for (NotebookSocket watcher : watcherSockets) {
- try {
- watcher.send(
- WatcherMessage.builder(noteId).subject(subject).message(serializeMessage(message))
- .build().toJson());
- } catch (IOException e) {
- LOG.error("Cannot broadcast message to watcher", e);
- }
- }
- }
- }
-
@Override
public void onParaInfosReceived(String noteId, String paragraphId,
String interpreterSettingId, Map<String, String> metaInfos) {
@@ -2397,7 +2104,7 @@ public class NotebookServer extends WebSocketServlet
paragraph
.updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId());
- broadcast(
+ connectionManager.broadcast(
note.getId(),
new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos",
paragraph.getRuntimeInfos()));
@@ -2409,7 +2116,8 @@ public class NotebookServer extends WebSocketServlet
GUI formsSettings = new GUI();
formsSettings.setForms(note.getNoteForms());
formsSettings.setParams(note.getNoteParams());
- broadcast(note.getId(), new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings));
+ connectionManager.broadcast(note.getId(),
+ new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings));
}
private void saveNoteForms(NotebookSocket conn,
@@ -2442,18 +2150,14 @@ public class NotebookServer extends WebSocketServlet
@Override
public Set<String> getConnectedUsers() {
- Set<String> connectionList = Sets.newHashSet();
- for (NotebookSocket notebookSocket : connectedSockets) {
- connectionList.add(notebookSocket.getUser());
- }
- return connectionList;
+ return connectionManager.getConnectedUsers();
}
@Override
public void sendMessage(String message) {
Message m = new Message(OP.NOTICE);
m.data.put("notice", message);
- broadcast(m);
+ connectionManager.broadcast(m);
}
private ServiceContext getServiceContext(Message message) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e10332c9/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index abce8b6..03e7ee6 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -300,7 +300,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
.put("noteId", "noteId")
.put("paragraphId", "paragraphId"));
- server.noteSocketMap.put("noteId", asList(conn, otherConn));
+ server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
// When
server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived);
@@ -349,7 +349,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
.put("noteId", "noteId")
.put("paragraphId", "paragraphId"));
- server.noteSocketMap.put("noteId", asList(conn, otherConn));
+ server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
// When
server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived);
@@ -393,7 +393,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
.put("noteId", "noteId")
.put("paragraphId", "paragraphId"));
- server.noteSocketMap.put("noteId", asList(conn, otherConn));
+ server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
// When
server.angularObjectClientUnbind(conn, new HashSet<String>(), notebook, messageReceived);
@@ -440,7 +440,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
.put("interpreterGroupId", "mdGroup")
.put("noteId", "noteId")
.put("paragraphId", "paragraphId"));
- server.noteSocketMap.put("noteId", asList(conn, otherConn));
+ server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
// When
server.angularObjectClientUnbind(conn, new HashSet<String>(), notebook, messageReceived);