You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/08/22 07:21:24 UTC

zeppelin git commit: ZEPPELIN-3735. Wrap WebSocket connection into ConnectionManager

Repository: zeppelin
Updated Branches:
  refs/heads/master 001c621c7 -> e10332c93


ZEPPELIN-3735. Wrap WebSocket connection into ConnectionManager

### What is this PR for?
This a refactoring PR which move all websocket connection related stuff into class `ConnectionManager`

### What type of PR is it?
[Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3735

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #3151 from zjffdu/ZEPPELIN-3735 and squashes the following commits:

ca00ad803 [Jeff Zhang] ZEPPELIN-3735. Wrap WebSocket connection into ConnectionManager


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e10332c9
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e10332c9
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e10332c9

Branch: refs/heads/master
Commit: e10332c9366a4c712d3c063cc1323590c463ee05
Parents: 001c621
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Aug 20 14:35:07 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Aug 22 15:21:20 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/socket/ConnectionManager.java      | 442 ++++++++++++++++++
 .../apache/zeppelin/socket/NotebookServer.java  | 464 ++++---------------
 .../zeppelin/socket/NotebookServerTest.java     |   8 +-
 3 files changed, 530 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e10332c9/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
new file mode 100644
index 0000000..5d02d9f
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.socket;
+
+
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NotebookAuthorization;
+import org.apache.zeppelin.notebook.NotebookImportDeserializer;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.WatcherMessage;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.util.WatcherSecurityKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Manager class for managing websocket connections
+ */
+public class ConnectionManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionManager.class);
+  private static Gson gson = new GsonBuilder()
+      .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
+      .registerTypeAdapter(Date.class, new NotebookImportDeserializer())
+      .setPrettyPrinting()
+      .registerTypeAdapterFactory(Input.TypeAdapterFactory).create();
+
+  final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
+  // noteId -> connection
+  final Map<String, List<NotebookSocket>> noteSocketMap = new ConcurrentHashMap<>();
+  // user -> connection
+  final Map<String, Queue<NotebookSocket>> userSocketMap = new ConcurrentHashMap<>();
+
+  /**
+   * This is a special endpoint in the notebook websoket, Every connection in this Queue
+   * will be able to watch every websocket event, it doesnt need to be listed into the map of
+   * noteSocketMap. This can be used to get information about websocket traffic and watch what
+   * is going on.
+   */
+  final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
+
+  private HashSet<String> collaborativeModeList = new HashSet<>();
+  private Boolean collaborativeModeEnable = ZeppelinConfiguration
+      .create()
+      .isZeppelinNotebookCollaborativeModeEnable();
+
+
+  public void addConnection(NotebookSocket conn) {
+    connectedSockets.add(conn);
+  }
+
+  public void removeConnection(NotebookSocket conn) {
+    connectedSockets.remove(conn);
+  }
+
+  public void addNoteConnection(String noteId, NotebookSocket socket) {
+    LOGGER.debug("Add connection {} to note: {}", socket, noteId);
+    synchronized (noteSocketMap) {
+      // make sure a socket relates only an single note.
+      removeConnectionFromAllNote(socket);
+      List<NotebookSocket> socketList = noteSocketMap.get(noteId);
+      if (socketList == null) {
+        socketList = new LinkedList<>();
+        noteSocketMap.put(noteId, socketList);
+      }
+      if (!socketList.contains(socket)) {
+        socketList.add(socket);
+      }
+      checkCollaborativeStatus(noteId, socketList);
+    }
+  }
+
+  public void removeNoteConnection(String noteId) {
+    synchronized (noteSocketMap) {
+      noteSocketMap.remove(noteId);
+    }
+  }
+
+  public void removeNoteConnection(String noteId, NotebookSocket socket) {
+    LOGGER.debug("Remove connection {} from note: {}", socket, noteId);
+    synchronized (noteSocketMap) {
+      List<NotebookSocket> socketList = noteSocketMap.get(noteId);
+      if (socketList != null) {
+        socketList.remove(socket);
+      }
+      checkCollaborativeStatus(noteId, socketList);
+    }
+  }
+
+  public void addUserConnection(String user, NotebookSocket conn) {
+    LOGGER.debug("Add user connection {} for user: {}", conn, user);
+    conn.setUser(user);
+    if (userSocketMap.containsKey(user)) {
+      userSocketMap.get(user).add(conn);
+    } else {
+      Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>();
+      socketQueue.add(conn);
+      userSocketMap.put(user, socketQueue);
+    }
+  }
+
+  public void removeUserConnection(String user, NotebookSocket conn) {
+    LOGGER.debug("Remove user connection {} for user: {}", conn, user);
+    if (userSocketMap.containsKey(user)) {
+      userSocketMap.get(user).remove(conn);
+    } else {
+      LOGGER.warn("Closing connection that is absent in user connections");
+    }
+  }
+
+  public String getAssociatedNoteId(NotebookSocket socket) {
+    String associatedNoteId = null;
+    synchronized (noteSocketMap) {
+      Set<String> noteIds = noteSocketMap.keySet();
+      for (String noteId : noteIds) {
+        List<NotebookSocket> sockets = noteSocketMap.get(noteId);
+        if (sockets.contains(socket)) {
+          associatedNoteId = noteId;
+        }
+      }
+    }
+
+    return associatedNoteId;
+  }
+
+  public void removeConnectionFromAllNote(NotebookSocket socket) {
+    synchronized (noteSocketMap) {
+      Set<String> noteIds = noteSocketMap.keySet();
+      for (String noteId : noteIds) {
+        removeConnectionFromNote(noteId, socket);
+      }
+    }
+  }
+
+  private void removeConnectionFromNote(String noteId, NotebookSocket socket) {
+    LOGGER.debug("Remove connection {} from note: {}", socket, noteId);
+    synchronized (noteSocketMap) {
+      List<NotebookSocket> socketList = noteSocketMap.get(noteId);
+      if (socketList != null) {
+        socketList.remove(socket);
+      }
+      checkCollaborativeStatus(noteId, socketList);
+    }
+  }
+
+  private void checkCollaborativeStatus(String noteId, List<NotebookSocket> socketList) {
+    if (!collaborativeModeEnable) {
+      return;
+    }
+    boolean collaborativeStatusNew = socketList.size() > 1;
+    if (collaborativeStatusNew) {
+      collaborativeModeList.add(noteId);
+    } else {
+      collaborativeModeList.remove(noteId);
+    }
+
+    Message message = new Message(Message.OP.COLLABORATIVE_MODE_STATUS);
+    message.put("status", collaborativeStatusNew);
+    if (collaborativeStatusNew) {
+      HashSet<String> userList = new HashSet<>();
+      for (NotebookSocket noteSocket : socketList) {
+        userList.add(noteSocket.getUser());
+      }
+      message.put("users", userList);
+    }
+    broadcast(noteId, message);
+  }
+
+
+  protected String serializeMessage(Message m) {
+    return gson.toJson(m);
+  }
+
+  public void broadcast(Message m) {
+    synchronized (connectedSockets) {
+      for (NotebookSocket ns : connectedSockets) {
+        try {
+          ns.send(serializeMessage(m));
+        } catch (IOException e) {
+          LOGGER.error("Send error: " + m, e);
+        }
+      }
+    }
+  }
+
+  public void broadcast(String noteId, Message m) {
+    List<NotebookSocket> socketsToBroadcast = Collections.emptyList();
+    synchronized (noteSocketMap) {
+      broadcastToWatchers(noteId, StringUtils.EMPTY, m);
+      List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
+      if (socketLists == null || socketLists.size() == 0) {
+        return;
+      }
+      socketsToBroadcast = new ArrayList<>(socketLists);
+    }
+    LOGGER.debug("SEND >> " + m);
+    for (NotebookSocket conn : socketsToBroadcast) {
+      try {
+        conn.send(serializeMessage(m));
+      } catch (IOException e) {
+        LOGGER.error("socket error", e);
+      }
+    }
+  }
+
+  private void broadcastToWatchers(String noteId, String subject, Message message) {
+    synchronized (watcherSockets) {
+      for (NotebookSocket watcher : watcherSockets) {
+        try {
+          watcher.send(
+              WatcherMessage.builder(noteId).subject(subject).message(serializeMessage(message))
+                  .build().toJson());
+        } catch (IOException e) {
+          LOGGER.error("Cannot broadcast message to watcher", e);
+        }
+      }
+    }
+  }
+
+  public void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
+    List<NotebookSocket> socketsToBroadcast = Collections.emptyList();
+    synchronized (noteSocketMap) {
+      broadcastToWatchers(noteId, StringUtils.EMPTY, m);
+      List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
+      if (socketLists == null || socketLists.size() == 0) {
+        return;
+      }
+      socketsToBroadcast = new ArrayList<>(socketLists);
+    }
+
+    LOGGER.debug("SEND >> " + m);
+    for (NotebookSocket conn : socketsToBroadcast) {
+      if (exclude.equals(conn)) {
+        continue;
+      }
+      try {
+        conn.send(serializeMessage(m));
+      } catch (IOException e) {
+        LOGGER.error("socket error", e);
+      }
+    }
+  }
+
+  /**
+   * Send websocket message to all connections regardless of notebook id.
+   */
+  public void broadcastToAllConnections(String serialized) {
+    broadcastToAllConnectionsExcept(null, serialized);
+  }
+
+  public void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serializedMsg) {
+    synchronized (connectedSockets) {
+      for (NotebookSocket conn : connectedSockets) {
+        if (exclude != null && exclude.equals(conn)) {
+          continue;
+        }
+
+        try {
+          conn.send(serializedMsg);
+        } catch (IOException e) {
+          LOGGER.error("Cannot broadcast message to conn", e);
+        }
+      }
+    }
+  }
+
+  public Set<String> getConnectedUsers() {
+    Set<String> connectedUsers = Sets.newHashSet();
+    for (NotebookSocket notebookSocket : connectedSockets) {
+      connectedUsers.add(notebookSocket.getUser());
+    }
+    return connectedUsers;
+  }
+
+
+  public void multicastToUser(String user, Message m) {
+    if (!userSocketMap.containsKey(user)) {
+      LOGGER.warn("Multicasting to user {} that is not in connections map", user);
+      return;
+    }
+
+    for (NotebookSocket conn : userSocketMap.get(user)) {
+      unicast(m, conn);
+    }
+  }
+
+  public void unicast(Message m, NotebookSocket conn) {
+    try {
+      conn.send(serializeMessage(m));
+    } catch (IOException e) {
+      LOGGER.error("socket error", e);
+    }
+    broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
+  }
+
+  public void unicastParagraph(Note note, Paragraph p, String user) {
+    if (!note.isPersonalizedMode() || p == null || user == null) {
+      return;
+    }
+
+    if (!userSocketMap.containsKey(user)) {
+      LOGGER.warn("Failed to send unicast. user {} that is not in connections map", user);
+      return;
+    }
+
+    for (NotebookSocket conn : userSocketMap.get(user)) {
+      Message m = new Message(Message.OP.PARAGRAPH).put("paragraph", p);
+      unicast(m, conn);
+    }
+  }
+
+  public void broadcastNoteListExcept(List<Map<String, String>> notesInfo,
+                                      AuthenticationInfo subject) {
+    Set<String> userAndRoles;
+    NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
+    for (String user : userSocketMap.keySet()) {
+      if (subject.getUser().equals(user)) {
+        continue;
+      }
+      //reloaded already above; parameter - false
+      userAndRoles = authInfo.getRoles(user);
+      userAndRoles.add(user);
+      // TODO(zjffdu) is it ok for comment the following line ?
+      // notesInfo = generateNotesInfo(false, new AuthenticationInfo(user), userAndRoles);
+      multicastToUser(user, new Message(Message.OP.NOTES_INFO).put("notes", notesInfo));
+    }
+  }
+
+  public void broadcastNote(Note note) {
+    broadcast(note.getId(), new Message(Message.OP.NOTE).put("note", note));
+  }
+
+  public void broadcastParagraph(Note note, Paragraph p) {
+    broadcastNoteForms(note);
+
+    if (note.isPersonalizedMode()) {
+      broadcastParagraphs(p.getUserParagraphMap(), p);
+    } else {
+      broadcast(note.getId(), new Message(Message.OP.PARAGRAPH).put("paragraph", p));
+    }
+  }
+
+  public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
+                                  Paragraph defaultParagraph) {
+    if (null != userParagraphMap) {
+      for (String user : userParagraphMap.keySet()) {
+        multicastToUser(user,
+            new Message(Message.OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)));
+      }
+    }
+  }
+
+  private void broadcastNewParagraph(Note note, Paragraph para) {
+    LOGGER.info("Broadcasting paragraph on run call instead of note.");
+    int paraIndex = note.getParagraphs().indexOf(para);
+    broadcast(note.getId(),
+        new Message(Message.OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex));
+  }
+
+  //  public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
+  //    if (subject == null) {
+  //      subject = new AuthenticationInfo(StringUtils.EMPTY);
+  //    }
+  //    //send first to requesting user
+  //    List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles);
+  //    multicastToUser(subject.getUser(), new Message(Message.OP.NOTES_INFO)
+  // .put("notes", notesInfo));
+  //    //to others afterwards
+  //    broadcastNoteListExcept(notesInfo, subject);
+  //  }
+
+
+  private void broadcastNoteForms(Note note) {
+    GUI formsSettings = new GUI();
+    formsSettings.setForms(note.getNoteForms());
+    formsSettings.setParams(note.getNoteParams());
+    broadcast(note.getId(), new Message(Message.OP.SAVE_NOTE_FORMS)
+        .put("formsData", formsSettings));
+  }
+
+  public void switchConnectionToWatcher(NotebookSocket conn) {
+    if (!isSessionAllowedToSwitchToWatcher(conn)) {
+      LOGGER.error("Cannot switch this client to watcher, invalid security key");
+      return;
+    }
+    LOGGER.info("Going to add {} to watcher socket", conn);
+    // add the connection to the watcher.
+    if (watcherSockets.contains(conn)) {
+      LOGGER.info("connection alrerady present in the watcher");
+      return;
+    }
+    watcherSockets.add(conn);
+
+    // remove this connection from regular zeppelin ws usage.
+    removeConnection(conn);
+    removeConnectionFromAllNote(conn);
+    removeUserConnection(conn.getUser(), conn);
+  }
+
+  private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
+    String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
+    return !(StringUtils.isBlank(watcherSecurityKey) || !watcherSecurityKey
+        .equals(WatcherSecurityKey.getKey()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e10332c9/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 16719f3..a3fce8f 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -17,8 +17,6 @@
 package org.apache.zeppelin.socket;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
@@ -52,7 +50,6 @@ import org.apache.zeppelin.notebook.ParagraphWithRuntimeInfo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
-import org.apache.zeppelin.notebook.socket.WatcherMessage;
 import org.apache.zeppelin.rest.exception.ForbiddenException;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.server.ZeppelinServer;
@@ -64,7 +61,6 @@ import org.apache.zeppelin.service.SimpleServiceCallback;
 import org.apache.zeppelin.ticket.TicketContainer;
 import org.apache.zeppelin.types.InterpreterSettingsList;
 import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.util.WatcherSecurityKey;
 import org.apache.zeppelin.utils.InterpreterBindingUtils;
 import org.apache.zeppelin.utils.SecurityUtils;
 import org.bitbucket.cowwoc.diffmatchpatch.DiffMatchPatch;
@@ -86,17 +82,13 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.regex.Matcher;
@@ -130,7 +122,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
 
-  private HashSet<String> collaborativeModeList = new HashSet<>();
+  //  private HashSet<String> collaborativeModeList = new HashSet<>();
   private Boolean collaborativeModeEnable = ZeppelinConfiguration
       .create()
       .isZeppelinNotebookCollaborativeModeEnable();
@@ -141,23 +133,17 @@ public class NotebookServer extends WebSocketServlet
       .setPrettyPrinting()
       .registerTypeAdapterFactory(Input.TypeAdapterFactory).create();
 
-  final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
-  final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
-  final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
-
+  private ConnectionManager connectionManager;
   private NotebookService notebookService;
   private ConfigurationService configurationService;
   private JobManagerService jobManagerService;
 
   private ExecutorService executorService = Executors.newFixedThreadPool(10);
 
-  /**
-   * This is a special endpoint in the notebook websoket, Every connection in this Queue
-   * will be able to watch every websocket event, it doesnt need to be listed into the map of
-   * noteSocketMap. This can be used to get information about websocket traffic and watch what
-   * is going on.
-   */
-  final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
+
+  public NotebookServer() {
+    this.connectionManager = new ConnectionManager();
+  }
 
   private Notebook notebook() {
     return ZeppelinServer.notebook;
@@ -198,14 +184,10 @@ public class NotebookServer extends WebSocketServlet
     return false;
   }
 
-  public NotebookSocket doWebSocketConnect(HttpServletRequest req, String protocol) {
-    return new NotebookSocket(req, protocol, this);
-  }
-
   @Override
   public void onOpen(NotebookSocket conn) {
     LOG.info("New connection from {}", conn);
-    connectedSockets.add(conn);
+    connectionManager.addConnection(conn);
   }
 
   @Override
@@ -258,7 +240,7 @@ public class NotebookServer extends WebSocketServlet
         }
       }
       if (StringUtils.isEmpty(conn.getUser())) {
-        addUserConnection(messagereceived.principal, conn);
+        connectionManager.addUserConnection(messagereceived.principal, conn);
       }
       AuthenticationInfo subject =
           new AuthenticationInfo(messagereceived.principal, messagereceived.roles,
@@ -404,7 +386,7 @@ public class NotebookServer extends WebSocketServlet
           getInterpreterSettings(conn);
           break;
         case WATCHER:
-          switchConnectionToWatcher(conn, messagereceived);
+          connectionManager.switchConnectionToWatcher(conn);
           break;
         case SAVE_NOTE_FORMS:
           saveNoteForms(conn, messagereceived);
@@ -426,30 +408,13 @@ public class NotebookServer extends WebSocketServlet
   @Override
   public void onClose(NotebookSocket conn, int code, String reason) {
     LOG.info("Closed connection to {} ({}) {}", conn, code, reason);
-    removeConnectionFromAllNote(conn);
-    connectedSockets.remove(conn);
-    removeUserConnection(conn.getUser(), conn);
-  }
-
-  private void removeUserConnection(String user, NotebookSocket conn) {
-    LOG.debug("Remove user connection {} for user: {}", conn, user);
-    if (userConnectedSockets.containsKey(user)) {
-      userConnectedSockets.get(user).remove(conn);
-    } else {
-      LOG.warn("Closing connection that is absent in user connections");
-    }
+    connectionManager.removeConnection(conn);
+    connectionManager.removeConnectionFromAllNote(conn);
+    connectionManager.removeUserConnection(conn.getUser(), conn);
   }
 
-  private void addUserConnection(String user, NotebookSocket conn) {
-    LOG.debug("Add user connection {} for user: {}", conn, user);
-    conn.setUser(user);
-    if (userConnectedSockets.containsKey(user)) {
-      userConnectedSockets.get(user).add(conn);
-    } else {
-      Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>();
-      socketQueue.add(conn);
-      userConnectedSockets.put(user, socketQueue);
-    }
+  public ConnectionManager getConnectionManager() {
+    return connectionManager;
   }
 
   protected Message deserializeMessage(String msg) {
@@ -460,165 +425,13 @@ public class NotebookServer extends WebSocketServlet
     return gson.toJson(m);
   }
 
-  private void addConnectionToNote(String noteId, NotebookSocket socket) {
-    LOG.debug("Add connection {} to note: {}", socket, noteId);
-    synchronized (noteSocketMap) {
-      removeConnectionFromAllNote(socket); // make sure a socket relates only a
-      // single note.
-      List<NotebookSocket> socketList = noteSocketMap.get(noteId);
-      if (socketList == null) {
-        socketList = new LinkedList<>();
-        noteSocketMap.put(noteId, socketList);
-      }
-      if (!socketList.contains(socket)) {
-        socketList.add(socket);
-      }
-      checkCollaborativeStatus(noteId, socketList);
-    }
-  }
-
-  private void removeConnectionFromNote(String noteId, NotebookSocket socket) {
-    LOG.debug("Remove connection {} from note: {}", socket, noteId);
-    synchronized (noteSocketMap) {
-      List<NotebookSocket> socketList = noteSocketMap.get(noteId);
-      if (socketList != null) {
-        socketList.remove(socket);
-      }
-      checkCollaborativeStatus(noteId, socketList);
-    }
-  }
-
-  private void removeConn(String noteId) {
-    synchronized (noteSocketMap) {
-      List<NotebookSocket> socketList = noteSocketMap.remove(noteId);
-    }
-  }
-
-  private void removeConnectionFromAllNote(NotebookSocket socket) {
-    synchronized (noteSocketMap) {
-      Set<String> keys = noteSocketMap.keySet();
-      for (String noteId : keys) {
-        removeConnectionFromNote(noteId, socket);
-      }
-    }
-  }
-
-  private void checkCollaborativeStatus(String noteId, List<NotebookSocket> socketList) {
-    if (!collaborativeModeEnable) {
-      return;
-    }
-    boolean collaborativeStatusNew = socketList.size() > 1;
-    if (collaborativeStatusNew) {
-      collaborativeModeList.add(noteId);
-    } else {
-      collaborativeModeList.remove(noteId);
-    }
-
-    Message message = new Message(OP.COLLABORATIVE_MODE_STATUS);
-    message.put("status", collaborativeStatusNew);
-    if (collaborativeStatusNew) {
-      HashSet<String> userList = new HashSet<>();
-      for (NotebookSocket noteSocket : socketList) {
-        userList.add(noteSocket.getUser());
-      }
-      message.put("users", userList);
-    }
-    broadcast(noteId, message);
-  }
-
-  private String getOpenNoteId(NotebookSocket socket) {
-    String id = null;
-    synchronized (noteSocketMap) {
-      Set<String> keys = noteSocketMap.keySet();
-      for (String noteId : keys) {
-        List<NotebookSocket> sockets = noteSocketMap.get(noteId);
-        if (sockets.contains(socket)) {
-          id = noteId;
-        }
-      }
-    }
-
-    return id;
-  }
-
   public void broadcast(Message m) {
-    synchronized (connectedSockets) {
-      for (NotebookSocket ns : connectedSockets) {
-        try {
-          ns.send(serializeMessage(m));
-        } catch (IOException e) {
-          LOG.error("Send error: " + m, e);
-        }
-      }
-    }
-  }
-
-  private void broadcast(String noteId, Message m) {
-    List<NotebookSocket> socketsToBroadcast = Collections.emptyList();
-    synchronized (noteSocketMap) {
-      broadcastToWatchers(noteId, StringUtils.EMPTY, m);
-      List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
-      if (socketLists == null || socketLists.size() == 0) {
-        return;
-      }
-      socketsToBroadcast = new ArrayList<>(socketLists);
-    }
-    LOG.debug("SEND >> " + m);
-    for (NotebookSocket conn : socketsToBroadcast) {
-      try {
-        conn.send(serializeMessage(m));
-      } catch (IOException e) {
-        LOG.error("socket error", e);
-      }
-    }
-  }
-
-  private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
-    List<NotebookSocket> socketsToBroadcast = Collections.emptyList();
-    synchronized (noteSocketMap) {
-      broadcastToWatchers(noteId, StringUtils.EMPTY, m);
-      List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
-      if (socketLists == null || socketLists.size() == 0) {
-        return;
-      }
-      socketsToBroadcast = new ArrayList<>(socketLists);
-    }
-
-    LOG.debug("SEND >> " + m);
-    for (NotebookSocket conn : socketsToBroadcast) {
-      if (exclude.equals(conn)) {
-        continue;
-      }
-      try {
-        conn.send(serializeMessage(m));
-      } catch (IOException e) {
-        LOG.error("socket error", e);
-      }
-    }
-  }
-
-  private void multicastToUser(String user, Message m) {
-    if (!userConnectedSockets.containsKey(user)) {
-      LOG.warn("Multicasting to user {} that is not in connections map", user);
-      return;
-    }
-
-    for (NotebookSocket conn : userConnectedSockets.get(user)) {
-      unicast(m, conn);
-    }
-  }
-
-  private void unicast(Message m, NotebookSocket conn) {
-    try {
-      conn.send(serializeMessage(m));
-    } catch (IOException e) {
-      LOG.error("socket error", e);
-    }
-    broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
+    connectionManager.broadcast(m);
   }
 
   public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
-    addConnectionToNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
+
+    connectionManager.addNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
     getJobManagerService().getNoteJobInfoByUnixTime(0, getServiceContext(fromMessage),
         new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) {
           @Override
@@ -648,7 +461,7 @@ public class NotebookServer extends WebSocketServlet
             Map<String, Object> response = new HashMap<>();
             response.put("lastResponseUnixTime", System.currentTimeMillis());
             response.put("jobs", notesJobInfo);
-            broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+            connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
                 new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
           }
 
@@ -660,7 +473,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   public void unsubscribeNoteJobInfo(NotebookSocket conn) {
-    removeConnectionFromNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
+    connectionManager.removeNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
   }
 
   public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) throws IOException {
@@ -707,23 +520,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   public void broadcastNote(Note note) {
-    broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
-  }
-
-  public void unicastParagraph(Note note, Paragraph p, String user) {
-    if (!note.isPersonalizedMode() || p == null || user == null) {
-      return;
-    }
-
-    if (!userConnectedSockets.containsKey(user)) {
-      LOG.warn("Failed to send unicast. user {} that is not in connections map", user);
-      return;
-    }
-
-    for (NotebookSocket conn : userConnectedSockets.get(user)) {
-      Message m = new Message(OP.PARAGRAPH).put("paragraph", p);
-      unicast(m, conn);
-    }
+    connectionManager.broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
   }
 
   public void broadcastParagraph(Note note, Paragraph p) {
@@ -732,8 +529,8 @@ public class NotebookServer extends WebSocketServlet
     if (note.isPersonalizedMode()) {
       broadcastParagraphs(p.getUserParagraphMap(), p);
     } else {
-      broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph",
-          new ParagraphWithRuntimeInfo(p)));
+      connectionManager.broadcast(note.getId(),
+          new Message(OP.PARAGRAPH).put("paragraph", new ParagraphWithRuntimeInfo(p)));
     }
   }
 
@@ -741,7 +538,7 @@ public class NotebookServer extends WebSocketServlet
                                   Paragraph defaultParagraph) {
     if (null != userParagraphMap) {
       for (String user : userParagraphMap.keySet()) {
-        multicastToUser(user,
+        connectionManager.multicastToUser(user,
             new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)));
       }
     }
@@ -750,7 +547,7 @@ public class NotebookServer extends WebSocketServlet
   private void broadcastNewParagraph(Note note, Paragraph para) {
     LOG.info("Broadcasting paragraph on run call instead of note.");
     int paraIndex = note.getParagraphs().indexOf(para);
-    broadcast(note.getId(),
+    connectionManager.broadcast(note.getId(),
         new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex));
   }
 
@@ -760,9 +557,10 @@ public class NotebookServer extends WebSocketServlet
     }
     //send first to requesting user
     List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles);
-    multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
+    connectionManager.multicastToUser(subject.getUser(),
+        new Message(OP.NOTES_INFO).put("notes", notesInfo));
     //to others afterwards
-    broadcastNoteListExcept(notesInfo, subject);
+    connectionManager.broadcastNoteListExcept(notesInfo, subject);
   }
 
   public void listNotes(NotebookSocket conn, Message message) throws IOException {
@@ -772,7 +570,7 @@ public class NotebookServer extends WebSocketServlet
           public void onSuccess(List<Map<String, String>> notesInfo,
                                 ServiceContext context) throws IOException {
             super.onSuccess(notesInfo, context);
-            unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
+            connectionManager.unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
           }
         });
   }
@@ -785,30 +583,14 @@ public class NotebookServer extends WebSocketServlet
           public void onSuccess(List<Map<String, String>> notesInfo,
                                 ServiceContext context) throws IOException {
             super.onSuccess(notesInfo, context);
-            multicastToUser(context.getAutheInfo().getUser(),
+            connectionManager.multicastToUser(context.getAutheInfo().getUser(),
                 new Message(OP.NOTES_INFO).put("notes", notesInfo));
             //to others afterwards
-            broadcastNoteListExcept(notesInfo, context.getAutheInfo());
+            connectionManager.broadcastNoteListExcept(notesInfo, context.getAutheInfo());
           }
         });
   }
 
-  private void broadcastNoteListExcept(List<Map<String, String>> notesInfo,
-                                       AuthenticationInfo subject) {
-    Set<String> userAndRoles;
-    NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
-    for (String user : userConnectedSockets.keySet()) {
-      if (subject.getUser().equals(user)) {
-        continue;
-      }
-      //reloaded already above; parameter - false
-      userAndRoles = authInfo.getRoles(user);
-      userAndRoles.add(user);
-      notesInfo = generateNotesInfo(false, new AuthenticationInfo(user), userAndRoles);
-      multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
-    }
-  }
-
   void permissionError(NotebookSocket conn, String op, String userName, Set<String> userAndRoles,
                        Set<String> allowed) throws IOException {
     LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed);
@@ -821,23 +603,6 @@ public class NotebookServer extends WebSocketServlet
 
 
   /**
-   * @return false if user doesn't have runner permission for this paragraph
-   */
-  private boolean hasParagraphRunnerPermission(NotebookSocket conn, Notebook notebook,
-                                               String noteId, HashSet<String> userAndRoles,
-                                               String principal, String op)
-      throws IOException {
-    NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
-    if (!notebookAuthorization.isRunner(noteId, userAndRoles)) {
-      permissionError(conn, op, principal, userAndRoles,
-          notebookAuthorization.getOwners(noteId));
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
    * @return false if user doesn't have writer permission for this paragraph
    */
   private boolean hasParagraphWriterPermission(NotebookSocket conn, Notebook notebook,
@@ -880,7 +645,7 @@ public class NotebookServer extends WebSocketServlet
         new WebSocketServiceCallback<Note>(conn) {
           @Override
           public void onSuccess(Note note, ServiceContext context) throws IOException {
-            addConnectionToNote(note.getId(), conn);
+            connectionManager.addNoteConnection(note.getId(), conn);
             conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
             sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
           }
@@ -896,11 +661,11 @@ public class NotebookServer extends WebSocketServlet
           public void onSuccess(Note note, ServiceContext context) throws IOException {
             super.onSuccess(note, context);
             if (note != null) {
-              addConnectionToNote(note.getId(), conn);
+              connectionManager.addNoteConnection(note.getId(), conn);
               conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
               sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
             } else {
-              removeConnectionFromAllNote(conn);
+              connectionManager.removeConnectionFromAllNote(conn);
               conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
             }
           }
@@ -923,7 +688,7 @@ public class NotebookServer extends WebSocketServlet
         new WebSocketServiceCallback<Note>(conn) {
           @Override
           public void onSuccess(Note note, ServiceContext context) throws IOException {
-            broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name)
+            connectionManager.broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name)
                 .put("config", config)
                 .put("info", note.getInfo()));
             broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
@@ -1021,7 +786,7 @@ public class NotebookServer extends WebSocketServlet
           @Override
           public void onSuccess(Note note, ServiceContext context) throws IOException {
             super.onSuccess(note, context);
-            addConnectionToNote(note.getId(), conn);
+            connectionManager.addNoteConnection(note.getId(), conn);
             conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
             broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
           }
@@ -1043,7 +808,7 @@ public class NotebookServer extends WebSocketServlet
           @Override
           public void onSuccess(String message, ServiceContext context) throws IOException {
             super.onSuccess(message, context);
-            removeConn(noteId);
+            connectionManager.removeNoteConnection(noteId);
             broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
           }
         });
@@ -1069,7 +834,7 @@ public class NotebookServer extends WebSocketServlet
     AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
     for (Note note : notes) {
       notebook.removeNote(note.getId(), subject);
-      removeConn(note.getId());
+      connectionManager.removeNoteConnection(note.getId());
     }
     broadcastNoteList(subject, userAndRoles);
   }
@@ -1186,7 +951,7 @@ public class NotebookServer extends WebSocketServlet
   private void updateParagraph(NotebookSocket conn,
                                Message fromMessage) throws IOException {
     String paragraphId = (String) fromMessage.get("id");
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     if (noteId == null) {
       noteId = (String) fromMessage.get("noteId");
     }
@@ -1222,7 +987,7 @@ public class NotebookServer extends WebSocketServlet
       return;
     }
 
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     if (noteId == null) {
       noteId = fromMessage.getType("noteId", LOG);
       if (noteId == null) {
@@ -1265,19 +1030,19 @@ public class NotebookServer extends WebSocketServlet
     p.setText(paragraphText);
     Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", patchText)
         .put("paragraphId", p.getId());
-    broadcastExcept(note.getId(), message, conn);
+    connectionManager.broadcastExcept(note.getId(), message, conn);
   }
 
   private void cloneNote(NotebookSocket conn,
                          Message fromMessage) throws IOException {
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     String name = (String) fromMessage.get("name");
     getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage),
         new WebSocketServiceCallback<Note>(conn) {
           @Override
           public void onSuccess(Note newNote, ServiceContext context) throws IOException {
             super.onSuccess(newNote, context);
-            addConnectionToNote(newNote.getId(), conn);
+            connectionManager.addNoteConnection(newNote.getId(), conn);
             conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
             broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
           }
@@ -1321,13 +1086,13 @@ public class NotebookServer extends WebSocketServlet
   private void removeParagraph(NotebookSocket conn,
                                Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     getNotebookService().removeParagraph(noteId, paragraphId,
         getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn){
           @Override
           public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
             super.onSuccess(p, context);
-            broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).
+            connectionManager.broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).
                 put("id", p.getId()));
           }
         });
@@ -1336,14 +1101,14 @@ public class NotebookServer extends WebSocketServlet
   private void clearParagraphOutput(NotebookSocket conn,
                                     Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage),
         new WebSocketServiceCallback<Paragraph>(conn) {
           @Override
           public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
             super.onSuccess(p, context);
             if (p.getNote().isPersonalizedMode()) {
-              unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
+              connectionManager.unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
             } else {
               broadcastParagraph(p.getNote(), p);
             }
@@ -1353,7 +1118,7 @@ public class NotebookServer extends WebSocketServlet
 
   private void completion(NotebookSocket conn,
                           Message fromMessage) throws IOException {
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     String paragraphId = (String) fromMessage.get("id");
     String buffer = (String) fromMessage.get("buf");
     int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString());
@@ -1452,7 +1217,7 @@ public class NotebookServer extends WebSocketServlet
               .getId())) {
             AngularObjectRegistry angularObjectRegistry =
                 setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
-            this.broadcastExcept(n.getId(),
+            connectionManager.broadcastExcept(n.getId(),
                 new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
                     .put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId())
                     .put("paragraphId", ao.getParagraphId()), conn);
@@ -1460,7 +1225,7 @@ public class NotebookServer extends WebSocketServlet
         }
       }
     } else { // broadcast to all web session for the note
-      this.broadcastExcept(note.getId(),
+      connectionManager.broadcastExcept(note.getId(),
           new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
               .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId())
               .put("paragraphId", ao.getParagraphId()), conn);
@@ -1551,7 +1316,8 @@ public class NotebookServer extends WebSocketServlet
     final AngularObject ao =
         remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId);
 
-    this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
+    connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE)
+        .put("angularObject", ao)
         .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
         .put("paragraphId", paragraphId), conn);
   }
@@ -1562,7 +1328,8 @@ public class NotebookServer extends WebSocketServlet
                                                NotebookSocket conn) {
     final AngularObject ao =
         remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId);
-    this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao)
+    connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE)
+        .put("angularObject", ao)
         .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
         .put("paragraphId", paragraphId), conn);
   }
@@ -1578,7 +1345,7 @@ public class NotebookServer extends WebSocketServlet
       angularObject.set(varValue, true);
     }
 
-    this.broadcastExcept(noteId,
+    connectionManager.broadcastExcept(noteId,
         new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject)
             .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
             .put("paragraphId", paragraphId), conn);
@@ -1589,7 +1356,7 @@ public class NotebookServer extends WebSocketServlet
                                                 String interpreterGroupId, NotebookSocket conn) {
     final AngularObject removed = registry.remove(varName, noteId, paragraphId);
     if (removed != null) {
-      this.broadcastExcept(noteId,
+      connectionManager.broadcastExcept(noteId,
           new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", removed)
               .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
               .put("paragraphId", paragraphId), conn);
@@ -1600,14 +1367,14 @@ public class NotebookServer extends WebSocketServlet
                              Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString());
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     getNotebookService().moveParagraph(noteId, paragraphId, newIndex,
         getServiceContext(fromMessage),
         new WebSocketServiceCallback<Paragraph>(conn) {
           @Override
           public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
             super.onSuccess(result, context);
-            broadcast(result.getNote().getId(),
+            connectionManager.broadcast(result.getNote().getId(),
                 new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex));
           }
         });
@@ -1616,7 +1383,7 @@ public class NotebookServer extends WebSocketServlet
   private String insertParagraph(NotebookSocket conn,
                                  Message fromMessage) throws IOException {
     final int index = (int) Double.parseDouble(fromMessage.get("index").toString());
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     Map<String, Object> config;
     if (fromMessage.get("config") != null) {
       config = (Map<String, Object>) fromMessage.get("config");
@@ -1651,7 +1418,7 @@ public class NotebookServer extends WebSocketServlet
 
   private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage),
         new WebSocketServiceCallback<>(conn));
   }
@@ -1675,7 +1442,7 @@ public class NotebookServer extends WebSocketServlet
       return;
     }
 
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
 
     if (!hasParagraphWriterPermission(conn, notebook, noteId,
         userAndRoles, fromMessage.principal, "write")) {
@@ -1718,14 +1485,14 @@ public class NotebookServer extends WebSocketServlet
     }
 
     // broadcast to other clients only
-    broadcastExcept(note.getId(),
+    connectionManager.broadcastExcept(note.getId(),
         new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn);
   }
 
   private void runParagraph(NotebookSocket conn,
                             Message fromMessage) throws IOException {
     String paragraphId = (String) fromMessage.get("id");
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
     String text = (String) fromMessage.get("paragraph");
     String title = (String) fromMessage.get("title");
     Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
@@ -1739,7 +1506,7 @@ public class NotebookServer extends WebSocketServlet
             if (p.getNote().isPersonalizedMode()) {
               Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId,
                   context.getAutheInfo().getUser());
-              unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
+              connectionManager.unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
             }
 
             // if it's the last paragraph and not empty, let's add a new one
@@ -1922,7 +1689,7 @@ public class NotebookServer extends WebSocketServlet
   public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
     Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", noteId)
         .put("paragraphId", paragraphId).put("index", index).put("data", output);
-    broadcast(noteId, msg);
+    connectionManager.broadcast(noteId, msg);
   }
 
   /**
@@ -1939,10 +1706,10 @@ public class NotebookServer extends WebSocketServlet
     if (note.isPersonalizedMode()) {
       String user = note.getParagraph(paragraphId).getUser();
       if (null != user) {
-        multicastToUser(user, msg);
+        connectionManager.multicastToUser(user, msg);
       }
     } else {
-      broadcast(noteId, msg);
+      connectionManager.broadcast(noteId, msg);
     }
   }
 
@@ -1967,7 +1734,7 @@ public class NotebookServer extends WebSocketServlet
     Message msg =
         new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
             .put("index", index).put("appId", appId).put("data", output);
-    broadcast(noteId, msg);
+    connectionManager.broadcast(noteId, msg);
   }
 
   /**
@@ -1979,14 +1746,14 @@ public class NotebookServer extends WebSocketServlet
     Message msg =
         new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
             .put("index", index).put("type", type).put("appId", appId).put("data", output);
-    broadcast(noteId, msg);
+    connectionManager.broadcast(noteId, msg);
   }
 
   @Override
   public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) {
     Message msg = new Message(OP.APP_LOAD).put("noteId", noteId).put("paragraphId", paragraphId)
         .put("appId", appId).put("pkg", pkg);
-    broadcast(noteId, msg);
+    connectionManager.broadcast(noteId, msg);
   }
 
   @Override
@@ -1994,7 +1761,7 @@ public class NotebookServer extends WebSocketServlet
     Message msg =
         new Message(OP.APP_STATUS_CHANGE).put("noteId", noteId).put("paragraphId", paragraphId)
             .put("appId", appId).put("status", status);
-    broadcast(noteId, msg);
+    connectionManager.broadcast(noteId, msg);
   }
 
 
@@ -2086,6 +1853,7 @@ public class NotebookServer extends WebSocketServlet
       } catch (IOException e) {
         LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
       }
+
     }
 
     @Override
@@ -2127,7 +1895,7 @@ public class NotebookServer extends WebSocketServlet
         Map<String, Object> response = new HashMap<>();
         response.put("lastResponseUnixTime", System.currentTimeMillis());
         response.put("jobs", notesJobInfo);
-        broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+        connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
             new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
       }
     }
@@ -2135,7 +1903,7 @@ public class NotebookServer extends WebSocketServlet
 
   @Override
   public void onProgressUpdate(Paragraph p, int progress) {
-    broadcast(p.getNote().getId(),
+    connectionManager.broadcast(p.getNote().getId(),
         new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress));
   }
 
@@ -2184,7 +1952,7 @@ public class NotebookServer extends WebSocketServlet
     Message msg =
         new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", paragraph.getNote().getId())
             .put("paragraphId", paragraph.getId()).put("data", output);
-    broadcast(paragraph.getNote().getId(), msg);
+    connectionManager.broadcast(paragraph.getNote().getId(), msg);
   }
 
   /**
@@ -2195,7 +1963,7 @@ public class NotebookServer extends WebSocketServlet
     Message msg =
         new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", paragraph.getNote().getId())
             .put("paragraphId", paragraph.getId()).put("data", result.getData());
-    broadcast(paragraph.getNote().getId(), msg);
+    connectionManager.broadcast(paragraph.getNote().getId(), msg);
   }
 
   @Override
@@ -2257,7 +2025,8 @@ public class NotebookServer extends WebSocketServlet
         continue;
       }
 
-      broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
+      connectionManager.broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE)
+          .put("angularObject", object)
           .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId())
           .put("paragraphId", object.getParagraphId()));
     }
@@ -2276,7 +2045,7 @@ public class NotebookServer extends WebSocketServlet
           notebook.getInterpreterSettingManager().getSettingIds();
       for (String id : settingIds) {
         if (interpreterGroupId.contains(id)) {
-          broadcast(note.getId(),
+          connectionManager.broadcast(note.getId(),
               new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put("noteId", noteId)
                   .put("paragraphId", paragraphId));
           break;
@@ -2288,7 +2057,7 @@ public class NotebookServer extends WebSocketServlet
   private void getEditorSetting(NotebookSocket conn, Message fromMessage) throws IOException {
     String paragraphId = (String) fromMessage.get("paragraphId");
     String replName = (String) fromMessage.get("magic");
-    String noteId = getOpenNoteId(conn);
+    String noteId = connectionManager.getAssociatedNoteId(conn);
 
     getNotebookService().getEditorSetting(noteId, replName,
         getServiceContext(fromMessage),
@@ -2317,68 +2086,6 @@ public class NotebookServer extends WebSocketServlet
         new Message(OP.INTERPRETER_SETTINGS).put("interpreterSettings", availableSettings)));
   }
 
-  private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived) {
-    if (!isSessionAllowedToSwitchToWatcher(conn)) {
-      LOG.error("Cannot switch this client to watcher, invalid security key");
-      return;
-    }
-    LOG.info("Going to add {} to watcher socket", conn);
-    // add the connection to the watcher.
-    if (watcherSockets.contains(conn)) {
-      LOG.info("connection alrerady present in the watcher");
-      return;
-    }
-    watcherSockets.add(conn);
-
-    // remove this connection from regular zeppelin ws usage.
-    removeConnectionFromAllNote(conn);
-    connectedSockets.remove(conn);
-    removeUserConnection(conn.getUser(), conn);
-  }
-
-  private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
-    String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
-    return !(StringUtils.isBlank(watcherSecurityKey) || !watcherSecurityKey
-        .equals(WatcherSecurityKey.getKey()));
-  }
-
-  /**
-   * Send websocket message to all connections regardless of notebook id.
-   */
-  private void broadcastToAllConnections(String serialized) {
-    broadcastToAllConnectionsExcept(null, serialized);
-  }
-
-  private void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serialized) {
-    synchronized (connectedSockets) {
-      for (NotebookSocket conn : connectedSockets) {
-        if (exclude != null && exclude.equals(conn)) {
-          continue;
-        }
-
-        try {
-          conn.send(serialized);
-        } catch (IOException e) {
-          LOG.error("Cannot broadcast message to watcher", e);
-        }
-      }
-    }
-  }
-
-  private void broadcastToWatchers(String noteId, String subject, Message message) {
-    synchronized (watcherSockets) {
-      for (NotebookSocket watcher : watcherSockets) {
-        try {
-          watcher.send(
-              WatcherMessage.builder(noteId).subject(subject).message(serializeMessage(message))
-                  .build().toJson());
-        } catch (IOException e) {
-          LOG.error("Cannot broadcast message to watcher", e);
-        }
-      }
-    }
-  }
-
   @Override
   public void onParaInfosReceived(String noteId, String paragraphId,
                                   String interpreterSettingId, Map<String, String> metaInfos) {
@@ -2397,7 +2104,7 @@ public class NotebookServer extends WebSocketServlet
 
         paragraph
             .updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId());
-        broadcast(
+        connectionManager.broadcast(
             note.getId(),
             new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos",
                 paragraph.getRuntimeInfos()));
@@ -2409,7 +2116,8 @@ public class NotebookServer extends WebSocketServlet
     GUI formsSettings = new GUI();
     formsSettings.setForms(note.getNoteForms());
     formsSettings.setParams(note.getNoteParams());
-    broadcast(note.getId(), new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings));
+    connectionManager.broadcast(note.getId(),
+        new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings));
   }
 
   private void saveNoteForms(NotebookSocket conn,
@@ -2442,18 +2150,14 @@ public class NotebookServer extends WebSocketServlet
 
   @Override
   public Set<String> getConnectedUsers() {
-    Set<String> connectionList = Sets.newHashSet();
-    for (NotebookSocket notebookSocket : connectedSockets) {
-      connectionList.add(notebookSocket.getUser());
-    }
-    return connectionList;
+    return connectionManager.getConnectedUsers();
   }
 
   @Override
   public void sendMessage(String message) {
     Message m = new Message(OP.NOTICE);
     m.data.put("notice", message);
-    broadcast(m);
+    connectionManager.broadcast(m);
   }
 
   private ServiceContext getServiceContext(Message message) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e10332c9/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index abce8b6..03e7ee6 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -300,7 +300,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
             .put("noteId", "noteId")
             .put("paragraphId", "paragraphId"));
 
-    server.noteSocketMap.put("noteId", asList(conn, otherConn));
+    server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
 
     // When
     server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived);
@@ -349,7 +349,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
             .put("noteId", "noteId")
             .put("paragraphId", "paragraphId"));
 
-    server.noteSocketMap.put("noteId", asList(conn, otherConn));
+    server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
 
     // When
     server.angularObjectClientBind(conn, new HashSet<String>(), notebook, messageReceived);
@@ -393,7 +393,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
             .put("noteId", "noteId")
             .put("paragraphId", "paragraphId"));
 
-    server.noteSocketMap.put("noteId", asList(conn, otherConn));
+    server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
 
     // When
     server.angularObjectClientUnbind(conn, new HashSet<String>(), notebook, messageReceived);
@@ -440,7 +440,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
             .put("interpreterGroupId", "mdGroup")
             .put("noteId", "noteId")
             .put("paragraphId", "paragraphId"));
-    server.noteSocketMap.put("noteId", asList(conn, otherConn));
+    server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
 
     // When
     server.angularObjectClientUnbind(conn, new HashSet<String>(), notebook, messageReceived);