You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/11/16 21:30:49 UTC

zeppelin git commit: [ZEPPELIN-1610] - Add notebook watcher

Repository: zeppelin
Updated Branches:
  refs/heads/master e0930570d -> 5e85e6ea6


[ZEPPELIN-1610] - Add notebook watcher

### What is this PR for?
Add a Simple way to switch a websocket connection to a new state; watcher.
A websocket watcher is a special connection that will watch most of the web socket even in Zeppelin, this cam be used to monitor zeppelin server activity.

### What type of PR is it?
[Feature]

### Todos
* [x] - Add watcher Queue
* [x] - Add endpoint to switch from regular client to watcher
* [x] - Add a way to generate a uniq key when zeppelin server restart
* [x] - Add example on how to use watcher.

### What is the Jira issue?
* [ZEPPELIN-1610](https://issues.apache.org/jira/browse/ZEPPELIN-1610)

### How should this be tested?
You will have to create your own websocket client and provide a valid http header (`X-Watcher-Key`) when you connect to zeppelin ws
 something like
```
private Session openWatcherSession() {
    ClientUpgradeRequest request = new ClientUpgradeRequest();
    request.setHeader(WatcherSecurityKey.HTTP_HEADER, WatcherSecurityKey.getKey());
    WatcherWebsocket socket = WatcherWebsocket.createInstace();
    Future<Session> future = null;
    Session session = null;
    try {
      future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
      session = future.get();
    } catch (IOException | InterruptedException | ExecutionException e) {
      LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
      return session;
    }
    return session;
  }
```

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes

Author: Anthony Corbacho <co...@gmail.com>

Closes #1588 from anthonycorbacho/feat/updateWebsocketInZeppelinHubRepo and squashes the following commits:

26cf53f [Anthony Corbacho] Move broadcastToWatcher
8413e9b [Anthony Corbacho] Remove redundant broadcast
fb2c260 [Anthony Corbacho] Remove TODO in socket/Message
716c92c [Anthony Corbacho] Fix checkstyle
a10ba13 [Anthony Corbacho] Add remove connection from note back in test
89d70f2 [Anthony Corbacho] fix test
092791e [Anthony Corbacho] Light refactoring :: add missing header, add comment, refacto some methods
8f7e1b3 [Anthony Corbacho] Add X-Watcher-Key in request header for watcher client
e2d3053 [Anthony Corbacho] Add simple check for ws before switching ws client to watcher, client should provide a header X-Watcher-Key with a valid key (generated at runtime), if key invalid client wont be accepted
e25ea1e [Anthony Corbacho] Add simple Key generation for Watcher ws client
4affe25 [Anthony Corbacho] Handle remoing wssession from notebook map once the session is close :: avoiding socket connection to be ide
c32192a [Anthony Corbacho] rework watcher creation and ws session with notes
3bd3482 [Anthony Corbacho] Reorder import :: Google check style
bde5db4 [Anthony Corbacho] Update ping routine
ede1f18 [Anthony Corbacho] make private field public for accessibility
aa55a5a [Anthony Corbacho] Strting to rework ZeppelinClient
e5b3a1d [Anthony Corbacho] Add zeppelinhub notebook watcher
9d6c93f [Anthony Corbacho] Add new OP watcher
0d7f493 [Anthony Corbacho] Added new WS queue called watcher, watcher will be abler to listen to almost every note action performed in zeppelin notebook websocket server
45849ce [Anthony Corbacho] Add new message type :: Watcher message, this class will wrapp zeppelin ws message and add extra information such as noteId and user


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5e85e6ea
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5e85e6ea
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5e85e6ea

Branch: refs/heads/master
Commit: 5e85e6ea6ff004735fde9bef58085b83369f864b
Parents: e093057
Author: Anthony Corbacho <co...@gmail.com>
Authored: Mon Nov 14 14:56:38 2016 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Nov 16 13:30:42 2016 -0800

----------------------------------------------------------------------
 .../apache/zeppelin/socket/NotebookServer.java  |  85 ++++++++--
 .../java/org/apache/zeppelin/notebook/Note.java |  10 +-
 .../zeppelinhub/websocket/ZeppelinClient.java   | 156 +++++++++++--------
 .../websocket/listener/WatcherWebsocket.java    |  81 ++++++++++
 .../websocket/listener/ZeppelinWebsocket.java   |   5 +-
 .../websocket/scheduler/ZeppelinHeartbeat.java  |   4 +-
 .../zeppelin/notebook/socket/Message.java       |   5 +-
 .../notebook/socket/WatcherMessage.java         |  73 +++++++++
 .../zeppelin/util/WatcherSecurityKey.java       |  35 +++++
 .../websocket/ZeppelinClientTest.java           |  10 +-
 10 files changed, 378 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index bc0d7f5..c434ffe 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -48,16 +48,24 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.notebook.*;
+import org.apache.zeppelin.notebook.JobListenerFactory;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.NotebookAuthorization;
+import org.apache.zeppelin.notebook.NotebookEventListener;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.ParagraphJobListener;
 import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.apache.zeppelin.notebook.socket.WatcherMessage;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.apache.zeppelin.ticket.TicketContainer;
 import org.apache.zeppelin.types.InterpreterSettingsList;
 import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.util.WatcherSecurityKey;
 import org.apache.zeppelin.utils.InterpreterBindingUtils;
 import org.apache.zeppelin.utils.SecurityUtils;
 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
@@ -67,6 +75,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
@@ -97,6 +106,14 @@ public class NotebookServer extends WebSocketServlet implements
   final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
   final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
 
+  /**
+   * This is a special endpoint in the notebook websoket, Every connection in this Queue
+   * will be able to watch every websocket event, it doesnt need to be listed into the map of
+   * noteSocketMap. This can be used to get information about websocket traffic and watch what
+   * is going on.
+   */
+  final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
+  
   private Notebook notebook() {
     return ZeppelinServer.notebook;
   }
@@ -275,6 +292,9 @@ public class NotebookServer extends WebSocketServlet implements
           case GET_INTERPRETER_SETTINGS:
             getInterpreterSettings(conn, subject);
             break;
+          case WATCHER:
+            switchConnectionToWatcher(conn, messagereceived);
+            break;
           default:
             break;
       }
@@ -389,6 +409,7 @@ public class NotebookServer extends WebSocketServlet implements
 
   private void broadcast(String noteId, Message m) {
     synchronized (noteSocketMap) {
+      broadcastToWatchers(noteId, StringUtils.EMPTY, m);
       List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
       if (socketLists == null || socketLists.size() == 0) {
         return;
@@ -406,6 +427,7 @@ public class NotebookServer extends WebSocketServlet implements
 
   private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
     synchronized (noteSocketMap) {
+      broadcastToWatchers(noteId, StringUtils.EMPTY, m);
       List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
       if (socketLists == null || socketLists.size() == 0) {
         return;
@@ -431,11 +453,7 @@ public class NotebookServer extends WebSocketServlet implements
     }
 
     for (NotebookSocket conn: userConnectedSockets.get(user)) {
-      try {
-        conn.send(serializeMessage(m));
-      } catch (IOException e) {
-        LOG.error("socket error", e);
-      }
+      unicast(m, conn);
     }
   }
 
@@ -445,6 +463,7 @@ public class NotebookServer extends WebSocketServlet implements
     } catch (IOException e) {
       LOG.error("socket error", e);
     }
+    broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
   }
 
   public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
@@ -545,10 +564,8 @@ public class NotebookServer extends WebSocketServlet implements
     broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
   }
 
-  public void broadcastInterpreterBindings(String noteId,
-                                           List settingList) {
-    broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS)
-        .put("interpreterBindings", settingList));
+  public void broadcastInterpreterBindings(String noteId, List settingList) {
+    broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS).put("interpreterBindings", settingList));
   }
 
   public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
@@ -1770,6 +1787,50 @@ public class NotebookServer extends WebSocketServlet implements
         .get(settingId);
     interpreterSetting.setInfos(metaInfos);
   }
-
+  
+  private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived)
+      throws IOException {
+    if (!isSessionAllowedToSwitchToWatcher(conn)) {
+      LOG.error("Cannot switch this client to watcher, invalid security key");
+      return;
+    }
+    LOG.info("Going to add {} to watcher socket", conn);
+    // add the connection to the watcher.
+    if (watcherSockets.contains(conn)) {
+      LOG.info("connection alrerady present in the watcher");
+      return;
+    }
+    watcherSockets.add(conn);
+    
+    // remove this connection from regular zeppelin ws usage.
+    removeConnectionFromAllNote(conn);
+    connectedSockets.remove(conn);
+    removeUserConnection(conn.getUser(), conn);
+  }
+  
+  private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
+    String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
+    return !(StringUtils.isBlank(watcherSecurityKey)
+        || !watcherSecurityKey.equals(WatcherSecurityKey.getKey()));
+  }
+  
+  private void broadcastToWatchers(String noteId, String subject, Message message) {
+    synchronized (watcherSockets) {
+      if (watcherSockets.isEmpty()) {
+        return;
+      }
+      for (NotebookSocket watcher : watcherSockets) {
+        try {
+          watcher.send(WatcherMessage
+                         .builder(noteId)
+                         .subject(subject)
+                         .message(serializeMessage(message))
+                         .build()
+                         .serialize());
+        } catch (IOException e) {
+          LOG.error("Cannot broadcast message to watcher", e);
+        }
+      }
+    }
+  }
 }
-

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 66362bd..aa08adf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -30,12 +30,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
-import com.google.gson.Gson;
 import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
@@ -57,6 +52,11 @@ import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.search.SearchService;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
 
 /**
  * Binded interpreters for a note

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
index c40b504..e05a746 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -27,13 +28,14 @@ import java.util.concurrent.Future;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.WatcherWebsocket;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHeartbeat;
-import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.apache.zeppelin.util.WatcherSecurityKey;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
@@ -54,11 +56,14 @@ public class ZeppelinClient {
   private final String zeppelinhubToken;
   private final WebSocketClient wsClient;
   private static Gson gson;
-  private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
+  // Keep track of current open connection per notebook.
+  private ConcurrentHashMap<String, Session> notesConnection;
+  // Listen to every note actions.
+  private static Session watcherSession;
   private static ZeppelinClient instance = null;
   private SchedulerService schedulerService;
   private Authentication authModule;
-  private static final int min = 60;
+  private static final int MIN = 60;
 
   public static ZeppelinClient initialize(String zeppelinUrl, String token, 
       ZeppelinConfiguration conf) {
@@ -77,7 +82,7 @@ public class ZeppelinClient {
     zeppelinhubToken = token;
     wsClient = createNewWebsocketClient();
     gson = new Gson();
-    zeppelinConnectionMap = new ConcurrentHashMap<>();
+    notesConnection = new ConcurrentHashMap<>();
     schedulerService = SchedulerService.getInstance();
     authModule = Authentication.initialize(token, conf);
     if (authModule != null) {
@@ -89,7 +94,7 @@ public class ZeppelinClient {
   private WebSocketClient createNewWebsocketClient() {
     SslContextFactory sslContextFactory = new SslContextFactory();
     WebSocketClient client = new WebSocketClient(sslContextFactory);
-    client.setMaxIdleTimeout(5 * min * 1000);
+    client.setMaxIdleTimeout(5 * MIN * 1000);
     client.setMaxTextMessageBufferSize(Client.getMaxNoteSize());
     client.getPolicy().setMaxTextMessageSize(Client.getMaxNoteSize());
     //TODO(khalid): other client settings
@@ -110,17 +115,26 @@ public class ZeppelinClient {
   }
 
   private void addRoutines() {
-    schedulerService.add(ZeppelinHeartbeat.newInstance(this), 15, 4 * min);
+    schedulerService.add(ZeppelinHeartbeat.newInstance(this), 10, 1 * MIN);
+    new Timer().schedule(new java.util.TimerTask() {
+      @Override
+      public void run() {
+        watcherSession = openWatcherSession();
+      }
+    }, 5000);
   }
 
   public void stop() {
     try {
       if (wsClient != null) {
-        removeAllZeppelinConnections();
+        removeAllConnections();
         wsClient.stop();
       } else {
         LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
       }
+      if (watcherSession != null) {
+        watcherSession.close();
+      }
     } catch (Exception e) {
       LOG.error("Cannot stop Zeppelin websocket client", e);
     }
@@ -153,6 +167,22 @@ public class ZeppelinClient {
     }
     return msg;
   }
+  
+  private Session openWatcherSession() {
+    ClientUpgradeRequest request = new ClientUpgradeRequest();
+    request.setHeader(WatcherSecurityKey.HTTP_HEADER, WatcherSecurityKey.getKey());
+    WatcherWebsocket socket = WatcherWebsocket.createInstace();
+    Future<Session> future = null;
+    Session session = null;
+    try {
+      future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
+      session = future.get();
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
+      return session;
+    }
+    return session;
+  }
 
   public void send(Message msg, String noteId) {
     Session noteSession = getZeppelinConnection(noteId);
@@ -162,27 +192,16 @@ public class ZeppelinClient {
     }
     noteSession.getRemote().sendStringByFuture(serialize(msg));
   }
-
-  private boolean isSessionOpen(Session session) {
-    return (session != null) && (session.isOpen());
-  }
   
-  /* per notebook based ws connection, returns null if can't connect */
   public Session getZeppelinConnection(String noteId) {
     if (StringUtils.isBlank(noteId)) {
-      LOG.warn("Cannot return websocket connection for blank noteId");
+      LOG.warn("Cannot get Websocket session with blanck noteId");
       return null;
     }
-
-    if (zeppelinConnectionMap.containsKey(noteId)) {
-      LOG.info("Connection for {} exists in map", noteId);
-      return getNoteSession(noteId);
-    }
-    //TODO(khalid): clean log later
-    LOG.info("Creating Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
-    return openNoteSession(noteId);
+    return getNoteSession(noteId);
   }
-
+  
+/*
   private Message zeppelinGetNoteMsg(String noteId) {
     Message getNoteMsg = new Message(Message.OP.GET_NOTE);
     HashMap<String, Object> data = new HashMap<>();
@@ -190,12 +209,14 @@ public class ZeppelinClient {
     getNoteMsg.data = data;
     return getNoteMsg;
   }
-  
+  */
+
   private Session getNoteSession(String noteId) {
-    Session session = zeppelinConnectionMap.get(noteId);
-    if (session == null || !session.isOpen()) {
-      LOG.info("Not connection to {}", noteId);
-      zeppelinConnectionMap.remove(noteId);
+    LOG.info("Getting Note websocket connection for note {}", noteId);
+    Session session = notesConnection.get(noteId);
+    if (!isSessionOpen(session)) {
+      LOG.info("No open connection for note {}, opening one", noteId);
+      notesConnection.remove(noteId);
       session = openNoteSession(noteId);
     }
     return session;
@@ -214,17 +235,28 @@ public class ZeppelinClient {
       return session;
     }
 
-    if (zeppelinConnectionMap.containsKey(noteId)) {
+    if (notesConnection.containsKey(noteId)) {
       session.close();
-      session = zeppelinConnectionMap.get(noteId);
+      session = notesConnection.get(noteId);
     } else {
       String getNote = serialize(zeppelinGetNoteMsg(noteId));
-      // TODO(khalid): may need to check return whether successful
       session.getRemote().sendStringByFuture(getNote);
-      zeppelinConnectionMap.put(noteId, session);
+      notesConnection.put(noteId, session);
     }
     return session;
   }
+  
+  private boolean isSessionOpen(Session session) {
+    return (session != null) && (session.isOpen());
+  }
+
+  private Message zeppelinGetNoteMsg(String noteId) {
+    Message getNoteMsg = new Message(Message.OP.GET_NOTE);
+    HashMap<String, Object> data = new HashMap<String, Object>();
+    data.put("id", noteId);
+    getNoteMsg.data = data;
+    return getNoteMsg;
+  }
 
   public void handleMsgFromZeppelin(String message, String noteId) {
     Map<String, String> meta = new HashMap<>();
@@ -243,46 +275,48 @@ public class ZeppelinClient {
     client.relayToZeppelinHub(hubMsg.serialize());
   }
 
-  /**
-   * Close and remove ZeppelinConnection
-   */
-  public void removeZeppelinConnection(String noteId) {
-    if (zeppelinConnectionMap.containsKey(noteId)) {
-      Session conn = zeppelinConnectionMap.get(noteId);
-      if (conn.isOpen()) {
-        conn.close();
+  public void removeNoteConnection(String noteId) {
+    if (StringUtils.isBlank(noteId)) {
+      LOG.error("Cannot remove session for empty noteId");
+      return;
+    }
+    if (notesConnection.containsKey(noteId)) {
+      Session connection = notesConnection.get(noteId);
+      if (connection.isOpen()) {
+        connection.close();
       }
-      zeppelinConnectionMap.remove(noteId);
+      notesConnection.remove(noteId);
     }
-    // TODO(khalid): clean log later
-    LOG.info("Removed Zeppelin ws connection for the following note {}", noteId);
+    LOG.info("Removed note websocket connection for note {}", noteId);
   }
+  
+  private void removeAllConnections() {
+    if (watcherSession != null && watcherSession.isOpen()) {
+      watcherSession.close();
+    }
 
-  /**
-   * Close and remove all ZeppelinConnection
-   */
-  public void removeAllZeppelinConnections() {
-    for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
-      if (isSessionOpen(entry.getValue())) {
-        entry.getValue().close();
+    Session noteSession = null;
+    for (Map.Entry<String, Session> note: notesConnection.entrySet()) {
+      noteSession = note.getValue();
+      if (isSessionOpen(noteSession)) {
+        noteSession.close();
       }
-      zeppelinConnectionMap.remove(entry.getKey());
     }
-    LOG.info("Removed all Zeppelin ws connections");
+    notesConnection.clear();
   }
 
-  public void pingAllNotes() {
-    for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
-      if (isSessionOpen(entry.getValue())) {
-        send(new Message(OP.PING), entry.getKey());
-      } else {
-        // for cleanup
-        zeppelinConnectionMap.remove(entry.getKey());
-      }
+  public void ping() {
+    if (watcherSession == null) {
+      LOG.info("Cannot send PING event, no watcher found");
+      return;
     }
+    watcherSession.getRemote().sendStringByFuture(serialize(new Message(OP.PING)));
   }
-
+  
+  /**
+   * Only used in test.
+   */
   public int countConnectedNotes() {
-    return zeppelinConnectionMap.size();
+    return notesConnection.size();
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
new file mode 100644
index 0000000..5ccacb9
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.apache.zeppelin.notebook.socket.WatcherMessage;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ * Zeppelin Watcher that will forward user note to ZeppelinHub.
+ *
+ */
+public class WatcherWebsocket implements WebSocketListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class);
+  private static final Gson GSON = new Gson();
+  public Session connection;
+  
+  public static WatcherWebsocket createInstace() {
+    return new WatcherWebsocket();
+  }
+  
+  @Override
+  public void onWebSocketBinary(byte[] payload, int offset, int len) {
+  }
+
+  @Override
+  public void onWebSocketClose(int code, String reason) {
+    LOG.info("WatcherWebsocket connection closed with code: {}, message: {}", code, reason);
+  }
+
+  @Override
+  public void onWebSocketConnect(Session session) {
+    LOG.info("WatcherWebsocket connection opened");
+    this.connection = session;
+    session.getRemote().sendStringByFuture(GSON.toJson(new Message(OP.WATCHER)));
+  }
+
+  @Override
+  public void onWebSocketError(Throwable cause) {
+    LOG.warn("WatcherWebsocket socket connection error ", cause);
+  }
+
+  @Override
+  public void onWebSocketText(String message) {
+    WatcherMessage watcherMsg = GSON.fromJson(message, WatcherMessage.class);
+    if (StringUtils.isBlank(watcherMsg.noteId)) {
+      return;
+    }
+    try {
+      ZeppelinClient zeppelinClient = ZeppelinClient.getInstance();
+      if (zeppelinClient != null) {
+        zeppelinClient.handleMsgFromZeppelin(watcherMsg.message, watcherMsg.noteId);
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to send message to ZeppelinHub: ", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
index ac102b5..fa6ade8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
@@ -43,7 +43,7 @@ public class ZeppelinWebsocket implements WebSocketListener {
   @Override
   public void onWebSocketClose(int code, String message) {
     LOG.info("Zeppelin connection closed with code: {}, message: {}", code, message);
-    // parentClient.removeConnMap(noteId);
+    ZeppelinClient.getInstance().removeNoteConnection(noteId);
   }
 
   @Override
@@ -54,7 +54,8 @@ public class ZeppelinWebsocket implements WebSocketListener {
 
   @Override
   public void onWebSocketError(Throwable e) {
-    LOG.warn("Zeppelin socket connection error: {}", e.toString());
+    LOG.warn("Zeppelin socket connection error ", e);
+    ZeppelinClient.getInstance().removeNoteConnection(noteId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
index 8a85c84..11cfa45 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
@@ -38,7 +38,7 @@ public class ZeppelinHeartbeat implements Runnable {
 
   @Override
   public void run() {
-    LOG.debug("Sending PING to all connected Zeppelin notes");
-    client.pingAllNotes();
+    LOG.debug("Sending PING to Zeppelin Websocket Server");
+    client.ping();
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
index 9fe9636..b6a305c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
@@ -145,9 +145,12 @@ public class Message {
     INTERPRETER_BINDINGS,         // [s-c] interpreter bindings
     GET_INTERPRETER_SETTINGS,     // [c-s] get interpreter settings
     INTERPRETER_SETTINGS,         // [s-c] interpreter settings
-    ERROR_INFO                    // [s-c] error information to be sent
+    ERROR_INFO,                   // [s-c] error information to be sent
+    WATCHER,                      // [s-c] Change websocket to watcher mode.
   }
 
+  public static final Message EMPTY = new Message(null);
+  
   public OP op;
   public Map<String, Object> data = new HashMap<>();
   public String ticket = "anonymous";

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/WatcherMessage.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/WatcherMessage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/WatcherMessage.java
new file mode 100644
index 0000000..0fb28cc
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/WatcherMessage.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.socket;
+
+import com.google.gson.Gson;
+
+/**
+ * Zeppelin websocket massage template class for watcher socket.
+ */
+public class WatcherMessage {
+
+  public String message;
+  public String noteId;
+  public String subject;
+  
+  private static final Gson gson = new Gson();
+  
+  public static Builder builder(String noteId) {
+    return new Builder(noteId);
+  }
+  
+  private WatcherMessage(Builder builder) {
+    this.noteId = builder.noteId;
+    this.message = builder.message;
+    this.subject = builder.subject;
+  }
+  
+  public String serialize() {
+    return gson.toJson(this);
+  }
+  
+  /**
+   * Simple builder.
+   */
+  public static class Builder {
+    private final String noteId;
+    private String subject;
+    private String message;
+    
+    public Builder(String noteId) {
+      this.noteId = noteId;
+    }
+    
+    public Builder subject(String subject) {
+      this.subject = subject;
+      return this;
+    }
+    
+    public Builder message(String message) {
+      this.message = message;
+      return this;
+    }
+
+    public WatcherMessage build() {
+      return new WatcherMessage(this);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WatcherSecurityKey.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WatcherSecurityKey.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WatcherSecurityKey.java
new file mode 100644
index 0000000..f0c3ad2
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WatcherSecurityKey.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.util;
+
+import java.util.UUID;
+
+/**
+ * Simple implementation of a auto-generated key for websocket watcher.
+ * This is a LAZY implementation, we might want to update this later on :)
+ */
+public class WatcherSecurityKey {
+  public static final String HTTP_HEADER = "X-Watcher-Key";
+  private static final String KEY = UUID.randomUUID().toString();
+
+  protected WatcherSecurityKey() {}
+
+  public static String getKey() {
+    return KEY;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
index b2cc81c..746e775 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
@@ -1,6 +1,10 @@
 package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -74,7 +78,7 @@ public class ZeppelinClientTest {
     assertEquals(connectionB, client.getZeppelinConnection("BBBB"));
 
     // Remove connection to note AAAA
-    client.removeZeppelinConnection("AAAA");
+    client.removeNoteConnection("AAAA");
     assertEquals(client.countConnectedNotes(), 1);
     assertNotEquals(connectionA, client.getZeppelinConnection("AAAA"));
     assertEquals(client.countConnectedNotes(), 2);
@@ -117,7 +121,7 @@ public class ZeppelinClientTest {
     msg.data = Maps.newHashMap();
     msg.data.put("key", "value");
     client.send(msg, "DDDD");
-    client.removeZeppelinConnection("DDDD");
+    client.removeNoteConnection("DDDD");
     client.stop();
   }
 }