You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/08/10 06:54:38 UTC

[2/3] zeppelin git commit: ZEPPELIN-3681. Introduce NotebookService

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 78ff078..a376623 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -54,9 +54,13 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
 import org.apache.zeppelin.notebook.socket.WatcherMessage;
+import org.apache.zeppelin.rest.exception.ForbiddenException;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.server.ZeppelinServer;
+import org.apache.zeppelin.service.NotebookService;
+import org.apache.zeppelin.service.ServiceContext;
+import org.apache.zeppelin.service.SimpleServiceCallback;
 import org.apache.zeppelin.ticket.TicketContainer;
 import org.apache.zeppelin.types.InterpreterSettingsList;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -75,6 +79,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.servlet.http.HttpServletRequest;
 import java.io.IOException;
+import java.lang.reflect.Type;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.text.ParseException;
@@ -102,11 +107,11 @@ import java.util.regex.Pattern;
  */
 public class NotebookServer extends WebSocketServlet
     implements NotebookSocketListener,
-        JobListenerFactory,
-        AngularObjectRegistryListener,
-        RemoteInterpreterProcessListener,
-        ApplicationEventListener,
-        NotebookServerMBean {
+    JobListenerFactory,
+    AngularObjectRegistryListener,
+    RemoteInterpreterProcessListener,
+    ApplicationEventListener,
+    NotebookServerMBean {
 
   /**
    * Job manager service type.
@@ -127,8 +132,8 @@ public class NotebookServer extends WebSocketServlet
 
   private HashSet<String> collaborativeModeList = new HashSet<>();
   private Boolean collaborativeModeEnable = ZeppelinConfiguration
-          .create()
-          .isZeppelinNotebookCollaborativeModeEnable();
+      .create()
+      .isZeppelinNotebookCollaborativeModeEnable();
   private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
   private static Gson gson = new GsonBuilder()
       .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
@@ -140,6 +145,8 @@ public class NotebookServer extends WebSocketServlet
   final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
   final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
 
+  private NotebookService notebookService;
+
   private ExecutorService executorService = Executors.newFixedThreadPool(10);
 
   /**
@@ -154,6 +161,13 @@ public class NotebookServer extends WebSocketServlet
     return ZeppelinServer.notebook;
   }
 
+  private synchronized NotebookService getNotebookService() {
+    if (this.notebookService == null) {
+      this.notebookService = new NotebookService(notebook());
+    }
+    return this.notebookService;
+  }
+
   @Override
   public void configure(WebSocketServletFactory factory) {
     factory.setCreator(new NotebookWebSocketCreator(this));
@@ -174,8 +188,7 @@ public class NotebookServer extends WebSocketServlet
 
   @Override
   public void onOpen(NotebookSocket conn) {
-    LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
-        conn.getRequest().getRemotePort());
+    LOG.info("New connection from {}", conn);
     connectedSockets.add(conn);
   }
 
@@ -184,12 +197,13 @@ public class NotebookServer extends WebSocketServlet
     Notebook notebook = notebook();
     try {
       Message messagereceived = deserializeMessage(msg);
-      LOG.debug("RECEIVE << " + messagereceived.op +
-          ", RECEIVE PRINCIPAL << " + messagereceived.principal +
-          ", RECEIVE TICKET << " + messagereceived.ticket +
-          ", RECEIVE ROLES << " + messagereceived.roles +
-          ", RECEIVE DATA << " + messagereceived.data);
-
+      if (messagereceived.op != OP.PING) {
+        LOG.debug("RECEIVE: " + messagereceived.op +
+            ", RECEIVE PRINCIPAL: " + messagereceived.principal +
+            ", RECEIVE TICKET: " + messagereceived.ticket +
+            ", RECEIVE ROLES: " + messagereceived.roles +
+            ", RECEIVE DATA: " + messagereceived.data);
+      }
       if (LOG.isTraceEnabled()) {
         LOG.trace("RECEIVE MSG = " + messagereceived);
       }
@@ -237,22 +251,22 @@ public class NotebookServer extends WebSocketServlet
       // Lets be elegant here
       switch (messagereceived.op) {
         case LIST_NOTES:
-          unicastNoteList(conn, subject, userAndRoles);
+          listNotes(conn, messagereceived);
           break;
         case RELOAD_NOTES_FROM_REPO:
           broadcastReloadedNoteList(subject, userAndRoles);
           break;
         case GET_HOME_NOTE:
-          sendHomeNote(conn, userAndRoles, notebook, messagereceived);
+          getHomeNote(conn, messagereceived);
           break;
         case GET_NOTE:
-          sendNote(conn, userAndRoles, notebook, messagereceived);
+          getNote(conn, messagereceived);
           break;
         case NEW_NOTE:
-          createNote(conn, userAndRoles, notebook, messagereceived);
+          createNote(conn, messagereceived);
           break;
         case DEL_NOTE:
-          removeNote(conn, userAndRoles, notebook, messagereceived);
+          deleteNote(conn, messagereceived);
           break;
         case REMOVE_FOLDER:
           removeFolder(conn, userAndRoles, notebook, messagereceived);
@@ -270,55 +284,55 @@ public class NotebookServer extends WebSocketServlet
           restoreFolder(conn, userAndRoles, notebook, messagereceived);
           break;
         case RESTORE_NOTE:
-          restoreNote(conn, userAndRoles, notebook, messagereceived);
+          restoreNote(conn, messagereceived);
           break;
         case RESTORE_ALL:
           restoreAll(conn, userAndRoles, notebook, messagereceived);
           break;
         case CLONE_NOTE:
-          cloneNote(conn, userAndRoles, notebook, messagereceived);
+          cloneNote(conn, messagereceived);
           break;
         case IMPORT_NOTE:
-          importNote(conn, userAndRoles, notebook, messagereceived);
+          importNote(conn, messagereceived);
           break;
         case COMMIT_PARAGRAPH:
-          updateParagraph(conn, userAndRoles, notebook, messagereceived);
+          updateParagraph(conn, messagereceived);
           break;
         case RUN_PARAGRAPH:
-          runParagraph(conn, userAndRoles, notebook, messagereceived);
+          runParagraph(conn, messagereceived);
           break;
         case PARAGRAPH_EXECUTED_BY_SPELL:
           broadcastSpellExecution(conn, userAndRoles, notebook, messagereceived);
           break;
         case RUN_ALL_PARAGRAPHS:
-          runAllParagraphs(conn, userAndRoles, notebook, messagereceived);
+          runAllParagraphs(conn, messagereceived);
           break;
         case CANCEL_PARAGRAPH:
-          cancelParagraph(conn, userAndRoles, notebook, messagereceived);
+          cancelParagraph(conn, messagereceived);
           break;
         case MOVE_PARAGRAPH:
-          moveParagraph(conn, userAndRoles, notebook, messagereceived);
+          moveParagraph(conn, messagereceived);
           break;
         case INSERT_PARAGRAPH:
-          insertParagraph(conn, userAndRoles, notebook, messagereceived);
+          insertParagraph(conn, messagereceived);
           break;
         case COPY_PARAGRAPH:
-          copyParagraph(conn, userAndRoles, notebook, messagereceived);
+          copyParagraph(conn, messagereceived);
           break;
         case PARAGRAPH_REMOVE:
-          removeParagraph(conn, userAndRoles, notebook, messagereceived);
+          removeParagraph(conn, messagereceived);
           break;
         case PARAGRAPH_CLEAR_OUTPUT:
-          clearParagraphOutput(conn, userAndRoles, notebook, messagereceived);
+          clearParagraphOutput(conn, messagereceived);
           break;
         case PARAGRAPH_CLEAR_ALL_OUTPUT:
-          clearAllParagraphOutput(conn, userAndRoles, notebook, messagereceived);
+          clearAllParagraphOutput(conn, messagereceived);
           break;
         case NOTE_UPDATE:
-          updateNote(conn, userAndRoles, notebook, messagereceived);
+          updateNote(conn, messagereceived);
           break;
         case NOTE_RENAME:
-          renameNote(conn, userAndRoles, notebook, messagereceived);
+          renameNote(conn, messagereceived);
           break;
         case FOLDER_RENAME:
           renameFolder(conn, userAndRoles, notebook, messagereceived);
@@ -327,7 +341,7 @@ public class NotebookServer extends WebSocketServlet
           updatePersonalizedMode(conn, userAndRoles, notebook, messagereceived);
           break;
         case COMPLETION:
-          completion(conn, userAndRoles, notebook, messagereceived);
+          completion(conn, messagereceived);
           break;
         case PING:
           break; //do nothing
@@ -344,19 +358,19 @@ public class NotebookServer extends WebSocketServlet
           sendAllConfigurations(conn, userAndRoles, notebook);
           break;
         case CHECKPOINT_NOTE:
-          checkpointNote(conn, notebook, messagereceived);
+          checkpointNote(conn, messagereceived);
           break;
         case LIST_REVISION_HISTORY:
-          listRevisionHistory(conn, notebook, messagereceived);
+          listRevisionHistory(conn, messagereceived);
           break;
         case SET_NOTE_REVISION:
-          setNoteRevision(conn, userAndRoles, notebook, messagereceived);
+          setNoteRevision(conn, messagereceived);
           break;
         case NOTE_REVISION:
-          getNoteByRevision(conn, notebook, messagereceived);
+          getNoteByRevision(conn, messagereceived);
           break;
         case NOTE_REVISION_FOR_COMPARE:
-          getNoteByRevisionForCompare(conn, notebook, messagereceived);
+          getNoteByRevisionForCompare(conn, messagereceived);
           break;
         case LIST_NOTE_JOBS:
           unicastNoteJobInfo(conn, messagereceived);
@@ -371,16 +385,16 @@ public class NotebookServer extends WebSocketServlet
           getEditorSetting(conn, messagereceived);
           break;
         case GET_INTERPRETER_SETTINGS:
-          getInterpreterSettings(conn, subject);
+          getInterpreterSettings(conn);
           break;
         case WATCHER:
           switchConnectionToWatcher(conn, messagereceived);
           break;
         case SAVE_NOTE_FORMS:
-          saveNoteForms(conn, userAndRoles, notebook, messagereceived);
+          saveNoteForms(conn, messagereceived);
           break;
         case REMOVE_NOTE_FORMS:
-          removeNoteForms(conn, userAndRoles, notebook, messagereceived);
+          removeNoteForms(conn, messagereceived);
           break;
         case PATCH_PARAGRAPH:
           patchParagraph(conn, userAndRoles, notebook, messagereceived);
@@ -395,14 +409,14 @@ public class NotebookServer extends WebSocketServlet
 
   @Override
   public void onClose(NotebookSocket conn, int code, String reason) {
-    LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest().getRemoteAddr(),
-        conn.getRequest().getRemotePort(), code, reason);
+    LOG.info("Closed connection to {} ({}) {}", conn, code, reason);
     removeConnectionFromAllNote(conn);
     connectedSockets.remove(conn);
     removeUserConnection(conn.getUser(), conn);
   }
 
   private void removeUserConnection(String user, NotebookSocket conn) {
+    LOG.debug("Remove user connection {} for user: {}", conn, user);
     if (userConnectedSockets.containsKey(user)) {
       userConnectedSockets.get(user).remove(conn);
     } else {
@@ -411,6 +425,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void addUserConnection(String user, NotebookSocket conn) {
+    LOG.debug("Add user connection {} for user: {}", conn, user);
     conn.setUser(user);
     if (userConnectedSockets.containsKey(user)) {
       userConnectedSockets.get(user).add(conn);
@@ -430,6 +445,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void addConnectionToNote(String noteId, NotebookSocket socket) {
+    LOG.debug("Add connection {} to note: {}", socket, noteId);
     synchronized (noteSocketMap) {
       removeConnectionFromAllNote(socket); // make sure a socket relates only a
       // single note.
@@ -446,6 +462,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void removeConnectionFromNote(String noteId, NotebookSocket socket) {
+    LOG.debug("Remove connection {} from note: {}", socket, noteId);
     synchronized (noteSocketMap) {
       List<NotebookSocket> socketList = noteSocketMap.get(noteId);
       if (socketList != null) {
@@ -455,7 +472,7 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void removeNote(String noteId) {
+  private void removeConn(String noteId) {
     synchronized (noteSocketMap) {
       List<NotebookSocket> socketList = noteSocketMap.remove(noteId);
     }
@@ -485,7 +502,7 @@ public class NotebookServer extends WebSocketServlet
     message.put("status", collaborativeStatusNew);
     if (collaborativeStatusNew) {
       HashSet<String> userList = new HashSet<>();
-      for (NotebookSocket noteSocket: socketList) {
+      for (NotebookSocket noteSocket : socketList) {
         userList.add(noteSocket.getUser());
       }
       message.put("users", userList);
@@ -626,7 +643,8 @@ public class NotebookServer extends WebSocketServlet
   }
 
   public List<Map<String, String>> generateNotesInfo(boolean needsReload,
-          AuthenticationInfo subject, Set<String> userAndRoles) {
+                                                     AuthenticationInfo subject,
+                                                     Set<String> userAndRoles) {
     Notebook notebook = notebook();
 
     ZeppelinConfiguration conf = notebook.getConf();
@@ -690,7 +708,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
-          Paragraph defaultParagraph) {
+                                  Paragraph defaultParagraph) {
     if (null != userParagraphMap) {
       for (String user : userParagraphMap.keySet()) {
         multicastToUser(user,
@@ -706,7 +724,7 @@ public class NotebookServer extends WebSocketServlet
         new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex));
   }
 
-  public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
+  public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
     if (subject == null) {
       subject = new AuthenticationInfo(StringUtils.EMPTY);
     }
@@ -717,10 +735,16 @@ public class NotebookServer extends WebSocketServlet
     broadcastNoteListExcept(notesInfo, subject);
   }
 
-  public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject,
-          HashSet<String> userAndRoles) {
-    List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles);
-    unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
+  public void listNotes(NotebookSocket conn, Message message) throws IOException {
+    getNotebookService().listNotes(false, getServiceContext(message),
+        new WebSocketServiceCallback<List<Map<String, String>>>(conn) {
+          @Override
+          public void onSuccess(List<Map<String, String>> notesInfo,
+                                ServiceContext context) throws IOException {
+            super.onSuccess(notesInfo, context);
+            unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
+          }
+        });
   }
 
   public void broadcastReloadedNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
@@ -736,7 +760,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void broadcastNoteListExcept(List<Map<String, String>> notesInfo,
-          AuthenticationInfo subject) {
+                                       AuthenticationInfo subject) {
     Set<String> userAndRoles;
     NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
     for (String user : userConnectedSockets.keySet()) {
@@ -752,7 +776,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   void permissionError(NotebookSocket conn, String op, String userName, Set<String> userAndRoles,
-          Set<String> allowed) throws IOException {
+                       Set<String> allowed) throws IOException {
     LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed);
 
     conn.send(serializeMessage(new Message(OP.AUTH_INFO).put("info",
@@ -761,32 +785,18 @@ public class NotebookServer extends WebSocketServlet
             .toString())));
   }
 
-  /**
-   * @return false if user doesn't have reader permission for this paragraph
-   */
-  private boolean hasParagraphReaderPermission(NotebookSocket conn, Notebook notebook,
-          String noteId, HashSet<String> userAndRoles, String principal, String op)
-          throws IOException {
-    NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
-    if (!notebookAuthorization.isReader(noteId, userAndRoles)) {
-      permissionError(conn, op, principal, userAndRoles,
-          notebookAuthorization.getOwners(noteId));
-      return false;
-    }
-
-    return true;
-  }
 
   /**
    * @return false if user doesn't have runner permission for this paragraph
    */
   private boolean hasParagraphRunnerPermission(NotebookSocket conn, Notebook notebook,
-          String noteId, HashSet<String> userAndRoles, String principal, String op)
-          throws IOException {
+                                               String noteId, HashSet<String> userAndRoles,
+                                               String principal, String op)
+      throws IOException {
     NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
     if (!notebookAuthorization.isRunner(noteId, userAndRoles)) {
       permissionError(conn, op, principal, userAndRoles,
-              notebookAuthorization.getOwners(noteId));
+          notebookAuthorization.getOwners(noteId));
       return false;
     }
 
@@ -797,8 +807,9 @@ public class NotebookServer extends WebSocketServlet
    * @return false if user doesn't have writer permission for this paragraph
    */
   private boolean hasParagraphWriterPermission(NotebookSocket conn, Notebook notebook,
-          String noteId, HashSet<String> userAndRoles, String principal, String op)
-          throws IOException {
+                                               String noteId, HashSet<String> userAndRoles,
+                                               String principal, String op)
+      throws IOException {
     NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
     if (!notebookAuthorization.isWriter(noteId, userAndRoles)) {
       permissionError(conn, op, principal, userAndRoles,
@@ -813,7 +824,8 @@ public class NotebookServer extends WebSocketServlet
    * @return false if user doesn't have owner permission for this paragraph
    */
   private boolean hasParagraphOwnerPermission(NotebookSocket conn, Notebook notebook, String noteId,
-          HashSet<String> userAndRoles, String principal, String op) throws IOException {
+                                              HashSet<String> userAndRoles, String principal,
+                                              String op) throws IOException {
     NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
     if (!notebookAuthorization.isOwner(noteId, userAndRoles)) {
       permissionError(conn, op, principal, userAndRoles,
@@ -824,66 +836,45 @@ public class NotebookServer extends WebSocketServlet
     return true;
   }
 
-  private void sendNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    LOG.info("New operation from {} : {} : {} : {} : {}", conn.getRequest().getRemoteAddr(),
-        conn.getRequest().getRemotePort(), fromMessage.principal, fromMessage.op,
-        fromMessage.get("id"));
-
+  private void getNote(NotebookSocket conn,
+                       Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     if (noteId == null) {
       return;
     }
-
-    String user = fromMessage.principal;
-
-    Note note = notebook.getNote(noteId);
-
-    if (note != null) {
-      if (!hasParagraphReaderPermission(conn, notebook, noteId,
-          userAndRoles, fromMessage.principal, "read")) {
-        return;
-      }
-
-      addConnectionToNote(note.getId(), conn);
-
-      if (note.isPersonalizedMode()) {
-        note = note.getUserNote(user);
-      }
-      conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
-      sendAllAngularObjects(note, user, conn);
-    } else {
-      conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
-    }
+    getNotebookService().getNote(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            addConnectionToNote(note.getId(), conn);
+            conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
+            sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
+          }
+        });
   }
 
-  private void sendHomeNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    String noteId = notebook.getConf().getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN);
-    String user = fromMessage.principal;
-
-    Note note = null;
-    if (noteId != null) {
-      note = notebook.getNote(noteId);
-    }
-
-    if (note != null) {
-      if (!hasParagraphReaderPermission(conn, notebook, noteId,
-          userAndRoles, fromMessage.principal, "read")) {
-        return;
-      }
+  private void getHomeNote(NotebookSocket conn,
+                           Message fromMessage) throws IOException {
 
-      addConnectionToNote(note.getId(), conn);
-      conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
-      sendAllAngularObjects(note, user, conn);
-    } else {
-      removeConnectionFromAllNote(conn);
-      conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
-    }
+    getNotebookService().getHomeNote(getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            if (note != null) {
+              addConnectionToNote(note.getId(), conn);
+              conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
+              sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
+            } else {
+              removeConnectionFromAllNote(conn);
+              conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
+            }
+          }
+        });
   }
 
-  private void updateNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void updateNote(NotebookSocket conn,
+                          Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     String name = (String) fromMessage.get("name");
     Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
@@ -894,35 +885,20 @@ public class NotebookServer extends WebSocketServlet
       return;
     }
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "update")) {
-      return;
-    }
-
-    Note note = notebook.getNote(noteId);
-    if (note != null) {
-      if (!(Boolean) note.getConfig().get("isZeppelinNotebookCronEnable")) {
-        if (config.get("cron") != null) {
-          config.remove("cron");
-        }
-      }
-      boolean cronUpdated = isCronUpdated(config, note.getConfig());
-      note.setName(name);
-      note.setConfig(config);
-      if (cronUpdated) {
-        notebook.refreshCron(note.getId());
-      }
-
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      note.persist(subject);
-      broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name).put("config", config)
-          .put("info", note.getInfo()));
-      broadcastNoteList(subject, userAndRoles);
-    }
+    getNotebookService().updateNote(noteId, name, config, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name)
+                .put("config", config)
+                .put("info", note.getInfo()));
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
   }
 
   private void updatePersonalizedMode(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+                                      Notebook notebook, Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     String personalized = (String) fromMessage.get("personalized");
     boolean isPersonalized = personalized.equals("true") ? true : false;
@@ -945,44 +921,31 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void renameNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    renameNote(conn, userAndRoles, notebook, fromMessage, "rename");
-  }
-
-  private void renameNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage, String op) throws IOException {
+  private void renameNote(NotebookSocket conn,
+                          Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     String name = (String) fromMessage.get("name");
-
     if (noteId == null) {
       return;
     }
-
-    if (!hasParagraphOwnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "rename")) {
-      return;
-    }
-
-    Note note = notebook.getNote(noteId);
-    if (note != null) {
-      note.setName(name);
-      note.setCronSupported(notebook.getConf());
-
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      note.persist(subject);
-      broadcastNote(note);
-      broadcastNoteList(subject, userAndRoles);
-    }
+    getNotebookService().renameNote(noteId, name, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            broadcastNote(note);
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
   }
 
   private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+                            Message fromMessage) throws IOException {
     renameFolder(conn, userAndRoles, notebook, fromMessage, "rename");
   }
 
   private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage, String op) throws IOException {
+                            Message fromMessage, String op) throws IOException {
     String oldFolderId = (String) fromMessage.get("id");
     String newFolderId = (String) fromMessage.get("name");
 
@@ -1013,76 +976,50 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) {
-    boolean cronUpdated = false;
-    if (configA.get("cron") != null && configB.get("cron") != null && configA.get("cron")
-        .equals(configB.get("cron"))) {
-      cronUpdated = true;
-    } else if (configA.get("cron") == null && configB.get("cron") == null) {
-      cronUpdated = false;
-    } else if (configA.get("cron") != null || configB.get("cron") != null) {
-      cronUpdated = true;
-    }
-
-    return cronUpdated;
-  }
-
-  private void createNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message message) throws IOException {
-    AuthenticationInfo subject = new AuthenticationInfo(message.principal);
+  private void createNote(NotebookSocket conn,
+                          Message message) throws IOException {
 
-    try {
-      Note note = null;
-      String defaultInterpreterSettingId = (String) message.get("defaultInterpreterGroup");
-      if (defaultInterpreterSettingId != null) {
-        note = notebook.createNote(defaultInterpreterSettingId, subject);
-      } else {
-        note = notebook.createNote(subject);
-      }
-
-      note.addNewParagraph(subject); // it's an empty note. so add one paragraph
-      if (message != null) {
-        String noteName = (String) message.get("name");
-        if (StringUtils.isEmpty(noteName)) {
-          noteName = "Note " + note.getId();
-        }
-        note.setName(noteName);
-        note.setCronSupported(notebook.getConf());
-      }
+    String defaultInterpreterGroup = (String) message.get("defaultInterpreterGroup");
+    String noteName = (String) message.get("name");
+    getNotebookService().createNote(defaultInterpreterGroup, noteName, getServiceContext(message),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            addConnectionToNote(note.getId(), conn);
+            conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
 
-      note.persist(subject);
-      addConnectionToNote(note.getId(), conn);
-      conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
-    } catch (IOException e) {
-      LOG.error("Exception from createNote", e);
-      conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
-          "Oops! There is something wrong with the notebook file system. "
-              + "Please check the logs for more details.")));
-      return;
-    }
-    broadcastNoteList(subject, userAndRoles);
+          @Override
+          public void onFailure(Exception ex, ServiceContext context) throws IOException {
+            super.onFailure(ex, context);
+            conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
+                "Oops! There is something wrong with the notebook file system. "
+                    + "Please check the logs for more details.")));
+          }
+        });
   }
 
-  private void removeNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void deleteNote(NotebookSocket conn,
+                          Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     if (noteId == null) {
       return;
     }
-
-    if (!hasParagraphOwnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "remove")) {
-      return;
-    }
-
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    notebook.removeNote(noteId, subject);
-    removeNote(noteId);
-    broadcastNoteList(subject, userAndRoles);
+    getNotebookService().removeNote(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<String>(conn) {
+          @Override
+          public void onSuccess(String message, ServiceContext context) throws IOException {
+            super.onSuccess(message, context);
+            removeConn(noteId);
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
   }
 
   private void removeFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+                            Message fromMessage) throws IOException {
     String folderId = (String) fromMessage.get("id");
     if (folderId == null) {
       return;
@@ -1101,13 +1038,13 @@ public class NotebookServer extends WebSocketServlet
     AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
     for (Note note : notes) {
       notebook.removeNote(note.getId(), subject);
-      removeNote(note.getId());
+      removeConn(note.getId());
     }
     broadcastNoteList(subject, userAndRoles);
   }
 
   private void moveNoteToTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+                               Message fromMessage) throws SchedulerException, IOException {
     String noteId = (String) fromMessage.get("id");
     if (noteId == null) {
       return;
@@ -1121,15 +1058,16 @@ public class NotebookServer extends WebSocketServlet
       notebook.removeCron(note.getId());
     }
 
-    if (note != null && !note.isTrash()){
+    if (note != null && !note.isTrash()) {
       fromMessage.put("name", Folder.TRASH_FOLDER_ID + "/" + note.getName());
-      renameNote(conn, userAndRoles, notebook, fromMessage, "move");
+      renameNote(conn, fromMessage);
       notebook.moveNoteToTrash(note.getId());
     }
   }
 
   private void moveFolderToTrash(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws SchedulerException, IOException {
+                                 Notebook notebook, Message fromMessage)
+      throws SchedulerException, IOException {
     String folderId = (String) fromMessage.get("id");
     if (folderId == null) {
       return;
@@ -1138,14 +1076,14 @@ public class NotebookServer extends WebSocketServlet
     Folder folder = notebook.getFolder(folderId);
     if (folder != null && !folder.isTrash()) {
       String trashFolderId = Folder.TRASH_FOLDER_ID + "/" + folderId;
-      if (notebook.hasFolder(trashFolderId)){
+      if (notebook.hasFolder(trashFolderId)) {
         DateTime currentDate = new DateTime();
         DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
         trashFolderId += Folder.TRASH_FOLDER_CONFLICT_INFIX + formatter.print(currentDate);
       }
 
       List<Note> noteList = folder.getNotesRecursively();
-      for (Note note: noteList) {
+      for (Note note : noteList) {
         Map<String, Object> config = note.getConfig();
         if (config.get("cron") != null) {
           notebook.removeCron(note.getId());
@@ -1157,30 +1095,20 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void restoreNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+  private void restoreNote(NotebookSocket conn,
+                           Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
-
     if (noteId == null) {
       return;
     }
 
-    Note note = notebook.getNote(noteId);
-
-    //restore cron
-    Map<String, Object> config = note.getConfig();
-    if (config.get("cron") != null) {
-      notebook.refreshCron(note.getId());
-    }
+    getNotebookService().restoreNote(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn));
 
-    if (note != null && note.isTrash()) {
-      fromMessage.put("name", note.getName().replaceFirst(Folder.TRASH_FOLDER_ID + "/", ""));
-      renameNote(conn, userAndRoles, notebook, fromMessage, "restore");
-    }
   }
 
   private void restoreFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+                             Message fromMessage) throws SchedulerException, IOException {
     String folderId = (String) fromMessage.get("id");
 
     if (folderId == null) {
@@ -1211,7 +1139,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void restoreAll(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+                          Message fromMessage) throws IOException {
     Folder trashFolder = notebook.getFolder(Folder.TRASH_FOLDER_ID);
     if (trashFolder != null) {
       fromMessage.data = new HashMap<>();
@@ -1222,57 +1150,42 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void emptyTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+                          Message fromMessage) throws SchedulerException, IOException {
     fromMessage.data = new HashMap<>();
     fromMessage.put("id", Folder.TRASH_FOLDER_ID);
     removeFolder(conn, userAndRoles, notebook, fromMessage);
   }
 
-  private void updateParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private void updateParagraph(NotebookSocket conn,
+                               Message fromMessage) throws IOException {
     String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
     }
-
-    Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
-    Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
     String noteId = getOpenNoteId(conn);
     if (noteId == null) {
       noteId = (String) fromMessage.get("noteId");
     }
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    final Note note = notebook.getNote(noteId);
-    Paragraph p = note.getParagraph(paragraphId);
-
-    p.settings.setParams(params);
-    p.setConfig(config);
-    p.setTitle((String) fromMessage.get("title"));
-    p.setText((String) fromMessage.get("paragraph"));
-
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    if (note.isPersonalizedMode()) {
-      p = p.getUserParagraph(subject.getUser());
-      p.settings.setParams(params);
-      p.setConfig(config);
-      p.setTitle((String) fromMessage.get("title"));
-      p.setText((String) fromMessage.get("paragraph"));
-    }
-
-    note.persist(subject);
+    String title = (String) fromMessage.get("title");
+    String text = (String) fromMessage.get("paragraph");
+    Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
+    Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
 
-    if (note.isPersonalizedMode()) {
-      Map<String, Paragraph> userParagraphMap =
-          note.getParagraph(paragraphId).getUserParagraphMap();
-      broadcastParagraphs(userParagraphMap, p);
-    } else {
-      broadcastParagraph(note, p);
-    }
+    getNotebookService().updateParagraph(noteId, paragraphId, title, text, params, config,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            if (p.getNote().isPersonalizedMode()) {
+              Map<String, Paragraph> userParagraphMap =
+                  p.getNote().getParagraph(paragraphId).getUserParagraphMap();
+              broadcastParagraphs(userParagraphMap, p);
+            } else {
+              broadcastParagraph(p.getNote(), p);
+            }
+          }
+        });
   }
 
   private void patchParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
@@ -1327,142 +1240,140 @@ public class NotebookServer extends WebSocketServlet
     paragraphText = (String) dmp.patchApply(patches, paragraphText)[0];
     p.setText(paragraphText);
     Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", patchText)
-                                                     .put("paragraphId", p.getId());
+        .put("paragraphId", p.getId());
     broadcastExcept(note.getId(), message, conn);
   }
 
-  private void cloneNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void cloneNote(NotebookSocket conn,
+                         Message fromMessage) throws IOException {
+
     String noteId = getOpenNoteId(conn);
     String name = (String) fromMessage.get("name");
-    Note newNote = notebook.cloneNote(noteId, name, new AuthenticationInfo(fromMessage.principal));
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    addConnectionToNote(newNote.getId(), (NotebookSocket) conn);
-    conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
-    broadcastNoteList(subject, userAndRoles);
+    getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note newNote, ServiceContext context) throws IOException {
+            super.onSuccess(newNote, context);
+            addConnectionToNote(newNote.getId(), conn);
+            conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
   }
 
-  private void clearAllParagraphOutput(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private void clearAllParagraphOutput(NotebookSocket conn,
+                                       Message fromMessage) throws IOException {
     final String noteId = (String) fromMessage.get("id");
     if (StringUtils.isBlank(noteId)) {
       return;
     }
+    getNotebookService().clearAllParagraphOutput(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            broadcastNote(note);
+          }
+        });
+  }
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "clear output")) {
-      return;
-    }
+  protected Note importNote(NotebookSocket conn, Message fromMessage) throws IOException {
+    String noteName = (String) ((Map) fromMessage.get("note")).get("name");
+    String noteJson = gson.toJson(fromMessage.get("note"));
+    Note note = getNotebookService().importNote(noteName, noteJson, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            try {
+              broadcastNote(note);
+              broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+            } catch (NullPointerException e) {
+              // TODO(zjffdu) remove this try/catch. It is here only for the test
+              // NotebookServerTest#testImportNotebook.
+            }
+          }
+        });
 
-    Note note = notebook.getNote(noteId);
-    note.clearAllParagraphOutput();
-    broadcastNote(note);
-  }
-
-  protected Note importNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    Note note = null;
-    if (fromMessage != null) {
-      String noteName = (String) ((Map) fromMessage.get("note")).get("name");
-      String noteJson = gson.toJson(fromMessage.get("note"));
-      AuthenticationInfo subject;
-      if (fromMessage.principal != null) {
-        subject = new AuthenticationInfo(fromMessage.principal);
-      } else {
-        subject = new AuthenticationInfo("anonymous");
-      }
-      note = notebook.importNote(noteJson, noteName, subject);
-      note.persist(subject);
-      broadcastNote(note);
-      broadcastNoteList(subject, userAndRoles);
-    }
     return note;
   }
 
-  private void removeParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void removeParagraph(NotebookSocket conn,
+                               Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
     }
     String noteId = getOpenNoteId(conn);
-
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    final Note note = notebook.getNote(noteId);
-
-    // Don't allow removing paragraph when there is only one paragraph in the Notebook
-    if (note.getParagraphCount() > 1) {
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      Paragraph para = note.removeParagraph(subject.getUser(), paragraphId);
-      note.persist(subject);
-      if (para != null) {
-        broadcast(note.getId(), new Message(OP.PARAGRAPH_REMOVED).
-            put("id", para.getId()));
-      }
-    }
+    getNotebookService().removeParagraph(noteId, paragraphId,
+        getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn){
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            super.onSuccess(p, context);
+            broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).
+                put("id", p.getId()));
+          }
+        });
   }
 
-  private void clearParagraphOutput(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private void clearParagraphOutput(NotebookSocket conn,
+                                    Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
     }
-
     String noteId = getOpenNoteId(conn);
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    final Note note = notebook.getNote(noteId);
-    if (note.isPersonalizedMode()) {
-      String user = fromMessage.principal;
-      Paragraph p = note.clearPersonalizedParagraphOutput(paragraphId, user);
-      unicastParagraph(note, p, user);
-    } else {
-      note.clearParagraphOutput(paragraphId);
-      Paragraph paragraph = note.getParagraph(paragraphId);
-      broadcastParagraph(note, paragraph);
-    }
+    getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            super.onSuccess(p, context);
+            if (p.getNote().isPersonalizedMode()) {
+              unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
+            } else {
+              broadcastParagraph(p.getNote(), p);
+            }
+          }
+        });
   }
 
-  private void completion(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void completion(NotebookSocket conn,
+                          Message fromMessage) throws IOException {
+    String noteId = getOpenNoteId(conn);
     String paragraphId = (String) fromMessage.get("id");
     String buffer = (String) fromMessage.get("buf");
     int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString());
-    Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
-    if (paragraphId == null) {
-      conn.send(serializeMessage(resp));
-      return;
-    }
+    getNotebookService().completion(noteId, paragraphId, buffer, cursor,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<List<InterpreterCompletion>>(conn) {
+          @Override
+          public void onSuccess(List<InterpreterCompletion> completions, ServiceContext context)
+              throws IOException {
+            super.onSuccess(completions, context);
+            Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
+            resp.put("completions", completions);
+            conn.send(serializeMessage(resp));
+          }
 
-    final Note note = notebook.getNote(getOpenNoteId(conn));
-    List<InterpreterCompletion> candidates;
-    try {
-      candidates = note.completion(paragraphId, buffer, cursor);
-    } catch (RuntimeException e) {
-      LOG.info("Fail to get completion", e);
-      candidates = new ArrayList<>();
-    }
-    resp.put("completions", candidates);
-    conn.send(serializeMessage(resp));
+          @Override
+          public void onFailure(Exception ex, ServiceContext context) throws IOException {
+            super.onFailure(ex, context);
+            Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
+            resp.put("completions", new ArrayList<>());
+            conn.send(serializeMessage(resp));
+          }
+        });
   }
 
   /**
    * When angular object updated from client.
    *
-   * @param conn the web socket.
-   * @param notebook the notebook.
+   * @param conn        the web socket.
+   * @param notebook    the notebook.
    * @param fromMessage the message.
    */
   private void angularObjectUpdated(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) {
+                                    Notebook notebook, Message fromMessage) {
     String noteId = (String) fromMessage.get("noteId");
     String paragraphId = (String) fromMessage.get("paragraphId");
     String interpreterGroupId = (String) fromMessage.get("interpreterGroupId");
@@ -1547,7 +1458,7 @@ public class NotebookServer extends WebSocketServlet
    * and a paragraph id.
    */
   protected void angularObjectClientBind(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws Exception {
+                                         Notebook notebook, Message fromMessage) throws Exception {
     String noteId = fromMessage.getType("noteId");
     String varName = fromMessage.getType("name");
     Object varValue = fromMessage.get("value");
@@ -1581,7 +1492,8 @@ public class NotebookServer extends WebSocketServlet
    * and an optional list of paragraph id(s).
    */
   protected void angularObjectClientUnbind(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws Exception {
+                                           Notebook notebook, Message fromMessage)
+      throws Exception {
     String noteId = fromMessage.getType("noteId");
     String varName = fromMessage.getType("name");
     String paragraphId = fromMessage.getType("paragraphId");
@@ -1609,7 +1521,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private InterpreterGroup findInterpreterGroupForParagraph(Note note, String paragraphId)
-          throws Exception {
+      throws Exception {
     final Paragraph paragraph = note.getParagraph(paragraphId);
     if (paragraph == null) {
       throw new IllegalArgumentException("Unknown paragraph with id : " + paragraphId);
@@ -1618,8 +1530,10 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId, String varName,
-          Object varValue, RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId,
-          NotebookSocket conn) {
+                                                 Object varValue,
+                                                 RemoteAngularObjectRegistry remoteRegistry,
+                                                 String interpreterGroupId,
+                                                 NotebookSocket conn) {
     final AngularObject ao =
         remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId);
 
@@ -1629,8 +1543,9 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, String varName,
-          RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId,
-          NotebookSocket conn) {
+                                               RemoteAngularObjectRegistry remoteRegistry,
+                                               String interpreterGroupId,
+                                               NotebookSocket conn) {
     final AngularObject ao =
         remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId);
     this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao)
@@ -1639,8 +1554,9 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName,
-          Object varValue, AngularObjectRegistry registry, String interpreterGroupId,
-          NotebookSocket conn) {
+                                            Object varValue, AngularObjectRegistry registry,
+                                            String interpreterGroupId,
+                                            NotebookSocket conn) {
     AngularObject angularObject = registry.get(varName, noteId, paragraphId);
     if (angularObject == null) {
       angularObject = registry.add(varName, varValue, noteId, paragraphId);
@@ -1655,7 +1571,8 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void removeAngularObjectFromLocalRepo(String noteId, String paragraphId, String varName,
-          AngularObjectRegistry registry, String interpreterGroupId, NotebookSocket conn) {
+                                                AngularObjectRegistry registry,
+                                                String interpreterGroupId, NotebookSocket conn) {
     final AngularObject removed = registry.remove(varName, noteId, paragraphId);
     if (removed != null) {
       this.broadcastExcept(noteId,
@@ -1665,8 +1582,8 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void moveParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void moveParagraph(NotebookSocket conn,
+                             Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
@@ -1674,31 +1591,22 @@ public class NotebookServer extends WebSocketServlet
 
     final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString());
     String noteId = getOpenNoteId(conn);
-    final Note note = notebook.getNote(noteId);
-
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    note.moveParagraph(paragraphId, newIndex);
-    note.persist(subject);
-    broadcast(note.getId(),
-        new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex));
+    getNotebookService().moveParagraph(noteId, paragraphId, newIndex,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
+            super.onSuccess(result, context);
+            broadcast(result.getNote().getId(),
+                new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex));
+          }
+        });
   }
 
-  private String insertParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private String insertParagraph(NotebookSocket conn,
+                                 Message fromMessage) throws IOException {
     final int index = (int) Double.parseDouble(fromMessage.get("index").toString());
     String noteId = getOpenNoteId(conn);
-    final Note note = notebook.getNote(noteId);
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return null;
-    }
     Map<String, Object> config;
     if (fromMessage.get("config") != null) {
       config = (Map<String, Object>) fromMessage.get("config");
@@ -1706,85 +1614,59 @@ public class NotebookServer extends WebSocketServlet
       config = new HashMap<>();
     }
 
-    Paragraph newPara = note.insertNewParagraph(index, subject);
-    newPara.setConfig(config);
-    note.persist(subject);
-    broadcastNewParagraph(note, newPara);
+    Paragraph newPara = getNotebookService().insertParagraph(noteId, index, config,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            super.onSuccess(p, context);
+            broadcastNewParagraph(p.getNote(), p);
+          }
+        });
 
     return newPara.getId();
   }
 
-  private void copyParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    String newParaId = insertParagraph(conn, userAndRoles, notebook, fromMessage);
+  private void copyParagraph(NotebookSocket conn,
+                             Message fromMessage) throws IOException {
+    String newParaId = insertParagraph(conn, fromMessage);
 
     if (newParaId == null) {
       return;
     }
     fromMessage.put("id", newParaId);
 
-    updateParagraph(conn, userAndRoles, notebook, fromMessage);
+    updateParagraph(conn, fromMessage);
   }
 
-  private void cancelParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
     }
 
     String noteId = getOpenNoteId(conn);
-
-    if (!hasParagraphRunnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    final Note note = notebook.getNote(noteId);
-    Paragraph p = note.getParagraph(paragraphId);
-    p.abort();
+    getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<>(conn));
   }
 
-  private void runAllParagraphs(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private void runAllParagraphs(NotebookSocket conn,
+                                Message fromMessage) throws IOException {
     final String noteId = (String) fromMessage.get("noteId");
     if (StringUtils.isBlank(noteId)) {
       return;
     }
-
-    if (!hasParagraphRunnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "run all paragraphs")) {
-      return;
-    }
-
     List<Map<String, Object>> paragraphs =
         gson.fromJson(String.valueOf(fromMessage.data.get("paragraphs")),
-            new TypeToken<List<Map<String, Object>>>() {}.getType());
-
-    for (Map<String, Object> raw : paragraphs) {
-      String paragraphId = (String) raw.get("id");
-      if (paragraphId == null) {
-        continue;
-      }
-
-      String text = (String) raw.get("paragraph");
-      String title = (String) raw.get("title");
-      Map<String, Object> params = (Map<String, Object>) raw.get("params");
-      Map<String, Object> config = (Map<String, Object>) raw.get("config");
-
-      Note note = notebook.getNote(noteId);
-      Paragraph p = setParagraphUsingMessage(note, fromMessage,
-          paragraphId, text, title, params, config);
+            new TypeToken<List<Map<String, Object>>>() {
+            }.getType());
 
-      if (p.isEnabled() && !persistAndExecuteSingleParagraph(conn, note, p, true)) {
-        // stop execution when one paragraph fails.
-        break;
-      }
-    }
+    getNotebookService().runAllParagraphs(noteId, paragraphs, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn));
   }
 
   private void broadcastSpellExecution(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+                                       Notebook notebook, Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
@@ -1837,39 +1719,40 @@ public class NotebookServer extends WebSocketServlet
         new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn);
   }
 
-  private void runParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void runParagraph(NotebookSocket conn,
+                            Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
+      // TODO(zjffdu) is it possible for paragraphId to be null here?
       return;
     }
 
     String noteId = getOpenNoteId(conn);
-
-    if (!hasParagraphRunnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    // 1. clear paragraph only if personalized,
-    // otherwise this will be handed in `onOutputClear`
-    final Note note = notebook.getNote(noteId);
-    if (note.isPersonalizedMode()) {
-      String user = fromMessage.principal;
-      Paragraph p = note.clearPersonalizedParagraphOutput(paragraphId, user);
-      unicastParagraph(note, p, user);
-    }
-
-    // 2. set paragraph values
     String text = (String) fromMessage.get("paragraph");
     String title = (String) fromMessage.get("title");
     Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
     Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
+    getNotebookService().runParagraph(noteId, paragraphId, title, text, params, config, false,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            if (p.getNote().isPersonalizedMode()) {
+              Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId,
+                  context.getAutheInfo().getUser());
+              unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
+            }
 
-    Paragraph p = setParagraphUsingMessage(note, fromMessage, paragraphId,
-        text, title, params, config);
-
-    persistAndExecuteSingleParagraph(conn, note, p, false);
+            // if it's the last paragraph and not empty, let's add a new one
+            boolean isTheLastParagraph = p.getNote().isLastParagraph(paragraphId);
+            if (!(Strings.isNullOrEmpty(p.getText()) ||
+                Strings.isNullOrEmpty(p.getScriptText())) &&
+                isTheLastParagraph) {
+              Paragraph newPara = p.getNote().addNewParagraph(p.getAuthenticationInfo());
+              broadcastNewParagraph(p.getNote(), newPara);
+            }
+          }
+        });
   }
 
   private void addNewParagraphIfLastParagraphIsExecuted(Note note, Paragraph p) {
@@ -1887,7 +1770,7 @@ public class NotebookServer extends WebSocketServlet
    * @return false if failed to save a note
    */
   private boolean persistNoteWithAuthInfo(NotebookSocket conn, Note note, Paragraph p)
-          throws IOException {
+      throws IOException {
     try {
       note.persist(p.getAuthenticationInfo());
       return true;
@@ -1901,28 +1784,9 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private boolean persistAndExecuteSingleParagraph(NotebookSocket conn, Note note, Paragraph p,
-          boolean blocking) throws IOException {
-    addNewParagraphIfLastParagraphIsExecuted(note, p);
-    if (!persistNoteWithAuthInfo(conn, note, p)) {
-      return false;
-    }
-
-    try {
-      return note.run(p.getId(), blocking);
-    } catch (Exception ex) {
-      LOG.error("Exception from run", ex);
-      if (p != null) {
-        p.setReturn(new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), ex);
-        p.setStatus(Status.ERROR);
-        broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", p));
-      }
-      return false;
-    }
-  }
-
   private Paragraph setParagraphUsingMessage(Note note, Message fromMessage, String paragraphId,
-          String text, String title, Map<String, Object> params, Map<String, Object> config) {
+                                             String text, String title, Map<String, Object> params,
+                                             Map<String, Object> config) {
     Paragraph p = note.getParagraph(paragraphId);
     p.setText(text);
     p.setTitle(title);
@@ -1945,7 +1809,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void sendAllConfigurations(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook) throws IOException {
+                                     Notebook notebook) throws IOException {
     ZeppelinConfiguration conf = notebook.getConf();
 
     Map<String, String> configurations =
@@ -1962,97 +1826,94 @@ public class NotebookServer extends WebSocketServlet
         new Message(OP.CONFIGURATIONS_INFO).put("configurations", configurations)));
   }
 
-  private void checkpointNote(NotebookSocket conn, Notebook notebook, Message fromMessage)
-          throws IOException {
+  /** Checkpoints a note, then sends the refreshed history (or an error) to the caller. */
+  private void checkpointNote(NotebookSocket conn, Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String commitMessage = (String) fromMessage.get("commitMessage");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    Revision revision = notebook.checkpointNote(noteId, commitMessage, subject);
-    if (!Revision.isEmpty(revision)) {
-      List<Revision> revisions = notebook.listRevisionHistory(noteId, subject);
-      conn.send(
-          serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions)));
-    } else {
-      conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
-          "Couldn't checkpoint note revision: possibly storage doesn't support versioning. "
-              + "Please check the logs for more details.")));
-    }
+
+    getNotebookService().checkpointNote(noteId, commitMessage, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Revision>(conn) {
+          @Override
+          public void onSuccess(Revision revision, ServiceContext context) throws IOException {
+            super.onSuccess(revision, context);
+            if (Revision.isEmpty(revision)) {
+              // An empty revision is reported to the client as a failed checkpoint.
+              conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
+                  "Couldn't checkpoint note revision: possibly storage doesn't support versioning. "
+                      + "Please check the logs for more details.")));
+              return;
+            }
+            List<Revision> history =
+                notebook().listRevisionHistory(noteId, context.getAutheInfo());
+            Message reply = new Message(OP.LIST_REVISION_HISTORY).put("revisionList", history);
+            conn.send(serializeMessage(reply));
+          }
+        });
   }
 
-  private void listRevisionHistory(NotebookSocket conn, Notebook notebook, Message fromMessage)
-          throws IOException {
+  /** Looks up a note's revision history and, on success, sends it to the calling socket. */
+  private void listRevisionHistory(NotebookSocket conn, Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("noteId");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    List<Revision> revisions = notebook.listRevisionHistory(noteId, subject);
-
-    conn.send(
-        serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions)));
+    getNotebookService().listRevisionHistory(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<List<Revision>>(conn) {
+          @Override
+          public void onSuccess(List<Revision> history, ServiceContext context)
+              throws IOException {
+            super.onSuccess(history, context);
+            Message reply = new Message(OP.LIST_REVISION_HISTORY).put("revisionList", history);
+            conn.send(serializeMessage(reply));
+          }
+        });
   }
 
-  private void setNoteRevision(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  /** Sets a note to the requested revision; on success reloads it, acks, and broadcasts it. */
+  private void setNoteRevision(NotebookSocket conn, Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String revisionId = (String) fromMessage.get("revisionId");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "update")) {
-      return;
-    }
-
-    Note headNote = null;
-    boolean setRevisionStatus;
-    try {
-      headNote = notebook.setNoteRevision(noteId, revisionId, subject);
-      setRevisionStatus = headNote != null;
-    } catch (Exception e) {
-      setRevisionStatus = false;
-      LOG.error("Failed to set given note revision", e);
-    }
-    if (setRevisionStatus) {
-      notebook.loadNoteFromRepo(noteId, subject);
-    }
-
-    conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", setRevisionStatus)));
-
-    if (setRevisionStatus) {
-      Note reloadedNote = notebook.getNote(headNote.getId());
-      broadcastNote(reloadedNote);
-    } else {
-      conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
-          "Couldn't set note to the given revision. "
-              + "Please check the logs for more details.")));
-    }
+    getNotebookService().setNoteRevision(noteId, revisionId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note headNote, ServiceContext context) throws IOException {
+            super.onSuccess(headNote, context);
+            Note reloaded = notebook().loadNoteFromRepo(noteId, context.getAutheInfo());
+            conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", true)));
+            broadcastNote(reloaded);
+          }
+        });
   }
 
-  private void getNoteByRevision(NotebookSocket conn, Notebook notebook, Message fromMessage)
+  /** Fetches a note at the requested revision and, on success, sends it to the caller. */
+  private void getNoteByRevision(NotebookSocket conn, Message fromMessage)
       throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String revisionId = (String) fromMessage.get("revisionId");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    Note revisionNote = notebook.getNoteByRevision(noteId, revisionId, subject);
-    conn.send(serializeMessage(
-        new Message(OP.NOTE_REVISION).put("noteId", noteId).put("revisionId", revisionId)
-            .put("note", revisionNote)));
+    getNotebookService().getNotebyRevision(noteId, revisionId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note revisionNote, ServiceContext context) throws IOException {
+            super.onSuccess(revisionNote, context);
+            conn.send(serializeMessage(new Message(OP.NOTE_REVISION).put("noteId", noteId)
+                .put("revisionId", revisionId).put("note", revisionNote)));
+          }
+        });
   }
 
-  private void getNoteByRevisionForCompare(NotebookSocket conn, Notebook notebook,
-      Message fromMessage) throws IOException {
+  /** On success, sends the requested revision of a note, echoing the request's "position". */
+  private void getNoteByRevisionForCompare(NotebookSocket conn, Message fromMessage)
+      throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String revisionId = (String) fromMessage.get("revisionId");
-
     String position = (String) fromMessage.get("position");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    Note revisionNote;
-    if (revisionId.equals("Head")) {
-      revisionNote = notebook.getNote(noteId);
-    } else {
-      revisionNote = notebook.getNoteByRevision(noteId, revisionId, subject);
-    }
-
-    conn.send(serializeMessage(
-        new Message(OP.NOTE_REVISION_FOR_COMPARE).put("noteId", noteId)
-            .put("revisionId", revisionId).put("position", position).put("note", revisionNote)));
+    getNotebookService().getNoteByRevisionForCompare(noteId, revisionId,
+        getServiceContext(fromMessage), new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            Message reply = new Message(OP.NOTE_REVISION_FOR_COMPARE).put("noteId", noteId)
+                .put("revisionId", revisionId).put("position", position).put("note", note);
+            conn.send(serializeMessage(reply));
+          }
+        });
   }
 
   /**
@@ -2074,7 +1935,7 @@ public class NotebookServer extends WebSocketServlet
    */
   @Override
   public void onOutputUpdated(String noteId, String paragraphId, int index,
-      InterpreterResult.Type type, String output) {
+                              InterpreterResult.Type type, String output) {
     Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId)
         .put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output);
     Note note = notebook().getNote(noteId);
@@ -2105,7 +1966,7 @@ public class NotebookServer extends WebSocketServlet
    */
   @Override
   public void onOutputAppend(String noteId, String paragraphId, int index, String appId,
-      String output) {
+                             String output) {
     Message msg =
         new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
             .put("index", index).put("appId", appId).put("data", output);
@@ -2117,7 +1978,7 @@ public class NotebookServer extends WebSocketServlet
    */
   @Override
   public void onOutputUpdated(String noteId, String paragraphId, int index, String appId,
-      InterpreterResult.Type type, String output) {
+                              InterpreterResult.Type type, String output) {
     Message msg =
         new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
             .put("index", index).put("type", type).put("appId", appId).put("data", output);
@@ -2322,7 +2183,7 @@ public class NotebookServer extends WebSocketServlet
         if (job.getStatus() == Status.FINISHED) {
           LOG.info("Job {} is finished successfully, status: {}", job.getId(), job.getStatus());
         } else {
-          LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}" , job.getId(),
+          LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", job.getId(),
               job.getStatus(), job.getException(), job.getReturn());
         }
 
@@ -2484,7 +2345,7 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject)
+  private void getInterpreterSettings(NotebookSocket conn)
       throws IOException {
     List<InterpreterSetting> availableSettings = notebook().getInterpreterSettingManager().get();
     conn.send(serializeMessage(
@@ -2525,7 +2386,7 @@ public class NotebookServer extends WebSocketServlet
 
   private void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serialized) {
     synchronized (connectedSockets) {
-      for (NotebookSocket conn: connectedSockets) {
+      for (NotebookSocket conn : connectedSockets) {
         if (exclude != null && exclude.equals(conn)) {
           continue;
         }
@@ -2555,7 +2416,7 @@ public class NotebookServer extends WebSocketServlet
 
   @Override
   public void onParaInfosReceived(String noteId, String paragraphId,
-          String interpreterSettingId, Map<String, String> metaInfos) {
+                                  String interpreterSettingId, Map<String, String> metaInfos) {
     Note note = notebook().getNote(noteId);
     if (note != null) {
       Paragraph paragraph = note.getParagraph(paragraphId);
@@ -2601,53 +2462,39 @@ public class NotebookServer extends WebSocketServlet
     setting.clearNoteIdAndParaMap();
   }
 
-  public void broadcastNoteForms(Note note) {
+  private void broadcastNoteForms(Note note) {
     GUI formsSettings = new GUI();
     formsSettings.setForms(note.getNoteForms());
     formsSettings.setParams(note.getNoteParams());
-
     broadcast(note.getId(), new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings));
   }
 
-  private void saveNoteForms(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  /** Saves the supplied note parameters and, on success, broadcasts the note's forms. */
+  private void saveNoteForms(NotebookSocket conn, Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     Map<String, Object> noteParams = (Map<String, Object>) fromMessage.get("noteParams");
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "update")) {
-      return;
-    }
-
-    Note note = notebook.getNote(noteId);
-    if (note != null) {
-      note.setNoteParams(noteParams);
-
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      note.persist(subject);
-      broadcastNoteForms(note);
-    }
+    getNotebookService().saveNoteForms(noteId, noteParams, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note savedNote, ServiceContext context) throws IOException {
+            broadcastNoteForms(savedNote);
+          }
+        });
   }
 
-  private void removeNoteForms(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  /** Removes the named note form and, on success, broadcasts the note's forms. */
+  private void removeNoteForms(NotebookSocket conn, Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String formName = (String) fromMessage.get("formName");
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "update")) {
-      return;
-    }
-
-    Note note = notebook.getNote(noteId);
-    if (note != null) {
-      note.getNoteForms().remove(formName);
-      note.getNoteParams().remove(formName);
-
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      note.persist(subject);
-      broadcastNoteForms(note);
-    }
+    getNotebookService().removeNoteForms(noteId, formName, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note updatedNote, ServiceContext context) throws IOException {
+            broadcastNoteForms(updatedNote);
+          }
+        });
   }
 
   @Override
@@ -2665,4 +2512,43 @@ public class NotebookServer extends WebSocketServlet
     m.data.put("notice", message);
     broadcast(m);
   }
+
+  /** Builds the service context (identity plus user and role names) for a socket message. */
+  private ServiceContext getServiceContext(Message message) {
+    AuthenticationInfo caller =
+        new AuthenticationInfo(message.principal, message.roles, message.ticket);
+    Set<String> principalAndRoles = new HashSet<>();
+    principalAndRoles.add(message.principal);
+    if (message.roles != null && !message.roles.equals("")) {
+      Type roleSetType = new TypeToken<HashSet<String>>() {}.getType();
+      HashSet<String> parsedRoles = gson.fromJson(message.roles, roleSetType);
+      if (parsedRoles != null) {
+        principalAndRoles.addAll(parsedRoles);
+      }
+    }
+    return new ServiceContext(caller, principalAndRoles);
+  }
+
+  /** Service callback that relays failures to the given WebSocket connection. */
+  private class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> {
+    private final NotebookSocket conn;
+
+    WebSocketServiceCallback(NotebookSocket conn) {
+      this.conn = conn;
+    }
+
+    @Override
+    public void onFailure(Exception ex, ServiceContext context) throws IOException {
+      super.onFailure(ex, context);
+      if (!(ex instanceof ForbiddenException)) {
+        conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", ex.getMessage())));
+        return;
+      }
+      // The response entity is parsed as a JSON object; only its "message" entry is forwarded.
+      Type stringMapType = new TypeToken<Map<String, String>>() {}.getType();
+      String entityJson = ((ForbiddenException) ex).getResponse().getEntity().toString();
+      Map<String, String> entity = gson.fromJson(entityJson, stringMapType);
+      conn.send(serializeMessage(new Message(OP.AUTH_INFO).put("info", entity.get("message"))));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
index 66e4038..65740ff 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
@@ -77,4 +77,14 @@ public class NotebookSocket extends WebSocketAdapter {
   public void setUser(String user) {
     this.user = user;
   }
+
+  /** Describes this socket by the remote peer of its request, formatted as host:port. */
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append(request.getRemoteHost())
+        .append(":")
+        .append(request.getRemotePort())
+        .toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java
deleted file mode 100644
index ddfe7c4..0000000
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.socket;
-
-/** This will be used by some services to pass messages to frontend via WebSocket */
-public interface ServiceCallback {
-  void onStart(String message);
-
-  void onSuccess(String message);
-
-  void onFailure(String message);
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index ce867ee..62a3782 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -221,8 +221,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
   @Test
   public void testDeleteNoteBadId() throws IOException {
     LOG.info("testDeleteNoteBadId");
-    testDeleteNote("2AZFXEX97");
-    testDeleteNote("bad_ID");
+    testDeleteNotExistNote("bad_ID");
   }
 
   @Test
@@ -317,6 +316,18 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
     }
   }
 
+  /** Deletes a note id expected to be absent and asserts that the server answers not-found. */
+  private void testDeleteNotExistNote(String noteId) throws IOException {
+    DeleteMethod delete = httpDelete("/notebook/" + noteId);
+    try {
+      LOG.info("testDeleteNotExistNote delete response\n" + delete.getResponseBodyAsString());
+      assertThat("Test delete method:", delete, isNotFound());
+    } finally {
+      // Release the connection even when the assertion above fails.
+      delete.releaseConnection();
+    }
+  }
+
   @Test
   public void testCloneNote() throws IOException, IllegalArgumentException {
     LOG.info("testCloneNote");

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
index ab74012..cc4b1df 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
@@ -33,7 +33,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.rest.message.InterpreterInstallationRequest;
-import org.apache.zeppelin.socket.ServiceCallback;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -120,19 +119,19 @@ public class InterpreterServiceTest {
         new InterpreterInstallationRequest(interpreterName, artifactName),
         dependencyResolver,
         specificInterpreterPath,
-        new ServiceCallback() {
+        new SimpleServiceCallback<String>() {
           @Override
-          public void onStart(String message) {
+          public void onStart(String message, ServiceContext context) {
             assertEquals("Starting to download " + interpreterName + " interpreter", message);
           }
 
           @Override
-          public void onSuccess(String message) {
+          public void onSuccess(String message, ServiceContext context) {
             assertEquals(interpreterName + " downloaded", message);
           }
 
           @Override
-          public void onFailure(String message) {
+          public void onFailure(Exception ex, ServiceContext context) {
             fail();
           }
         });