You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/08/10 06:54:37 UTC

[1/3] zeppelin git commit: ZEPPELIN-3681. Introduce NotebookService

Repository: zeppelin
Updated Branches:
  refs/heads/master e740efbe0 -> 6beb1bb3c


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 078dbb2..abce8b6 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -16,37 +16,6 @@
  */
 package org.apache.zeppelin.socket;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import static java.util.Arrays.asList;
-
-import com.google.gson.Gson;
-
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.List;
-
-import javax.servlet.http.HttpServletRequest;
-
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectBuilder;
@@ -63,6 +32,31 @@ import org.apache.zeppelin.rest.AbstractTestRestApi;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Basic REST API tests for notebookServer.
@@ -70,14 +64,12 @@ import org.apache.zeppelin.user.AuthenticationInfo;
 public class NotebookServerTest extends AbstractTestRestApi {
   private static Notebook notebook;
   private static NotebookServer notebookServer;
-  private static Gson gson;
   private HttpServletRequest mockRequest;
   private AuthenticationInfo anonymous;
 
   @BeforeClass
   public static void init() throws Exception {
     AbstractTestRestApi.startUp(NotebookServerTest.class.getSimpleName());
-    gson = new Gson();
     notebook = ZeppelinServer.notebook;
     notebookServer = ZeppelinServer.notebookWsServer;
   }
@@ -254,7 +246,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
     Message messageReceived = notebookServer.deserializeMessage(msg);
     Note note = null;
     try {
-      note = notebookServer.importNote(null, null, notebook, messageReceived);
+      note = notebookServer.importNote(null, messageReceived);
     } catch (NullPointerException e) {
       //broadcastNoteList(); failed nothing to worry.
       LOG.error("Exception in NotebookServerTest while testImportNotebook, failed nothing to " +

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 0d76809..e045a59 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -70,9 +70,11 @@ public class InterpreterFactory {
       // first assume group is omitted
       InterpreterSetting setting =
           interpreterSettingManager.getByName(defaultInterpreterSetting);
-      Interpreter interpreter = setting.getInterpreter(user, noteId, replName);
-      if (null != interpreter) {
-        return interpreter;
+      if (setting != null) {
+        Interpreter interpreter = setting.getInterpreter(user, noteId, replName);
+        if (null != interpreter) {
+          return interpreter;
+        }
       }
 
       // then assume interpreter name is omitted

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 3f573b8..b1bff81 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -263,7 +263,11 @@ public class Notebook implements NoteEventListener {
         }
       }
       // add the default interpreter group
-      settings.add(interpreterSettingManager.getByName(note.getDefaultInterpreterGroup()));
+      InterpreterSetting defaultIntpSetting =
+          interpreterSettingManager.getByName(note.getDefaultInterpreterGroup());
+      if (defaultIntpSetting != null) {
+        settings.add(defaultIntpSetting);
+      }
       return new ArrayList<>(settings);
     } else {
       return new LinkedList<>();


[3/3] zeppelin git commit: ZEPPELIN-3681. Introduce NotebookService

Posted by zj...@apache.org.
ZEPPELIN-3681. Introduce NotebookService

### What is this PR for?
This is the first phase of the refactoring work of ZEPPELIN-3288. The background of ZEPPELIN-3288 is that Zeppelin provides 2 kinds of API (REST API and websocket), both of which invoke the same backend logic.  This PR puts all the notebook-related operations into the class NotebookService, which is called by both NotebookServer & NotebookRestApi.

### What type of PR is it?
[Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3288

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #3119 from zjffdu/ZEPPELIN-3681 and squashes the following commits:

6dda42ef8 [Jeff Zhang] ZEPPELIN-3681. Introduce NotebookService


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/6beb1bb3
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/6beb1bb3
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/6beb1bb3

Branch: refs/heads/master
Commit: 6beb1bb3c36892bb0466b60ec142c5597f6e8f9e
Parents: e740efb
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu Aug 2 17:50:17 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Aug 10 14:54:33 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/rest/InterpreterRestApi.java       |   51 +-
 .../apache/zeppelin/rest/NotebookRestApi.java   |  194 +--
 .../rest/exception/ForbiddenException.java      |   13 +-
 .../rest/exception/NotFoundException.java       |   59 -
 .../rest/exception/NoteNotFoundException.java   |   32 +
 .../exception/ParagraphNotFoundException.java   |   31 +
 .../zeppelin/service/InterpreterService.java    |   16 +-
 .../zeppelin/service/NotebookService.java       |  793 ++++++++++++
 .../zeppelin/service/ServiceCallback.java       |   51 +
 .../apache/zeppelin/service/ServiceContext.java |   45 +
 .../zeppelin/service/SimpleServiceCallback.java |   49 +
 .../apache/zeppelin/socket/NotebookServer.java  | 1150 ++++++++----------
 .../apache/zeppelin/socket/NotebookSocket.java  |    5 +
 .../apache/zeppelin/socket/ServiceCallback.java |   27 -
 .../zeppelin/rest/ZeppelinRestApiTest.java      |   10 +-
 .../service/InterpreterServiceTest.java         |    9 +-
 .../zeppelin/socket/NotebookServerTest.java     |   60 +-
 .../interpreter/InterpreterFactory.java         |    8 +-
 .../org/apache/zeppelin/notebook/Notebook.java  |    6 +-
 19 files changed, 1707 insertions(+), 902 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index b6a39b3..16d39d8 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -18,19 +18,30 @@
 package org.apache.zeppelin.rest;
 
 import com.google.common.collect.Maps;
-import javax.validation.constraints.NotNull;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.zeppelin.annotation.ZeppelinApi;
+import org.apache.zeppelin.dep.Repository;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterPropertyType;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
-import org.apache.zeppelin.socket.ServiceCallback;
+import org.apache.zeppelin.rest.message.InterpreterInstallationRequest;
+import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest;
+import org.apache.zeppelin.rest.message.RestartInterpreterRequest;
+import org.apache.zeppelin.rest.message.UpdateInterpreterSettingRequest;
+import org.apache.zeppelin.server.JsonResponse;
+import org.apache.zeppelin.service.InterpreterService;
+import org.apache.zeppelin.service.ServiceContext;
+import org.apache.zeppelin.service.SimpleServiceCallback;
+import org.apache.zeppelin.socket.NotebookServer;
+import org.apache.zeppelin.utils.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonatype.aether.repository.RemoteRepository;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
+import javax.validation.constraints.NotNull;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -40,21 +51,9 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
-
-import org.apache.zeppelin.annotation.ZeppelinApi;
-import org.apache.zeppelin.dep.Repository;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterPropertyType;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
-import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.rest.message.InterpreterInstallationRequest;
-import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest;
-import org.apache.zeppelin.rest.message.RestartInterpreterRequest;
-import org.apache.zeppelin.rest.message.UpdateInterpreterSettingRequest;
-import org.apache.zeppelin.server.JsonResponse;
-import org.apache.zeppelin.service.InterpreterService;
-import org.apache.zeppelin.socket.NotebookServer;
-import org.apache.zeppelin.utils.SecurityUtils;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Interpreter Rest API.
@@ -292,9 +291,9 @@ public class InterpreterRestApi {
     try {
       interpreterService.installInterpreter(
           request,
-          new ServiceCallback() {
+          new SimpleServiceCallback<String>() {
             @Override
-            public void onStart(String message) {
+            public void onStart(String message, ServiceContext context) {
               Message m = new Message(OP.INTERPRETER_INSTALL_STARTED);
               Map<String, Object> data = Maps.newHashMap();
               data.put("result", "Starting");
@@ -304,7 +303,7 @@ public class InterpreterRestApi {
             }
 
             @Override
-            public void onSuccess(String message) {
+            public void onSuccess(String message, ServiceContext context) {
               Message m = new Message(OP.INTERPRETER_INSTALL_RESULT);
               Map<String, Object> data = Maps.newHashMap();
               data.put("result", "Succeed");
@@ -314,11 +313,11 @@ public class InterpreterRestApi {
             }
 
             @Override
-            public void onFailure(String message) {
+            public void onFailure(Exception ex, ServiceContext context) {
               Message m = new Message(OP.INTERPRETER_INSTALL_RESULT);
               Map<String, Object> data = Maps.newHashMap();
               data.put("result", "Failed");
-              data.put("message", message);
+              data.put("message", ex.getMessage());
               m.data = data;
               notebookServer.broadcast(m);
             }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index f23ec0e..90647f4 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -29,7 +29,8 @@ import org.apache.zeppelin.notebook.NotebookAuthorization;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.rest.exception.BadRequestException;
 import org.apache.zeppelin.rest.exception.ForbiddenException;
-import org.apache.zeppelin.rest.exception.NotFoundException;
+import org.apache.zeppelin.rest.exception.NoteNotFoundException;
+import org.apache.zeppelin.rest.exception.ParagraphNotFoundException;
 import org.apache.zeppelin.rest.message.CronRequest;
 import org.apache.zeppelin.rest.message.NewNoteRequest;
 import org.apache.zeppelin.rest.message.NewParagraphRequest;
@@ -38,6 +39,9 @@ import org.apache.zeppelin.rest.message.RunParagraphWithParametersRequest;
 import org.apache.zeppelin.rest.message.UpdateParagraphRequest;
 import org.apache.zeppelin.search.SearchService;
 import org.apache.zeppelin.server.JsonResponse;
+import org.apache.zeppelin.service.NotebookService;
+import org.apache.zeppelin.service.ServiceContext;
+import org.apache.zeppelin.service.SimpleServiceCallback;
 import org.apache.zeppelin.socket.NotebookServer;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.utils.SecurityUtils;
@@ -53,13 +57,14 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -75,6 +80,7 @@ public class NotebookRestApi {
   private NotebookServer notebookServer;
   private SearchService noteSearchService;
   private NotebookAuthorization notebookAuthorization;
+  private NotebookService notebookService;
 
   public NotebookRestApi() {
   }
@@ -82,6 +88,7 @@ public class NotebookRestApi {
   public NotebookRestApi(Notebook notebook, NotebookServer notebookServer, SearchService search) {
     this.notebook = notebook;
     this.notebookServer = notebookServer;
+    this.notebookService = new NotebookService(notebook);
     this.noteSearchService = search;
     this.notebookAuthorization = notebook.getNotebookAuthorization();
   }
@@ -183,7 +190,7 @@ public class NotebookRestApi {
 
   private void checkIfNoteIsNotNull(Note note) {
     if (note == null) {
-      throw new NotFoundException("note not found");
+      throw new NoteNotFoundException("note not found");
     }
   }
 
@@ -196,7 +203,7 @@ public class NotebookRestApi {
 
   private void checkIfParagraphIsNotNull(Paragraph paragraph) {
     if (paragraph == null) {
-      throw new NotFoundException("paragraph not found");
+      throw new ParagraphNotFoundException("paragraph not found");
     }
   }
 
@@ -276,11 +283,8 @@ public class NotebookRestApi {
   @Path("/")
   @ZeppelinApi
   public Response getNoteList() throws IOException {
-    AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-    HashSet<String> userAndRoles = SecurityUtils.getAssociatedRoles();
-    userAndRoles.add(subject.getUser());
-    List<Map<String, String>> notesInfo = notebookServer.generateNotesInfo(false, subject,
-        userAndRoles);
+    List<Map<String, String>> notesInfo = notebookService.listNotes(false, getServiceContext(),
+        new RestServiceCallback<List<Map<String, String>>>());
     return new JsonResponse<>(Status.OK, "", notesInfo).build();
   }
 
@@ -288,10 +292,8 @@ public class NotebookRestApi {
   @Path("{noteId}")
   @ZeppelinApi
   public Response getNote(@PathParam("noteId") String noteId) throws IOException {
-    Note note = notebook.getNote(noteId);
-    checkIfNoteIsNotNull(note);
-    checkIfUserCanRead(noteId, "Insufficient privileges you cannot get this note");
-
+    Note note =
+        notebookService.getNote(noteId, getServiceContext(), new RestServiceCallback<Note>());
     return new JsonResponse<>(Status.OK, "", note).build();
   }
 
@@ -314,17 +316,17 @@ public class NotebookRestApi {
   /**
    * import new note REST API.
    *
-   * @param req - note Json
+   * @param noteJson - note Json
    * @return JSON with new note ID
    * @throws IOException
    */
   @POST
   @Path("import")
   @ZeppelinApi
-  public Response importNote(String req) throws IOException {
-    AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-    Note newNote = notebook.importNote(req, null, subject);
-    return new JsonResponse<>(Status.OK, "", newNote.getId()).build();
+  public Response importNote(String noteJson) throws IOException {
+    Note note = notebookService.importNote(null, noteJson, getServiceContext(),
+        new RestServiceCallback<Note>());
+    return new JsonResponse<>(Status.OK, "", note.getId()).build();
   }
 
   /**
@@ -378,16 +380,15 @@ public class NotebookRestApi {
   @ZeppelinApi
   public Response deleteNote(@PathParam("noteId") String noteId) throws IOException {
     LOG.info("Delete note {} ", noteId);
-    checkIfUserIsOwner(noteId, "Insufficient privileges you cannot delete this note");
-    AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-    if (!(noteId.isEmpty())) {
-      Note note = notebook.getNote(noteId);
-      if (note != null) {
-        notebook.removeNote(noteId, subject);
-      }
-    }
+    notebookService.removeNote(noteId,
+        getServiceContext(),
+        new RestServiceCallback<String>() {
+          @Override
+          public void onSuccess(String message, ServiceContext context) {
+            notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
 
-    notebookServer.broadcastNoteList(subject, SecurityUtils.getAssociatedRoles());
     return new JsonResponse<>(Status.OK, "").build();
   }
 
@@ -413,9 +414,14 @@ public class NotebookRestApi {
       newNoteName = request.getName();
     }
     AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-    Note newNote = notebook.cloneNote(noteId, newNoteName, subject);
-    notebookServer.broadcastNote(newNote);
-    notebookServer.broadcastNoteList(subject, SecurityUtils.getAssociatedRoles());
+    Note newNote = notebookService.cloneNote(noteId, newNoteName, getServiceContext(),
+        new SimpleServiceCallback<Note>(){
+          @Override
+          public void onSuccess(Note newNote, ServiceContext context) throws IOException {
+            notebookServer.broadcastNote(newNote);
+            notebookServer.broadcastNoteList(subject, context.getUserAndRoles());
+          }
+        });
     return new JsonResponse<>(Status.OK, "", newNote.getId()).build();
   }
 
@@ -433,21 +439,20 @@ public class NotebookRestApi {
                              String message) throws IOException {
     LOG.info("rename note by JSON {}", message);
     RenameNoteRequest request = gson.fromJson(message, RenameNoteRequest.class);
-    AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-
-    checkIfUserCanWrite(noteId, "Insufficient privileges you cannot rename this note");
-    Note note = notebook.getNote(noteId);
-    checkIfNoteIsNotNull(note);
 
     String newName = request.getName();
     if (newName.isEmpty()) {
       LOG.warn("Trying to rename notebook {} with empty name parameter", noteId);
       throw new BadRequestException("name can not be empty");
     }
-    note.setName(newName);
-
-    notebookServer.broadcastNote(note);
-    notebookServer.broadcastNoteList(subject, SecurityUtils.getAssociatedRoles());
+    notebookService.renameNote(noteId, request.getName(), getServiceContext(),
+        new RestServiceCallback<Note>(){
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            notebookServer.broadcastNote(note);
+            notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
     return new JsonResponse(Status.OK, "").build();
   }
 
@@ -462,7 +467,7 @@ public class NotebookRestApi {
   @Path("{noteId}/paragraph")
   @ZeppelinApi
   public Response insertParagraph(@PathParam("noteId") String noteId, String message)
-          throws IOException {
+      throws IOException {
     String user = SecurityUtils.getPrincipal();
     LOG.info("insert paragraph {} {}", noteId, message);
 
@@ -513,7 +518,6 @@ public class NotebookRestApi {
    *
    * @param message json containing the "text" and optionally the "title" of the paragraph, e.g.
    *                {"text" : "updated text", "title" : "Updated title" }
-   *
    */
   @PUT
   @Path("{noteId}/paragraph/{paragraphId}")
@@ -585,20 +589,16 @@ public class NotebookRestApi {
     checkIfNoteIsNotNull(note);
     checkIfUserCanWrite(noteId, "Insufficient privileges you cannot move paragraph");
 
-    Paragraph p = note.getParagraph(paragraphId);
-    checkIfParagraphIsNotNull(p);
+    notebookService.moveParagraph(noteId, paragraphId, Integer.parseInt(newIndex),
+        getServiceContext(),
+        new RestServiceCallback<Paragraph>() {
+          @Override
+          public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
+            notebookServer.broadcastNote(result.getNote());
+          }
+        });
+    return new JsonResponse(Status.OK, "").build();
 
-    try {
-      note.moveParagraph(paragraphId, Integer.parseInt(newIndex), true);
-
-      AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-      note.persist(subject);
-      notebookServer.broadcastNote(note);
-      return new JsonResponse(Status.OK, "").build();
-    } catch (IndexOutOfBoundsException e) {
-      LOG.error("Exception in NotebookRestApi while moveParagraph ", e);
-      return new JsonResponse(Status.BAD_REQUEST, "paragraph's new index is out of bound").build();
-    }
   }
 
   /**
@@ -615,18 +615,13 @@ public class NotebookRestApi {
                                   @PathParam("paragraphId") String paragraphId) throws IOException {
     LOG.info("delete paragraph {} {}", noteId, paragraphId);
 
-    Note note = notebook.getNote(noteId);
-    checkIfNoteIsNotNull(note);
-    checkIfUserCanRead(noteId,
-        "Insufficient privileges you cannot remove paragraph from this note");
-
-    Paragraph p = note.getParagraph(paragraphId);
-    checkIfParagraphIsNotNull(p);
-
-    AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-    note.removeParagraph(SecurityUtils.getPrincipal(), paragraphId);
-    note.persist(subject);
-    notebookServer.broadcastNote(note);
+    notebookService.removeParagraph(noteId, paragraphId, getServiceContext(),
+        new RestServiceCallback<Paragraph>() {
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            notebookServer.broadcastNote(p.getNote());
+          }
+        });
 
     return new JsonResponse(Status.OK, "").build();
   }
@@ -665,7 +660,7 @@ public class NotebookRestApi {
   @ZeppelinApi
   public Response runNoteJobs(@PathParam("noteId") String noteId,
                               @QueryParam("waitToFinish") Boolean waitToFinish)
-          throws IOException, IllegalArgumentException {
+      throws IOException, IllegalArgumentException {
     boolean blocking = waitToFinish == null || waitToFinish;
     LOG.info("run note jobs {} waitToFinish: {}", noteId, blocking);
     Note note = notebook.getNote(noteId);
@@ -775,22 +770,14 @@ public class NotebookRestApi {
       throws IOException, IllegalArgumentException {
     LOG.info("run paragraph job asynchronously {} {} {}", noteId, paragraphId, message);
 
-    Note note = notebook.getNote(noteId);
-    checkIfNoteIsNotNull(note);
-    checkIfUserCanRun(noteId, "Insufficient privileges you cannot run job for this note");
-    Paragraph paragraph = note.getParagraph(paragraphId);
-    checkIfParagraphIsNotNull(paragraph);
-
-    // handle params if presented
-    handleParagraphParams(message, note, paragraph);
-
-    AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-    subject.setRoles(new LinkedList<>(SecurityUtils.getAssociatedRoles()));
-
-    paragraph.setAuthenticationInfo(subject);
-    note.persist(subject);
-
-    note.run(paragraph.getId());
+    Map<String, Object> params = new HashMap<>();
+    if (!StringUtils.isEmpty(message)) {
+      RunParagraphWithParametersRequest request =
+          RunParagraphWithParametersRequest.fromJson(message);
+      params = request.getParams();
+    }
+    notebookService.runParagraph(noteId, paragraphId, "", "", params,
+        new HashMap<String, Object>(), false, getServiceContext(), new RestServiceCallback<>());
     return new JsonResponse<>(Status.OK).build();
   }
 
@@ -854,16 +841,12 @@ public class NotebookRestApi {
   @DELETE
   @Path("job/{noteId}/{paragraphId}")
   @ZeppelinApi
-  public Response stopParagraph(@PathParam("noteId") String noteId,
-                                @PathParam("paragraphId") String paragraphId)
+  public Response cancelParagraph(@PathParam("noteId") String noteId,
+                                  @PathParam("paragraphId") String paragraphId)
       throws IOException, IllegalArgumentException {
     LOG.info("stop paragraph job {} ", noteId);
-    Note note = notebook.getNote(noteId);
-    checkIfNoteIsNotNull(note);
-    checkIfUserCanRun(noteId, "Insufficient privileges you cannot stop paragraph");
-    Paragraph p = note.getParagraph(paragraphId);
-    checkIfParagraphIsNotNull(p);
-    p.abort();
+    notebookService.cancelParagraph(noteId, paragraphId, getServiceContext(),
+        new RestServiceCallback<Paragraph>());
     return new JsonResponse<>(Status.OK).build();
   }
 
@@ -913,9 +896,9 @@ public class NotebookRestApi {
   @Path("cron/{noteId}")
   @ZeppelinApi
   public Response removeCronJob(@PathParam("noteId") String noteId)
-          throws IOException, IllegalArgumentException {
+      throws IOException, IllegalArgumentException {
     LOG.info("Remove cron job note {}", noteId);
-    
+
     Note note = notebook.getNote(noteId);
     checkIfNoteIsNotNull(note);
     checkIfUserIsOwner(noteId,
@@ -942,7 +925,7 @@ public class NotebookRestApi {
   @Path("cron/{noteId}")
   @ZeppelinApi
   public Response getCronJob(@PathParam("noteId") String noteId)
-          throws IOException, IllegalArgumentException {
+      throws IOException, IllegalArgumentException {
     LOG.info("Get cron job note {}", noteId);
 
     Note note = notebook.getNote(noteId);
@@ -979,7 +962,7 @@ public class NotebookRestApi {
 
   /**
    * Get updated note jobs for job manager
-   *
+   * <p>
    * Return the `Note` change information within the post unix timestamp.
    *
    * @return JSON with status.OK
@@ -1074,4 +1057,25 @@ public class NotebookRestApi {
 
     p.setConfig(origConfig);
   }
+
+  private ServiceContext getServiceContext() {
+    AuthenticationInfo authInfo = new AuthenticationInfo(SecurityUtils.getPrincipal());
+    Set<String> userAndRoles = Sets.newHashSet();
+    userAndRoles.add(SecurityUtils.getPrincipal());
+    userAndRoles.addAll(SecurityUtils.getAssociatedRoles());
+    return new ServiceContext(authInfo, userAndRoles);
+  }
+
+  private static class RestServiceCallback<T> extends SimpleServiceCallback<T> {
+
+    @Override
+    public void onFailure(Exception ex, ServiceContext context) throws IOException {
+      super.onFailure(ex, context);
+      if (ex instanceof WebApplicationException) {
+        throw (WebApplicationException) ex;
+      } else {
+        throw new IOException(ex);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java
index 0c07def..abe08eb 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java
@@ -27,21 +27,10 @@ import org.apache.zeppelin.utils.ExceptionUtils;
  * UnauthorizedException handler for WebApplicationException.
  */
 public class ForbiddenException extends WebApplicationException {
-  private static final long serialVersionUID = 4394749068760407567L;
-  private static final String FORBIDDEN_MSG = "Not allowed to access";
-
-  public ForbiddenException() {
-    super(forbiddenJson(FORBIDDEN_MSG));
-  }
-
   private static Response forbiddenJson(String message) {
     return ExceptionUtils.jsonResponseContent(FORBIDDEN, message);
   }
-  
-  public ForbiddenException(Throwable cause, String message) {
-    super(cause, forbiddenJson(message));
-  }
-  
+
   public ForbiddenException(String message) {
     super(forbiddenJson(message));
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NotFoundException.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NotFoundException.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NotFoundException.java
deleted file mode 100644
index 7f9c17d..0000000
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NotFoundException.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.rest.exception;
-import static javax.ws.rs.core.Response.Status.NOT_FOUND;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-
-import org.apache.zeppelin.utils.ExceptionUtils;
-
-/**
- * Not Found handler for WebApplicationException.
- * 
- */
-public class NotFoundException extends WebApplicationException {
-  private static final long serialVersionUID = 2459398393216512293L;
-
-  /**
-   * Create a HTTP 404 (Not Found) exception.
-   */
-  public NotFoundException() {
-    super(ExceptionUtils.jsonResponse(NOT_FOUND));
-  }
-
-  /**
-   * Create a HTTP 404 (Not Found) exception.
-   * @param message the String that is the entity of the 404 response.
-   */
-  public NotFoundException(String message) {
-    super(notFoundJson(message));
-  }
-
-  private static Response notFoundJson(String message) {
-    return ExceptionUtils.jsonResponseContent(NOT_FOUND, message);
-  }
-
-  public NotFoundException(Throwable cause) {
-    super(cause, notFoundJson(cause.getMessage()));
-  }
-
-  public NotFoundException(Throwable cause, String message) {
-    super(cause, notFoundJson(message));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NoteNotFoundException.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NoteNotFoundException.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NoteNotFoundException.java
new file mode 100644
index 0000000..773d749
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NoteNotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest.exception;
+
+import org.apache.zeppelin.utils.ExceptionUtils;
+
+import javax.ws.rs.WebApplicationException;
+
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+
+/**
+ * {@link WebApplicationException} whose response is built by
+ * {@code ExceptionUtils.jsonResponseContent} with status {@code NOT_FOUND} and the message
+ * "No such note: " followed by the note id.
+ */
+public class NoteNotFoundException extends WebApplicationException {
+
+  /**
+   * @param noteId id of the note that could not be found; appended to the response message
+   */
+  public NoteNotFoundException(String noteId) {
+    super(ExceptionUtils.jsonResponseContent(NOT_FOUND, "No such note: " + noteId));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ParagraphNotFoundException.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ParagraphNotFoundException.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ParagraphNotFoundException.java
new file mode 100644
index 0000000..4ec5ee1
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ParagraphNotFoundException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest.exception;
+
+import org.apache.zeppelin.utils.ExceptionUtils;
+
+import javax.ws.rs.WebApplicationException;
+
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+
+/**
+ * {@link WebApplicationException} whose response is built by
+ * {@code ExceptionUtils.jsonResponseContent} with status {@code NOT_FOUND} and the message
+ * "No such paragraph: " followed by the paragraph id.
+ */
+public class ParagraphNotFoundException extends WebApplicationException {
+
+  /**
+   * @param paragraphId id of the paragraph that could not be found; appended to the
+   *                    response message
+   */
+  public ParagraphNotFoundException(String paragraphId) {
+    super(ExceptionUtils.jsonResponseContent(NOT_FOUND, "No such paragraph: " + paragraphId));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java
index a66bc71..930189f 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java
@@ -34,7 +34,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.rest.message.InterpreterInstallationRequest;
-import org.apache.zeppelin.socket.ServiceCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonatype.aether.RepositoryException;
@@ -124,11 +123,11 @@ public class InterpreterService {
       InterpreterInstallationRequest request,
       DependencyResolver dependencyResolver,
       Path interpreterDir,
-      ServiceCallback serviceCallback) {
+      ServiceCallback<String> serviceCallback) {
     try {
       logger.info("Start to download a dependency: {}", request.getName());
       if (null != serviceCallback) {
-        serviceCallback.onStart("Starting to download " + request.getName() + " interpreter");
+        serviceCallback.onStart("Starting to download " + request.getName() + " interpreter", null);
       }
 
       dependencyResolver.load(request.getArtifact(), interpreterDir.toFile());
@@ -138,7 +137,7 @@ public class InterpreterService {
           request.getName(),
           interpreterDir.toString());
       if (null != serviceCallback) {
-        serviceCallback.onSuccess(request.getName() + " downloaded");
+        serviceCallback.onSuccess(request.getName() + " downloaded", null);
       }
     } catch (RepositoryException | IOException e) {
       logger.error("Error while downloading dependencies", e);
@@ -151,8 +150,13 @@ public class InterpreterService {
             e1);
       }
       if (null != serviceCallback) {
-        serviceCallback.onFailure(
-            "Error while downloading " + request.getName() + " as " + e.getMessage());
+        try {
+          serviceCallback.onFailure(
+              new Exception("Error while downloading " + request.getName() + " as " +
+                  e.getMessage()), null);
+        } catch (IOException e1) {
+          logger.error("ServiceCallback failure", e1);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
new file mode 100644
index 0000000..401420f
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -0,0 +1,793 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.service;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.notebook.Folder;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.NotebookAuthorization;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.rest.exception.BadRequestException;
+import org.apache.zeppelin.rest.exception.ForbiddenException;
+import org.apache.zeppelin.rest.exception.NoteNotFoundException;
+import org.apache.zeppelin.rest.exception.ParagraphNotFoundException;
+import org.apache.zeppelin.scheduler.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN;
+
+/**
+ * Service class for Notebook related operations.
+ */
+public class NotebookService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(NotebookService.class);
+
+  private ZeppelinConfiguration zConf;
+  private Notebook notebook;
+  private NotebookAuthorization notebookAuthorization;
+
+  /**
+   * Creates the service on top of the given notebook; the authorization object and the
+   * configuration are obtained from that notebook.
+   *
+   * @param notebook notebook that every operation of this service delegates to
+   */
+  public NotebookService(Notebook notebook) {
+    this.notebook = notebook;
+    this.notebookAuthorization = notebook.getNotebookAuthorization();
+    this.zConf = notebook.getConf();
+  }
+
+  /**
+   * Looks up the note configured under {@code ZEPPELIN_NOTEBOOK_HOMESCREEN} and reports it
+   * through the callback.
+   *
+   * <p>When no note id is configured, {@code callback.onSuccess} receives {@code null}.
+   * When an id is configured but no such note exists, {@code callback.onFailure} is invoked
+   * and, unless it throws, {@code callback.onSuccess(null, context)} is invoked afterwards.
+   *
+   * @param context  context of the caller, passed to the permission check and the callback
+   * @param callback receives the note on success or the exception on failure
+   * @return the home note; {@code null} if none is configured, the note does not exist, or
+   *         the READER permission check fails
+   * @throws IOException propagated from the permission check or the callback
+   */
+  public Note getHomeNote(ServiceContext context,
+                          ServiceCallback<Note> callback) throws IOException {
+    String noteId = notebook.getConf().getString(ZEPPELIN_NOTEBOOK_HOMESCREEN);
+    Note note = null;
+    if (noteId != null) {
+      note = notebook.getNote(noteId);
+      if (note != null) {
+        if (!checkPermission(noteId, Permission.READER, Message.OP.GET_HOME_NOTE, context,
+            callback)) {
+          return null;
+        }
+      } else {
+        // No early return here: execution continues to onSuccess(null, ...) below when the
+        // callback does not throw.
+        callback.onFailure(new Exception("configured HomePage is not existed"), context);
+      }
+    }
+    callback.onSuccess(note, context);
+    return note;
+  }
+
+  /**
+   * Fetches a note by id after checking the READER permission.
+   *
+   * @param noteId   id of the note to fetch
+   * @param context  context of the caller; its user selects the per-user note when the
+   *                 note is in personalized mode
+   * @param callback receives the note on success, or a {@link NoteNotFoundException} when
+   *                 the note does not exist
+   * @return the note (the user's note in personalized mode), or {@code null} when the note
+   *         does not exist or the permission check fails
+   * @throws IOException propagated from the permission check or the callback
+   */
+  public Note getNote(String noteId,
+                      ServiceContext context,
+                      ServiceCallback<Note> callback) throws IOException {
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return null;
+    }
+
+    if (!checkPermission(noteId, Permission.READER, Message.OP.GET_NOTE, context,
+        callback)) {
+      return null;
+    }
+    if (note.isPersonalizedMode()) {
+      // personalized mode: hand out the note as seen by the calling user
+      note = note.getUserNote(context.getAutheInfo().getUser());
+    }
+    callback.onSuccess(note, context);
+    return note;
+  }
+
+
+  /**
+   * Creates a note containing one empty paragraph, names it and persists it.
+   *
+   * @param defaultInterpreterGroup interpreter group of the new note; when {@code null} the
+   *                                value of {@code ZEPPELIN_INTERPRETER_GROUP_DEFAULT} is used
+   * @param noteName                name of the new note; when {@code null} "Untitled Note"
+   *                                is used
+   * @param context                 context of the caller; supplies the authentication info
+   * @param callback                receives the new note on success, or an
+   *                                {@link IOException} wrapping the cause on failure
+   * @return the created note, or {@code null} when an {@link IOException} occurred and the
+   *         callback did not rethrow
+   * @throws IOException propagated from the callback
+   */
+  public Note createNote(String defaultInterpreterGroup,
+                         String noteName,
+                         ServiceContext context,
+                         ServiceCallback<Note> callback) throws IOException {
+    if (defaultInterpreterGroup == null) {
+      defaultInterpreterGroup = zConf.getString(
+          ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT);
+    }
+    if (noteName == null) {
+      noteName = "Untitled Note";
+    }
+    try {
+      Note note = notebook.createNote(defaultInterpreterGroup, context.getAutheInfo());
+      note.addNewParagraph(context.getAutheInfo()); // it's an empty note. so add one paragraph
+      note.setName(noteName);
+      note.setCronSupported(notebook.getConf());
+      note.persist(context.getAutheInfo());
+      callback.onSuccess(note, context);
+      return note;
+    } catch (IOException e) {
+      // Note: an IOException thrown by callback.onSuccess above is caught here as well.
+      callback.onFailure(new IOException("Fail to create Note", e), context);
+      return null;
+    }
+  }
+
+
+  /**
+   * Deletes a note after checking the OWNER permission.
+   *
+   * @param noteId   id of the note to delete
+   * @param context  context of the caller; supplies the authentication info
+   * @param callback receives "Delete note successfully" on success, or a
+   *                 {@link NoteNotFoundException} when the note does not exist
+   * @throws IOException propagated from the permission check, the removal or the callback
+   */
+  public void removeNote(String noteId,
+                         ServiceContext context,
+                         ServiceCallback<String> callback) throws IOException {
+    boolean permitted =
+        checkPermission(noteId, Permission.OWNER, Message.OP.DEL_NOTE, context, callback);
+    if (!permitted) {
+      return;
+    }
+    if (notebook.getNote(noteId) == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+    notebook.removeNote(noteId, context.getAutheInfo());
+    callback.onSuccess("Delete note successfully", context);
+  }
+
+  /**
+   * Lists id and name of every note returned by the notebook for the caller's user and
+   * roles.
+   *
+   * <p>The home screen note is left out when
+   * {@code ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE} is enabled. A failed reload is only logged;
+   * the listing then proceeds with the notes currently held by the notebook.
+   *
+   * @param needsReload whether to reload all notes from the repository first
+   * @param context     context of the caller; supplies authentication info, user and roles
+   * @param callback    receives the list on success
+   * @return list of maps with the keys "id" and "name"
+   * @throws IOException propagated from the callback
+   */
+  public List<Map<String, String>> listNotes(boolean needsReload,
+                                             ServiceContext context,
+                                             ServiceCallback<List<Map<String, String>>> callback)
+      throws IOException {
+
+    ZeppelinConfiguration zeppelinConf = notebook.getConf();
+    String homeNoteId = zeppelinConf.getString(ZEPPELIN_NOTEBOOK_HOMESCREEN);
+    boolean hideHomeNote =
+        zeppelinConf.getBoolean(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE);
+    if (needsReload) {
+      try {
+        notebook.reloadAllNotes(context.getAutheInfo());
+      } catch (IOException e) {
+        LOGGER.error("Fail to reload notes from repository", e);
+      }
+    }
+
+    List<Map<String, String>> summaries = new LinkedList<>();
+    for (Note note : notebook.getAllNotes(context.getUserAndRoles())) {
+      boolean isHiddenHomeNote = hideHomeNote && note.getId().equals(homeNoteId);
+      if (!isHiddenHomeNote) {
+        Map<String, String> summary = new HashMap<>();
+        summary.put("id", note.getId());
+        summary.put("name", note.getName());
+        summaries.add(summary);
+      }
+    }
+
+    callback.onSuccess(summaries, context);
+    return summaries;
+  }
+
+  /**
+   * Renames a note after checking the WRITER permission, then persists it.
+   *
+   * @param noteId      id of the note to rename
+   * @param newNoteName new name of the note
+   * @param context     context of the caller; supplies the authentication info
+   * @param callback    receives the renamed note on success, or a
+   *                    {@link NoteNotFoundException} when the note does not exist
+   * @throws IOException propagated from the permission check, persisting or the callback
+   */
+  public void renameNote(String noteId,
+                         String newNoteName,
+                         ServiceContext context,
+                         ServiceCallback<Note> callback) throws IOException {
+    boolean permitted =
+        checkPermission(noteId, Permission.WRITER, Message.OP.NOTE_RENAME, context, callback);
+    if (!permitted) {
+      return;
+    }
+
+    Note target = notebook.getNote(noteId);
+    if (target == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+    target.setName(newNoteName);
+    target.setCronSupported(notebook.getConf());
+    target.persist(context.getAutheInfo());
+    callback.onSuccess(target, context);
+  }
+
+  /**
+   * Clones a note under a new name. No permission check is performed in this method.
+   *
+   * @param noteId      id of the note to clone
+   * @param newNoteName name of the clone
+   * @param context     context of the caller; supplies the authentication info
+   * @param callback    receives the cloned note on success
+   * @return the cloned note
+   * @throws IOException propagated from the clone operation or the callback
+   */
+  public Note cloneNote(String noteId,
+                        String newNoteName,
+                        ServiceContext context,
+                        ServiceCallback<Note> callback) throws IOException {
+    Note newNote = notebook.cloneNote(noteId, newNoteName, context.getAutheInfo());
+    callback.onSuccess(newNote, context);
+    return newNote;
+  }
+
+  /**
+   * Imports a note from its JSON form and persists it. No permission check is performed in
+   * this method.
+   *
+   * @param noteName name of the imported note
+   * @param noteJson JSON of the note to import
+   * @param context  context of the caller; supplies the authentication info
+   * @param callback receives the imported note on success
+   * @return the imported note
+   * @throws IOException propagated from the import, persisting or the callback
+   */
+  public Note importNote(String noteName,
+                         String noteJson,
+                         ServiceContext context,
+                         ServiceCallback<Note> callback) throws IOException {
+    Note note = notebook.importNote(noteJson, noteName, context.getAutheInfo());
+    note.persist(context.getAutheInfo());
+    callback.onSuccess(note, context);
+    return note;
+  }
+
+  /**
+   * Updates a paragraph with the given title, text, params and config, persists the note
+   * and runs the paragraph, after checking the RUNNER permission.
+   *
+   * @param noteId      id of the note owning the paragraph
+   * @param paragraphId id of the paragraph to run
+   * @param title       title to set on the paragraph
+   * @param text        text to set on the paragraph
+   * @param params      params to set on the paragraph settings
+   * @param config      config to set on the paragraph
+   * @param isRunAll    when {@code true}, a disabled paragraph is not reported as a failure
+   * @param context     context of the caller; supplies the authentication info
+   * @param callback    receives the paragraph on success or the exception on failure
+   * @return the result of {@code note.run}; {@code false} when the permission check fails,
+   *         the note or paragraph is missing, the paragraph is disabled, or running throws
+   * @throws IOException propagated from the permission check or the callback
+   */
+  public boolean runParagraph(String noteId,
+                              String paragraphId,
+                              String title,
+                              String text,
+                              Map<String, Object> params,
+                              Map<String, Object> config,
+                              boolean isRunAll,
+                              ServiceContext context,
+                              ServiceCallback<Paragraph> callback) throws IOException {
+
+    if (!checkPermission(noteId, Permission.RUNNER, Message.OP.RUN_PARAGRAPH, context, callback)) {
+      return false;
+    }
+
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return false;
+    }
+    Paragraph p = note.getParagraph(paragraphId);
+    if (p == null) {
+      callback.onFailure(new ParagraphNotFoundException(paragraphId), context);
+      return false;
+    }
+    if (!p.isEnabled()) {
+      // A disabled paragraph is only reported as a failure for a single run; either way the
+      // method returns false.
+      if (!isRunAll) {
+        callback.onFailure(new IOException("paragraph is disabled."), context);
+      }
+      return false;
+    }
+    p.setText(text);
+    p.setTitle(title);
+    p.setAuthenticationInfo(context.getAutheInfo());
+    p.settings.setParams(params);
+    p.setConfig(config);
+
+    // NOTE(review): this branch looks the paragraph up again by the same id and repeats the
+    // assignments above, whereas updateParagraph() switches to
+    // p.getUserParagraph(user) in personalized mode - confirm which is intended.
+    if (note.isPersonalizedMode()) {
+      p = note.getParagraph(paragraphId);
+      p.setText(text);
+      p.setTitle(title);
+      p.setAuthenticationInfo(context.getAutheInfo());
+      p.settings.setParams(params);
+      p.setConfig(config);
+    }
+
+    try {
+      note.persist(p.getAuthenticationInfo());
+      boolean result = note.run(p.getId(), false);
+      callback.onSuccess(p, context);
+      return result;
+    } catch (Exception ex) {
+      // Record the error on the paragraph itself before reporting it to the callback.
+      LOGGER.error("Exception from run", ex);
+      p.setReturn(new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), ex);
+      p.setStatus(Job.Status.ERROR);
+      callback.onFailure(new Exception("Fail to run paragraph " + paragraphId, ex), context);
+      return false;
+    }
+  }
+
+  /**
+   * Runs the given paragraphs of a note in order, after checking the RUNNER permission.
+   *
+   * <p>Entries without an "id" and paragraphs that are disabled are skipped. Execution stops
+   * at the first paragraph for which {@link #runParagraph} returns {@code false}.
+   *
+   * @param noteId     id of the note owning the paragraphs
+   * @param paragraphs one map per paragraph with the keys "id", "paragraph" (the text),
+   *                   "title", "params" and "config"
+   * @param context    context of the caller
+   * @param callback   receives each paragraph that was run, or the failure
+   * @throws IOException propagated from the permission check, the callback or
+   *                     {@link #runParagraph}
+   */
+  @SuppressWarnings("unchecked") // "params" and "config" are untyped entries of the raw map
+  public void runAllParagraphs(String noteId,
+                               List<Map<String, Object>> paragraphs,
+                               ServiceContext context,
+                               ServiceCallback<Paragraph> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.RUNNER, Message.OP.RUN_ALL_PARAGRAPHS, context,
+        callback)) {
+      return;
+    }
+
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+
+    for (Map<String, Object> raw : paragraphs) {
+      String paragraphId = (String) raw.get("id");
+      if (paragraphId == null) {
+        continue;
+      }
+      Paragraph existing = note.getParagraph(paragraphId);
+      if (existing != null && !existing.isEnabled()) {
+        // runParagraph() also returns false for a disabled paragraph; skip it here so that
+        // it is not mistaken for a failed run and does not stop the remaining paragraphs.
+        continue;
+      }
+      String text = (String) raw.get("paragraph");
+      String title = (String) raw.get("title");
+      Map<String, Object> params = (Map<String, Object>) raw.get("params");
+      Map<String, Object> config = (Map<String, Object>) raw.get("config");
+
+      if (!runParagraph(noteId, paragraphId, title, text, params, config, true, context,
+          callback)) {
+        // stop execution when one paragraph fails.
+        break;
+      }
+    }
+  }
+
+  /**
+   * Aborts a paragraph after checking the RUNNER permission.
+   *
+   * <p>Unlike the methods that report a missing note through {@code callback.onFailure},
+   * this one throws directly when the note or paragraph is missing.
+   *
+   * @param noteId      id of the note owning the paragraph
+   * @param paragraphId id of the paragraph to abort
+   * @param context     context of the caller
+   * @param callback    receives the aborted paragraph on success
+   * @throws NoteNotFoundException      if the note does not exist
+   * @throws ParagraphNotFoundException if the paragraph does not exist
+   * @throws IOException                propagated from the permission check or the callback
+   */
+  public void cancelParagraph(String noteId,
+                              String paragraphId,
+                              ServiceContext context,
+                              ServiceCallback<Paragraph> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.RUNNER, Message.OP.CANCEL_PARAGRAPH, context,
+        callback)) {
+      return;
+    }
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      throw new NoteNotFoundException(noteId);
+    }
+    Paragraph p = note.getParagraph(paragraphId);
+    if (p == null) {
+      throw new ParagraphNotFoundException(paragraphId);
+    }
+    p.abort();
+    callback.onSuccess(p, context);
+  }
+
+  /**
+   * Moves a paragraph to a new position inside its note and persists the note, after
+   * checking the WRITER permission.
+   *
+   * @param noteId      id of the note owning the paragraph
+   * @param paragraphId id of the paragraph to move
+   * @param newIndex    target position; must be within {@code [0, paragraph count)}
+   * @param context     context of the caller; supplies the authentication info
+   * @param callback    receives the paragraph at {@code newIndex} on success, or a
+   *                    {@link BadRequestException} when {@code newIndex} is out of bounds
+   * @throws NoteNotFoundException      if the note does not exist
+   * @throws ParagraphNotFoundException if the paragraph does not exist
+   * @throws IOException                propagated from the permission check, persisting or
+   *                                    the callback
+   */
+  public void moveParagraph(String noteId,
+                            String paragraphId,
+                            int newIndex,
+                            ServiceContext context,
+                            ServiceCallback<Paragraph> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.MOVE_PARAGRAPH, context,
+        callback)) {
+      return;
+    }
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      throw new NoteNotFoundException(noteId);
+    }
+    if (note.getParagraph(paragraphId) == null) {
+      throw new ParagraphNotFoundException(paragraphId);
+    }
+    // Reject negative indexes as well: they would otherwise reach the index-based
+    // getParagraph(newIndex) lookup below.
+    if (newIndex < 0 || newIndex >= note.getParagraphCount()) {
+      callback.onFailure(new BadRequestException("newIndex " + newIndex + " is out of bounds"),
+          context);
+      return;
+    }
+    note.moveParagraph(paragraphId, newIndex);
+    note.persist(context.getAutheInfo());
+    callback.onSuccess(note.getParagraph(newIndex), context);
+  }
+
+  /**
+   * Removes a paragraph from its note and persists the note, after checking the WRITER
+   * permission.
+   *
+   * @param noteId      id of the note owning the paragraph
+   * @param paragraphId id of the paragraph to remove
+   * @param context     context of the caller; supplies the user and authentication info
+   * @param callback    receives the removed paragraph on success
+   * @throws NoteNotFoundException      if the note does not exist
+   * @throws ParagraphNotFoundException if the paragraph does not exist
+   * @throws IOException                propagated from the permission check, persisting or
+   *                                    the callback
+   */
+  public void removeParagraph(String noteId,
+                              String paragraphId,
+                              ServiceContext context,
+                              ServiceCallback<Paragraph> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.PARAGRAPH_REMOVE, context,
+        callback)) {
+      return;
+    }
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      throw new NoteNotFoundException(noteId);
+    }
+    if (note.getParagraph(paragraphId) == null) {
+      throw new ParagraphNotFoundException(paragraphId);
+    }
+    Paragraph p = note.removeParagraph(context.getAutheInfo().getUser(), paragraphId);
+    note.persist(context.getAutheInfo());
+    callback.onSuccess(p, context);
+  }
+
+  /**
+   * Inserts a new paragraph into a note at the given index, applies the config and
+   * persists the note, after checking the WRITER permission.
+   *
+   * @param noteId   id of the note to insert into
+   * @param index    position of the new paragraph; not validated in this method
+   * @param config   config to set on the new paragraph
+   * @param context  context of the caller; supplies the authentication info
+   * @param callback receives the new paragraph on success
+   * @return the new paragraph, or {@code null} when the permission check fails
+   * @throws NoteNotFoundException if the note does not exist
+   * @throws IOException           propagated from the permission check, persisting or the
+   *                               callback
+   */
+  public Paragraph insertParagraph(String noteId,
+                                   int index,
+                                   Map<String, Object> config,
+                                   ServiceContext context,
+                                   ServiceCallback<Paragraph> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.INSERT_PARAGRAPH, context,
+        callback)) {
+      return null;
+    }
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      throw new NoteNotFoundException(noteId);
+    }
+    Paragraph newPara = note.insertNewParagraph(index, context.getAutheInfo());
+    newPara.setConfig(config);
+    note.persist(context.getAutheInfo());
+    callback.onSuccess(newPara, context);
+    return newPara;
+  }
+
+  /**
+   * Restores a note from the trash by stripping the trash folder prefix from its name,
+   * after checking the WRITER permission.
+   *
+   * <p>The cron of the note is refreshed first when its config has a "cron" entry. The
+   * rename itself is delegated to {@link #renameNote}, which reports to the callback.
+   *
+   * @param noteId   id of the note to restore
+   * @param context  context of the caller
+   * @param callback receives the restored note on success; receives a
+   *                 {@link NoteNotFoundException} when the note does not exist, or an
+   *                 {@link IOException} when the note is not in the trash
+   * @throws IOException propagated from the permission check, the rename or the callback
+   */
+  public void restoreNote(String noteId,
+                          ServiceContext context,
+                          ServiceCallback<Note> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.RESTORE_NOTE, context,
+        callback)) {
+      return;
+    }
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+    //restore cron
+    Map<String, Object> config = note.getConfig();
+    if (config.get("cron") != null) {
+      notebook.refreshCron(note.getId());
+    }
+
+    if (note.isTrash()) {
+      String newName = note.getName().replaceFirst(Folder.TRASH_FOLDER_ID + "/", "");
+      renameNote(noteId, newName, context, callback);
+    } else {
+      // String.format needs %s; the SLF4J-style {} placeholder is never substituted.
+      callback.onFailure(new IOException(String.format("Trying to restore a note %s " +
+          "which is not in Trash", noteId)), context);
+    }
+  }
+
+  /**
+   * Sets title, text, params and config of a paragraph and persists the note, after
+   * checking the WRITER permission.
+   *
+   * @param noteId      id of the note owning the paragraph
+   * @param paragraphId id of the paragraph to update
+   * @param title       title to set
+   * @param text        text to set
+   * @param params      params to set on the paragraph settings
+   * @param config      config to set
+   * @param context     context of the caller; supplies the user and authentication info
+   * @param callback    receives the updated paragraph (the user's paragraph in
+   *                    personalized mode) on success, or a not-found exception when the
+   *                    note or paragraph is missing
+   * @throws IOException propagated from the permission check, persisting or the callback
+   */
+  public void updateParagraph(String noteId,
+                              String paragraphId,
+                              String title,
+                              String text,
+                              Map<String, Object> params,
+                              Map<String, Object> config,
+                              ServiceContext context,
+                              ServiceCallback<Paragraph> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.COMMIT_PARAGRAPH, context,
+        callback)) {
+      return;
+    }
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+    Paragraph p = note.getParagraph(paragraphId);
+    if (p == null) {
+      callback.onFailure(new ParagraphNotFoundException(paragraphId), context);
+      return;
+    }
+
+    p.settings.setParams(params);
+    p.setConfig(config);
+    p.setTitle(title);
+    p.setText(text);
+    if (note.isPersonalizedMode()) {
+      // personalized mode: apply the same values to the calling user's paragraph as well
+      p = p.getUserParagraph(context.getAutheInfo().getUser());
+      p.settings.setParams(params);
+      p.setConfig(config);
+      p.setTitle(title);
+      p.setText(text);
+    }
+    note.persist(context.getAutheInfo());
+    callback.onSuccess(p, context);
+  }
+
+  /**
+   * Clears the output of one paragraph after checking the WRITER permission. The note is
+   * not persisted by this method.
+   *
+   * @param noteId      id of the note owning the paragraph
+   * @param paragraphId id of the paragraph whose output is cleared
+   * @param context     context of the caller; its user is used in personalized mode
+   * @param callback    receives the cleared paragraph on success, or a not-found exception
+   *                    when the note or paragraph is missing
+   * @throws IOException propagated from the permission check or the callback
+   */
+  public void clearParagraphOutput(String noteId,
+                                   String paragraphId,
+                                   ServiceContext context,
+                                   ServiceCallback<Paragraph> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.PARAGRAPH_CLEAR_OUTPUT, context,
+        callback)) {
+      return;
+    }
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+    Paragraph p = note.getParagraph(paragraphId);
+    if (p == null) {
+      callback.onFailure(new ParagraphNotFoundException(paragraphId), context);
+      return;
+    }
+    Paragraph returnedParagraph = null;
+    if (note.isPersonalizedMode()) {
+      // personalized mode: only the calling user's output is cleared
+      returnedParagraph = note.clearPersonalizedParagraphOutput(paragraphId,
+          context.getAutheInfo().getUser());
+    } else {
+      note.clearParagraphOutput(paragraphId);
+      returnedParagraph = note.getParagraph(paragraphId);
+    }
+    callback.onSuccess(returnedParagraph, context);
+  }
+
+  /**
+   * Clears the output of every paragraph of a note after checking the WRITER permission.
+   * The note is not persisted by this method.
+   *
+   * @param noteId   id of the note to clear
+   * @param context  context of the caller
+   * @param callback receives the note on success, or a {@link NoteNotFoundException} when
+   *                 the note does not exist
+   * @throws IOException propagated from the permission check or the callback
+   */
+  public void clearAllParagraphOutput(String noteId,
+                                      ServiceContext context,
+                                      ServiceCallback<Note> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.PARAGRAPH_CLEAR_ALL_OUTPUT, context,
+        callback)) {
+      return;
+    }
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+
+    note.clearAllParagraphOutput();
+    callback.onSuccess(note, context);
+  }
+
+
+
+  /**
+   * Updates name and config of a note and persists it, after checking the WRITER
+   * permission.
+   *
+   * <p>When the note's "isZeppelinNotebookCronEnable" flag is not {@code true}, a "cron"
+   * entry is removed from the passed {@code config} map (the map is modified in place).
+   * The cron of the note is refreshed when {@code isCronUpdated} reports a change between
+   * the new and the current config.
+   *
+   * @param noteId   id of the note to update
+   * @param name     new name of the note
+   * @param config   new config of the note
+   * @param context  context of the caller; supplies the authentication info
+   * @param callback receives the updated note on success, or a
+   *                 {@link NoteNotFoundException} when the note does not exist
+   * @throws IOException propagated from the permission check, persisting or the callback
+   */
+  public void updateNote(String noteId,
+                         String name,
+                         Map<String, Object> config,
+                         ServiceContext context,
+                         ServiceCallback<Note> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.NOTE_UPDATE, context,
+        callback)) {
+      return;
+    }
+
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+
+    // Boolean.TRUE.equals() treats a missing flag as "cron disabled"; unboxing the raw map
+    // value would throw a NullPointerException in that case.
+    if (!Boolean.TRUE.equals(note.getConfig().get("isZeppelinNotebookCronEnable"))) {
+      if (config.get("cron") != null) {
+        config.remove("cron");
+      }
+    }
+    // Compare against the current config before it is replaced below.
+    boolean cronUpdated = isCronUpdated(config, note.getConfig());
+    note.setName(name);
+    note.setConfig(config);
+    if (cronUpdated) {
+      notebook.refreshCron(note.getId());
+    }
+
+    note.persist(context.getAutheInfo());
+    callback.onSuccess(note, context);
+  }
+
+
+  /**
+   * Tells whether the "cron" entry differs between two note configs.
+   *
+   * @param configA first config; must not be {@code null}
+   * @param configB second config; must not be {@code null}
+   * @return {@code false} when both configs have no "cron" entry or hold equal values,
+   *         {@code true} when the entry was added, removed or changed
+   */
+  private static boolean isCronUpdated(Map<String, Object> configA,
+                                       Map<String, Object> configB) {
+    Object cronA = configA.get("cron");
+    Object cronB = configB.get("cron");
+    if (cronA == null) {
+      // unchanged only if the other side has no cron either
+      return cronB != null;
+    }
+    // equals(null) is false, so a removed cron counts as an update
+    return !cronA.equals(cronB);
+  }
+
+  /**
+   * Replaces the note params of a note and persists it, after checking the WRITER
+   * permission.
+   *
+   * @param noteId     id of the note to update
+   * @param noteParams params to set on the note
+   * @param context    context of the caller; supplies the authentication info
+   * @param callback   receives the note on success, or a {@link NoteNotFoundException} when
+   *                   the note does not exist
+   * @throws IOException propagated from the permission check, persisting or the callback
+   */
+  public void saveNoteForms(String noteId,
+                            Map<String, Object> noteParams,
+                            ServiceContext context,
+                            ServiceCallback<Note> callback) throws IOException {
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.SAVE_NOTE_FORMS, context,
+        callback)) {
+      return;
+    }
+
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+
+    note.setNoteParams(noteParams);
+    note.persist(context.getAutheInfo());
+    callback.onSuccess(note, context);
+  }
+
+  /**
+   * Removes one form, and the param of the same name, from a note and persists it. The
+   * note lookup happens before the WRITER permission check.
+   *
+   * @param noteId   id of the note to update
+   * @param formName name of the form and param to remove
+   * @param context  context of the caller; supplies the authentication info
+   * @param callback receives the note on success, or a {@link NoteNotFoundException} when
+   *                 the note does not exist
+   * @throws IOException propagated from the permission check, persisting or the callback
+   */
+  public void removeNoteForms(String noteId,
+                              String formName,
+                              ServiceContext context,
+                              ServiceCallback<Note> callback) throws IOException {
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.REMOVE_NOTE_FORMS, context,
+        callback)) {
+      return;
+    }
+
+    note.getNoteForms().remove(formName);
+    note.getNoteParams().remove(formName);
+    note.persist(context.getAutheInfo());
+    callback.onSuccess(note, context);
+  }
+
+  /**
+   * Checkpoints a note with the given commit message. The note lookup happens before the
+   * WRITER permission check.
+   *
+   * @param noteId        id of the note to checkpoint
+   * @param commitMessage message stored with the checkpoint
+   * @param context       context of the caller; supplies the authentication info
+   * @param callback      receives the new revision on success, or a
+   *                      {@link NoteNotFoundException} when the note does not exist
+   * @return the revision returned by the notebook, or {@code null} when the note does not
+   *         exist or the permission check fails
+   * @throws IOException propagated from the permission check, the checkpoint or the
+   *                     callback
+   */
+  public NotebookRepoWithVersionControl.Revision checkpointNote(
+      String noteId,
+      String commitMessage,
+      ServiceContext context,
+      ServiceCallback<NotebookRepoWithVersionControl.Revision> callback) throws IOException {
+
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return null;
+    }
+
+    // Check the permission under the checkpoint operation, not REMOVE_NOTE_FORMS.
+    if (!checkPermission(noteId, Permission.WRITER, Message.OP.CHECKPOINT_NOTE, context,
+        callback)) {
+      return null;
+    }
+
+    NotebookRepoWithVersionControl.Revision revision =
+        notebook.checkpointNote(noteId, commitMessage, context.getAutheInfo());
+    callback.onSuccess(revision, context);
+    return revision;
+  }
+
+  /**
+   * Fetches the note's revision list, passes it to callback.onSuccess and returns it.
+   * Returns null (after callback.onFailure) when the note does not exist.
+   */
+  public List<NotebookRepoWithVersionControl.Revision> listRevisionHistory(String noteId,
+      ServiceContext context,
+      ServiceCallback<List<NotebookRepoWithVersionControl.Revision>> callback) throws IOException {
+    if (notebook.getNote(noteId) == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return null;
+    }
+    // TODO(zjffdu) Permission check disabled for now, otherwise zeppelin sends two AUTH_INFO
+    // messages to the frontend when it tries to get a note without proper privilege.
+    //    if (!checkPermission(noteId, Permission.READER, Message.OP.LIST_REVISION_HISTORY, context,
+    //        callback)) {
+    //      return null;
+    //    }
+    List<NotebookRepoWithVersionControl.Revision> history =
+        notebook.listRevisionHistory(noteId, context.getAutheInfo());
+    callback.onSuccess(history, context);
+    return history;
+  }
+
+
+  /**
+   * Asks the notebook to set the note to revisionId (WRITER only); returns the resulting
+   * note, or null after a missing note, denied access or any exception went to callback.
+   */
+  public Note setNoteRevision(String noteId, String revisionId, ServiceContext context,
+      ServiceCallback<Note> callback) throws IOException {
+    if (notebook.getNote(noteId) == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return null;
+    }
+    boolean canWrite = checkPermission(noteId, Permission.WRITER,
+        Message.OP.SET_NOTE_REVISION, context, callback);
+    if (!canWrite) {
+      return null;
+    }
+    try {
+      Note reverted = notebook.setNoteRevision(noteId, revisionId, context.getAutheInfo());
+      callback.onSuccess(reverted, context);
+      return reverted;
+    } catch (Exception e) {
+      callback.onFailure(new IOException("Fail to set given note revision", e), context);
+      return null;
+    }
+  }
+
+  /**
+   * Loads the note as of revisionId and passes it to callback.onSuccess (READER only).
+   */
+  public void getNotebyRevision(String noteId, String revisionId, ServiceContext context,
+      ServiceCallback<Note> callback) throws IOException {
+    if (notebook.getNote(noteId) == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+    boolean canRead = checkPermission(noteId, Permission.READER,
+        Message.OP.NOTE_REVISION, context, callback);
+    if (!canRead) {
+      return;
+    }
+
+    Note atRevision = notebook.getNoteByRevision(noteId, revisionId, context.getAutheInfo());
+    callback.onSuccess(atRevision, context);
+  }
+
+  /**
+   * Passes the note at revisionId to callback.onSuccess; the revision id "Head" selects
+   * the current note. Needs READER permission; failures go to callback.onFailure.
+   */
+  public void getNoteByRevisionForCompare(String noteId, String revisionId,
+      ServiceContext context, ServiceCallback<Note> callback) throws IOException {
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return;
+    }
+
+    if (!checkPermission(noteId, Permission.READER, Message.OP.NOTE_REVISION_FOR_COMPARE, context,
+        callback)) {
+      return;
+    }
+
+    // Constant-first equals tolerates a null revisionId; "Head" reuses the note fetched above.
+    Note revisionNote = "Head".equals(revisionId)
+        ? note
+        : notebook.getNoteByRevision(noteId, revisionId, context.getAutheInfo());
+    callback.onSuccess(revisionNote, context);
+  }
+
+  /**
+   * Asks the note for completions of buffer at cursor within the given paragraph.
+   * Needs WRITER permission. Returns the completions (also sent to callback.onSuccess), or
+   * null after a missing note, denied access or a RuntimeException was reported to callback.
+   */
+  public List<InterpreterCompletion> completion(String noteId, String paragraphId,
+      String buffer, int cursor, ServiceContext context,
+      ServiceCallback<List<InterpreterCompletion>> callback) throws IOException {
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      return null;
+    }
+    boolean canWrite = checkPermission(noteId, Permission.WRITER,
+        Message.OP.COMPLETION, context, callback);
+    if (!canWrite) {
+      return null;
+    }
+
+    try {
+      List<InterpreterCompletion> candidates = note.completion(paragraphId, buffer, cursor);
+      callback.onSuccess(candidates, context);
+      return candidates;
+    } catch (RuntimeException e) {
+      callback.onFailure(new IOException("Fail to get completion", e), context);
+      return null;
+    }
+  }
+
+
+
+
+  /**
+   * Access levels on a note that checkPermission can require.
+   */
+  enum Permission {
+    READER, WRITER, RUNNER, OWNER
+  }
+
+  /**
+   * Checks that the caller holds the given permission on a note. When the check fails, a
+   * ForbiddenException naming the allowed users/roles is passed to callback.onFailure.
+   *
+   * @param noteId id of the note being accessed
+   * @param permission access level the operation requires
+   * @param op operation being attempted (not consulted by the check itself)
+   * @param context supplies the caller's authentication info and user/role set
+   * @param callback receives the ForbiddenException when access is denied
+   * @param <T> result type of the callback; irrelevant to the check
+   * @return true when access is allowed, false once the denial has been reported
+   * @throws IOException propagated from the calls made here, e.g. callback.onFailure
+   */
+  private <T> boolean checkPermission(String noteId, Permission permission, Message.OP op,
+      ServiceContext context, ServiceCallback<T> callback) throws IOException {
+    Set<String> userAndRoles = context.getUserAndRoles();
+    boolean granted = false;
+    Set<String> permitted = null;
+    // A permission without a case below keeps granted == false, i.e. access is denied.
+    switch (permission) {
+      case READER:
+        granted = notebookAuthorization.isReader(noteId, userAndRoles);
+        permitted = notebookAuthorization.getReaders(noteId);
+        break;
+      case WRITER:
+        granted = notebookAuthorization.isWriter(noteId, userAndRoles);
+        permitted = notebookAuthorization.getWriters(noteId);
+        break;
+      case RUNNER:
+        granted = notebookAuthorization.isRunner(noteId, userAndRoles);
+        permitted = notebookAuthorization.getRunners(noteId);
+        break;
+      case OWNER:
+        granted = notebookAuthorization.isOwner(noteId, userAndRoles);
+        permitted = notebookAuthorization.getOwners(noteId);
+        break;
+    }
+    if (!granted) {
+      String errorMsg = "Insufficient privileges to " + permission + " note.\n" +
+          "Allowed users or roles: " + permitted + "\n" + "But the user " +
+          context.getAutheInfo().getUser() + " belongs to: " + userAndRoles;
+      callback.onFailure(new ForbiddenException(errorMsg), context);
+    }
+    return granted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceCallback.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceCallback.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceCallback.java
new file mode 100644
index 0000000..fd5af9e
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.service;
+
+import java.io.IOException;
+
+/**
+ * Callback through which service classes report the start and outcome of a call.
+ */
+public interface ServiceCallback<T> {
+
+  /**
+   * Called when this service call is starting.
+   * @param message message passed along with the start notification
+   * @param context context of the service call
+   * @throws IOException if the callback implementation raises it
+   */
+  void onStart(String message, ServiceContext context) throws IOException;
+
+  /**
+   * Called when this service call has succeeded.
+   * @param result result of the service call
+   * @param context context of the service call
+   * @throws IOException if the callback implementation raises it
+   */
+  void onSuccess(T result, ServiceContext context) throws IOException;
+
+  /**
+   * Called when this service call has failed.
+   * @param ex exception describing the failure
+   * @param context context of the service call
+   * @throws IOException if the callback implementation raises it
+   */
+  void onFailure(Exception ex, ServiceContext context) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceContext.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceContext.java
new file mode 100644
index 0000000..3db8bf8
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceContext.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.service;
+
+import org.apache.zeppelin.user.AuthenticationInfo;
+
+import java.util.Set;
+
+/**
+ * Context info for a service call: the caller's authentication info and user/role names.
+ */
+public class ServiceContext {
+  // Assigned once in the constructor; the set is kept by reference, not copied.
+  private final AuthenticationInfo autheInfo;
+  private final Set<String> userAndRoles;
+
+  public ServiceContext(AuthenticationInfo authInfo, Set<String> userAndRoles) {
+    this.autheInfo = authInfo;
+    this.userAndRoles = userAndRoles;
+  }
+
+  public AuthenticationInfo getAutheInfo() {
+    return autheInfo;
+  }
+
+  public Set<String> getUserAndRoles() {
+    return userAndRoles;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/SimpleServiceCallback.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/SimpleServiceCallback.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/SimpleServiceCallback.java
new file mode 100644
index 0000000..6957707
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/SimpleServiceCallback.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * ServiceCallback implementation that only logs each event it receives.
+ * @param <T> type of the result handed to onSuccess
+ */
+public class SimpleServiceCallback<T> implements ServiceCallback<T> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleServiceCallback.class);
+
+  @Override
+  public void onStart(String message, ServiceContext context) throws IOException {
+    LOGGER.debug(message);
+  }
+
+  @Override
+  public void onSuccess(T result, ServiceContext context) throws IOException {
+    LOGGER.debug("OP is succeeded");
+  }
+
+  @Override
+  public void onFailure(Exception ex, ServiceContext context) throws IOException {
+    LOGGER.warn(ex.getMessage());
+  }
+
+}


[2/3] zeppelin git commit: ZEPPELIN-3681. Introduce NotebookService

Posted by zj...@apache.org.
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 78ff078..a376623 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -54,9 +54,13 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
 import org.apache.zeppelin.notebook.socket.WatcherMessage;
+import org.apache.zeppelin.rest.exception.ForbiddenException;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.server.ZeppelinServer;
+import org.apache.zeppelin.service.NotebookService;
+import org.apache.zeppelin.service.ServiceContext;
+import org.apache.zeppelin.service.SimpleServiceCallback;
 import org.apache.zeppelin.ticket.TicketContainer;
 import org.apache.zeppelin.types.InterpreterSettingsList;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -75,6 +79,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.servlet.http.HttpServletRequest;
 import java.io.IOException;
+import java.lang.reflect.Type;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.text.ParseException;
@@ -102,11 +107,11 @@ import java.util.regex.Pattern;
  */
 public class NotebookServer extends WebSocketServlet
     implements NotebookSocketListener,
-        JobListenerFactory,
-        AngularObjectRegistryListener,
-        RemoteInterpreterProcessListener,
-        ApplicationEventListener,
-        NotebookServerMBean {
+    JobListenerFactory,
+    AngularObjectRegistryListener,
+    RemoteInterpreterProcessListener,
+    ApplicationEventListener,
+    NotebookServerMBean {
 
   /**
    * Job manager service type.
@@ -127,8 +132,8 @@ public class NotebookServer extends WebSocketServlet
 
   private HashSet<String> collaborativeModeList = new HashSet<>();
   private Boolean collaborativeModeEnable = ZeppelinConfiguration
-          .create()
-          .isZeppelinNotebookCollaborativeModeEnable();
+      .create()
+      .isZeppelinNotebookCollaborativeModeEnable();
   private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
   private static Gson gson = new GsonBuilder()
       .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
@@ -140,6 +145,8 @@ public class NotebookServer extends WebSocketServlet
   final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
   final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
 
+  private NotebookService notebookService;
+
   private ExecutorService executorService = Executors.newFixedThreadPool(10);
 
   /**
@@ -154,6 +161,13 @@ public class NotebookServer extends WebSocketServlet
     return ZeppelinServer.notebook;
   }
 
+  private synchronized NotebookService getNotebookService() {
+    if (this.notebookService == null) {
+      this.notebookService = new NotebookService(notebook());
+    }
+    return this.notebookService;
+  }
+
   @Override
   public void configure(WebSocketServletFactory factory) {
     factory.setCreator(new NotebookWebSocketCreator(this));
@@ -174,8 +188,7 @@ public class NotebookServer extends WebSocketServlet
 
   @Override
   public void onOpen(NotebookSocket conn) {
-    LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
-        conn.getRequest().getRemotePort());
+    LOG.info("New connection from {}", conn);
     connectedSockets.add(conn);
   }
 
@@ -184,12 +197,13 @@ public class NotebookServer extends WebSocketServlet
     Notebook notebook = notebook();
     try {
       Message messagereceived = deserializeMessage(msg);
-      LOG.debug("RECEIVE << " + messagereceived.op +
-          ", RECEIVE PRINCIPAL << " + messagereceived.principal +
-          ", RECEIVE TICKET << " + messagereceived.ticket +
-          ", RECEIVE ROLES << " + messagereceived.roles +
-          ", RECEIVE DATA << " + messagereceived.data);
-
+      if (messagereceived.op != OP.PING) {
+        LOG.debug("RECEIVE: " + messagereceived.op +
+            ", RECEIVE PRINCIPAL: " + messagereceived.principal +
+            ", RECEIVE TICKET: " + messagereceived.ticket +
+            ", RECEIVE ROLES: " + messagereceived.roles +
+            ", RECEIVE DATA: " + messagereceived.data);
+      }
       if (LOG.isTraceEnabled()) {
         LOG.trace("RECEIVE MSG = " + messagereceived);
       }
@@ -237,22 +251,22 @@ public class NotebookServer extends WebSocketServlet
       // Lets be elegant here
       switch (messagereceived.op) {
         case LIST_NOTES:
-          unicastNoteList(conn, subject, userAndRoles);
+          listNotes(conn, messagereceived);
           break;
         case RELOAD_NOTES_FROM_REPO:
           broadcastReloadedNoteList(subject, userAndRoles);
           break;
         case GET_HOME_NOTE:
-          sendHomeNote(conn, userAndRoles, notebook, messagereceived);
+          getHomeNote(conn, messagereceived);
           break;
         case GET_NOTE:
-          sendNote(conn, userAndRoles, notebook, messagereceived);
+          getNote(conn, messagereceived);
           break;
         case NEW_NOTE:
-          createNote(conn, userAndRoles, notebook, messagereceived);
+          createNote(conn, messagereceived);
           break;
         case DEL_NOTE:
-          removeNote(conn, userAndRoles, notebook, messagereceived);
+          deleteNote(conn, messagereceived);
           break;
         case REMOVE_FOLDER:
           removeFolder(conn, userAndRoles, notebook, messagereceived);
@@ -270,55 +284,55 @@ public class NotebookServer extends WebSocketServlet
           restoreFolder(conn, userAndRoles, notebook, messagereceived);
           break;
         case RESTORE_NOTE:
-          restoreNote(conn, userAndRoles, notebook, messagereceived);
+          restoreNote(conn, messagereceived);
           break;
         case RESTORE_ALL:
           restoreAll(conn, userAndRoles, notebook, messagereceived);
           break;
         case CLONE_NOTE:
-          cloneNote(conn, userAndRoles, notebook, messagereceived);
+          cloneNote(conn, messagereceived);
           break;
         case IMPORT_NOTE:
-          importNote(conn, userAndRoles, notebook, messagereceived);
+          importNote(conn, messagereceived);
           break;
         case COMMIT_PARAGRAPH:
-          updateParagraph(conn, userAndRoles, notebook, messagereceived);
+          updateParagraph(conn, messagereceived);
           break;
         case RUN_PARAGRAPH:
-          runParagraph(conn, userAndRoles, notebook, messagereceived);
+          runParagraph(conn, messagereceived);
           break;
         case PARAGRAPH_EXECUTED_BY_SPELL:
           broadcastSpellExecution(conn, userAndRoles, notebook, messagereceived);
           break;
         case RUN_ALL_PARAGRAPHS:
-          runAllParagraphs(conn, userAndRoles, notebook, messagereceived);
+          runAllParagraphs(conn, messagereceived);
           break;
         case CANCEL_PARAGRAPH:
-          cancelParagraph(conn, userAndRoles, notebook, messagereceived);
+          cancelParagraph(conn, messagereceived);
           break;
         case MOVE_PARAGRAPH:
-          moveParagraph(conn, userAndRoles, notebook, messagereceived);
+          moveParagraph(conn, messagereceived);
           break;
         case INSERT_PARAGRAPH:
-          insertParagraph(conn, userAndRoles, notebook, messagereceived);
+          insertParagraph(conn, messagereceived);
           break;
         case COPY_PARAGRAPH:
-          copyParagraph(conn, userAndRoles, notebook, messagereceived);
+          copyParagraph(conn, messagereceived);
           break;
         case PARAGRAPH_REMOVE:
-          removeParagraph(conn, userAndRoles, notebook, messagereceived);
+          removeParagraph(conn, messagereceived);
           break;
         case PARAGRAPH_CLEAR_OUTPUT:
-          clearParagraphOutput(conn, userAndRoles, notebook, messagereceived);
+          clearParagraphOutput(conn, messagereceived);
           break;
         case PARAGRAPH_CLEAR_ALL_OUTPUT:
-          clearAllParagraphOutput(conn, userAndRoles, notebook, messagereceived);
+          clearAllParagraphOutput(conn, messagereceived);
           break;
         case NOTE_UPDATE:
-          updateNote(conn, userAndRoles, notebook, messagereceived);
+          updateNote(conn, messagereceived);
           break;
         case NOTE_RENAME:
-          renameNote(conn, userAndRoles, notebook, messagereceived);
+          renameNote(conn, messagereceived);
           break;
         case FOLDER_RENAME:
           renameFolder(conn, userAndRoles, notebook, messagereceived);
@@ -327,7 +341,7 @@ public class NotebookServer extends WebSocketServlet
           updatePersonalizedMode(conn, userAndRoles, notebook, messagereceived);
           break;
         case COMPLETION:
-          completion(conn, userAndRoles, notebook, messagereceived);
+          completion(conn, messagereceived);
           break;
         case PING:
           break; //do nothing
@@ -344,19 +358,19 @@ public class NotebookServer extends WebSocketServlet
           sendAllConfigurations(conn, userAndRoles, notebook);
           break;
         case CHECKPOINT_NOTE:
-          checkpointNote(conn, notebook, messagereceived);
+          checkpointNote(conn, messagereceived);
           break;
         case LIST_REVISION_HISTORY:
-          listRevisionHistory(conn, notebook, messagereceived);
+          listRevisionHistory(conn, messagereceived);
           break;
         case SET_NOTE_REVISION:
-          setNoteRevision(conn, userAndRoles, notebook, messagereceived);
+          setNoteRevision(conn, messagereceived);
           break;
         case NOTE_REVISION:
-          getNoteByRevision(conn, notebook, messagereceived);
+          getNoteByRevision(conn, messagereceived);
           break;
         case NOTE_REVISION_FOR_COMPARE:
-          getNoteByRevisionForCompare(conn, notebook, messagereceived);
+          getNoteByRevisionForCompare(conn, messagereceived);
           break;
         case LIST_NOTE_JOBS:
           unicastNoteJobInfo(conn, messagereceived);
@@ -371,16 +385,16 @@ public class NotebookServer extends WebSocketServlet
           getEditorSetting(conn, messagereceived);
           break;
         case GET_INTERPRETER_SETTINGS:
-          getInterpreterSettings(conn, subject);
+          getInterpreterSettings(conn);
           break;
         case WATCHER:
           switchConnectionToWatcher(conn, messagereceived);
           break;
         case SAVE_NOTE_FORMS:
-          saveNoteForms(conn, userAndRoles, notebook, messagereceived);
+          saveNoteForms(conn, messagereceived);
           break;
         case REMOVE_NOTE_FORMS:
-          removeNoteForms(conn, userAndRoles, notebook, messagereceived);
+          removeNoteForms(conn, messagereceived);
           break;
         case PATCH_PARAGRAPH:
           patchParagraph(conn, userAndRoles, notebook, messagereceived);
@@ -395,14 +409,14 @@ public class NotebookServer extends WebSocketServlet
 
   @Override
   public void onClose(NotebookSocket conn, int code, String reason) {
-    LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest().getRemoteAddr(),
-        conn.getRequest().getRemotePort(), code, reason);
+    LOG.info("Closed connection to {} ({}) {}", conn, code, reason);
     removeConnectionFromAllNote(conn);
     connectedSockets.remove(conn);
     removeUserConnection(conn.getUser(), conn);
   }
 
   private void removeUserConnection(String user, NotebookSocket conn) {
+    LOG.debug("Remove user connection {} for user: {}", conn, user);
     if (userConnectedSockets.containsKey(user)) {
       userConnectedSockets.get(user).remove(conn);
     } else {
@@ -411,6 +425,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void addUserConnection(String user, NotebookSocket conn) {
+    LOG.debug("Add user connection {} for user: {}", conn, user);
     conn.setUser(user);
     if (userConnectedSockets.containsKey(user)) {
       userConnectedSockets.get(user).add(conn);
@@ -430,6 +445,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void addConnectionToNote(String noteId, NotebookSocket socket) {
+    LOG.debug("Add connection {} to note: {}", socket, noteId);
     synchronized (noteSocketMap) {
       removeConnectionFromAllNote(socket); // make sure a socket relates only a
       // single note.
@@ -446,6 +462,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void removeConnectionFromNote(String noteId, NotebookSocket socket) {
+    LOG.debug("Remove connection {} from note: {}", socket, noteId);
     synchronized (noteSocketMap) {
       List<NotebookSocket> socketList = noteSocketMap.get(noteId);
       if (socketList != null) {
@@ -455,7 +472,7 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void removeNote(String noteId) {
+  private void removeConn(String noteId) {
     synchronized (noteSocketMap) {
       List<NotebookSocket> socketList = noteSocketMap.remove(noteId);
     }
@@ -485,7 +502,7 @@ public class NotebookServer extends WebSocketServlet
     message.put("status", collaborativeStatusNew);
     if (collaborativeStatusNew) {
       HashSet<String> userList = new HashSet<>();
-      for (NotebookSocket noteSocket: socketList) {
+      for (NotebookSocket noteSocket : socketList) {
         userList.add(noteSocket.getUser());
       }
       message.put("users", userList);
@@ -626,7 +643,8 @@ public class NotebookServer extends WebSocketServlet
   }
 
   public List<Map<String, String>> generateNotesInfo(boolean needsReload,
-          AuthenticationInfo subject, Set<String> userAndRoles) {
+                                                     AuthenticationInfo subject,
+                                                     Set<String> userAndRoles) {
     Notebook notebook = notebook();
 
     ZeppelinConfiguration conf = notebook.getConf();
@@ -690,7 +708,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
-          Paragraph defaultParagraph) {
+                                  Paragraph defaultParagraph) {
     if (null != userParagraphMap) {
       for (String user : userParagraphMap.keySet()) {
         multicastToUser(user,
@@ -706,7 +724,7 @@ public class NotebookServer extends WebSocketServlet
         new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex));
   }
 
-  public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
+  public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
     if (subject == null) {
       subject = new AuthenticationInfo(StringUtils.EMPTY);
     }
@@ -717,10 +735,16 @@ public class NotebookServer extends WebSocketServlet
     broadcastNoteListExcept(notesInfo, subject);
   }
 
-  public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject,
-          HashSet<String> userAndRoles) {
-    List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles);
-    unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
+  public void listNotes(NotebookSocket conn, Message message) throws IOException {
+    getNotebookService().listNotes(false, getServiceContext(message),
+        new WebSocketServiceCallback<List<Map<String, String>>>(conn) {
+          @Override
+          public void onSuccess(List<Map<String, String>> notesInfo,
+                                ServiceContext context) throws IOException {
+            super.onSuccess(notesInfo, context);
+            unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
+          }
+        });
   }
 
   public void broadcastReloadedNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
@@ -736,7 +760,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void broadcastNoteListExcept(List<Map<String, String>> notesInfo,
-          AuthenticationInfo subject) {
+                                       AuthenticationInfo subject) {
     Set<String> userAndRoles;
     NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
     for (String user : userConnectedSockets.keySet()) {
@@ -752,7 +776,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   void permissionError(NotebookSocket conn, String op, String userName, Set<String> userAndRoles,
-          Set<String> allowed) throws IOException {
+                       Set<String> allowed) throws IOException {
     LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed);
 
     conn.send(serializeMessage(new Message(OP.AUTH_INFO).put("info",
@@ -761,32 +785,18 @@ public class NotebookServer extends WebSocketServlet
             .toString())));
   }
 
-  /**
-   * @return false if user doesn't have reader permission for this paragraph
-   */
-  private boolean hasParagraphReaderPermission(NotebookSocket conn, Notebook notebook,
-          String noteId, HashSet<String> userAndRoles, String principal, String op)
-          throws IOException {
-    NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
-    if (!notebookAuthorization.isReader(noteId, userAndRoles)) {
-      permissionError(conn, op, principal, userAndRoles,
-          notebookAuthorization.getOwners(noteId));
-      return false;
-    }
-
-    return true;
-  }
 
   /**
    * @return false if user doesn't have runner permission for this paragraph
    */
   private boolean hasParagraphRunnerPermission(NotebookSocket conn, Notebook notebook,
-          String noteId, HashSet<String> userAndRoles, String principal, String op)
-          throws IOException {
+                                               String noteId, HashSet<String> userAndRoles,
+                                               String principal, String op)
+      throws IOException {
     NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
     if (!notebookAuthorization.isRunner(noteId, userAndRoles)) {
       permissionError(conn, op, principal, userAndRoles,
-              notebookAuthorization.getOwners(noteId));
+          notebookAuthorization.getOwners(noteId));
       return false;
     }
 
@@ -797,8 +807,9 @@ public class NotebookServer extends WebSocketServlet
    * @return false if user doesn't have writer permission for this paragraph
    */
   private boolean hasParagraphWriterPermission(NotebookSocket conn, Notebook notebook,
-          String noteId, HashSet<String> userAndRoles, String principal, String op)
-          throws IOException {
+                                               String noteId, HashSet<String> userAndRoles,
+                                               String principal, String op)
+      throws IOException {
     NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
     if (!notebookAuthorization.isWriter(noteId, userAndRoles)) {
       permissionError(conn, op, principal, userAndRoles,
@@ -813,7 +824,8 @@ public class NotebookServer extends WebSocketServlet
    * @return false if user doesn't have owner permission for this paragraph
    */
   private boolean hasParagraphOwnerPermission(NotebookSocket conn, Notebook notebook, String noteId,
-          HashSet<String> userAndRoles, String principal, String op) throws IOException {
+                                              HashSet<String> userAndRoles, String principal,
+                                              String op) throws IOException {
     NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
     if (!notebookAuthorization.isOwner(noteId, userAndRoles)) {
       permissionError(conn, op, principal, userAndRoles,
@@ -824,66 +836,45 @@ public class NotebookServer extends WebSocketServlet
     return true;
   }
 
-  private void sendNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    LOG.info("New operation from {} : {} : {} : {} : {}", conn.getRequest().getRemoteAddr(),
-        conn.getRequest().getRemotePort(), fromMessage.principal, fromMessage.op,
-        fromMessage.get("id"));
-
+  private void getNote(NotebookSocket conn,
+                       Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     if (noteId == null) {
       return;
     }
-
-    String user = fromMessage.principal;
-
-    Note note = notebook.getNote(noteId);
-
-    if (note != null) {
-      if (!hasParagraphReaderPermission(conn, notebook, noteId,
-          userAndRoles, fromMessage.principal, "read")) {
-        return;
-      }
-
-      addConnectionToNote(note.getId(), conn);
-
-      if (note.isPersonalizedMode()) {
-        note = note.getUserNote(user);
-      }
-      conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
-      sendAllAngularObjects(note, user, conn);
-    } else {
-      conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
-    }
+    getNotebookService().getNote(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            addConnectionToNote(note.getId(), conn);
+            conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
+            sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
+          }
+        });
   }
 
-  private void sendHomeNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    String noteId = notebook.getConf().getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN);
-    String user = fromMessage.principal;
-
-    Note note = null;
-    if (noteId != null) {
-      note = notebook.getNote(noteId);
-    }
-
-    if (note != null) {
-      if (!hasParagraphReaderPermission(conn, notebook, noteId,
-          userAndRoles, fromMessage.principal, "read")) {
-        return;
-      }
+  private void getHomeNote(NotebookSocket conn,
+                           Message fromMessage) throws IOException {
 
-      addConnectionToNote(note.getId(), conn);
-      conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
-      sendAllAngularObjects(note, user, conn);
-    } else {
-      removeConnectionFromAllNote(conn);
-      conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
-    }
+    getNotebookService().getHomeNote(getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            if (note != null) {
+              addConnectionToNote(note.getId(), conn);
+              conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
+              sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
+            } else {
+              removeConnectionFromAllNote(conn);
+              conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
+            }
+          }
+        });
   }
 
-  private void updateNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void updateNote(NotebookSocket conn,
+                          Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     String name = (String) fromMessage.get("name");
     Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
@@ -894,35 +885,20 @@ public class NotebookServer extends WebSocketServlet
       return;
     }
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "update")) {
-      return;
-    }
-
-    Note note = notebook.getNote(noteId);
-    if (note != null) {
-      if (!(Boolean) note.getConfig().get("isZeppelinNotebookCronEnable")) {
-        if (config.get("cron") != null) {
-          config.remove("cron");
-        }
-      }
-      boolean cronUpdated = isCronUpdated(config, note.getConfig());
-      note.setName(name);
-      note.setConfig(config);
-      if (cronUpdated) {
-        notebook.refreshCron(note.getId());
-      }
-
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      note.persist(subject);
-      broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name).put("config", config)
-          .put("info", note.getInfo()));
-      broadcastNoteList(subject, userAndRoles);
-    }
+    getNotebookService().updateNote(noteId, name, config, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name)
+                .put("config", config)
+                .put("info", note.getInfo()));
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
   }
 
   private void updatePersonalizedMode(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+                                      Notebook notebook, Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     String personalized = (String) fromMessage.get("personalized");
     boolean isPersonalized = personalized.equals("true") ? true : false;
@@ -945,44 +921,31 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void renameNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    renameNote(conn, userAndRoles, notebook, fromMessage, "rename");
-  }
-
-  private void renameNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage, String op) throws IOException {
+  private void renameNote(NotebookSocket conn,
+                          Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     String name = (String) fromMessage.get("name");
-
     if (noteId == null) {
       return;
     }
-
-    if (!hasParagraphOwnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "rename")) {
-      return;
-    }
-
-    Note note = notebook.getNote(noteId);
-    if (note != null) {
-      note.setName(name);
-      note.setCronSupported(notebook.getConf());
-
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      note.persist(subject);
-      broadcastNote(note);
-      broadcastNoteList(subject, userAndRoles);
-    }
+    getNotebookService().renameNote(noteId, name, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            broadcastNote(note);
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
   }
 
   private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+                            Message fromMessage) throws IOException {
     renameFolder(conn, userAndRoles, notebook, fromMessage, "rename");
   }
 
   private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage, String op) throws IOException {
+                            Message fromMessage, String op) throws IOException {
     String oldFolderId = (String) fromMessage.get("id");
     String newFolderId = (String) fromMessage.get("name");
 
@@ -1013,76 +976,50 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) {
-    boolean cronUpdated = false;
-    if (configA.get("cron") != null && configB.get("cron") != null && configA.get("cron")
-        .equals(configB.get("cron"))) {
-      cronUpdated = true;
-    } else if (configA.get("cron") == null && configB.get("cron") == null) {
-      cronUpdated = false;
-    } else if (configA.get("cron") != null || configB.get("cron") != null) {
-      cronUpdated = true;
-    }
-
-    return cronUpdated;
-  }
-
-  private void createNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message message) throws IOException {
-    AuthenticationInfo subject = new AuthenticationInfo(message.principal);
+  private void createNote(NotebookSocket conn,
+                          Message message) throws IOException {
 
-    try {
-      Note note = null;
-      String defaultInterpreterSettingId = (String) message.get("defaultInterpreterGroup");
-      if (defaultInterpreterSettingId != null) {
-        note = notebook.createNote(defaultInterpreterSettingId, subject);
-      } else {
-        note = notebook.createNote(subject);
-      }
-
-      note.addNewParagraph(subject); // it's an empty note. so add one paragraph
-      if (message != null) {
-        String noteName = (String) message.get("name");
-        if (StringUtils.isEmpty(noteName)) {
-          noteName = "Note " + note.getId();
-        }
-        note.setName(noteName);
-        note.setCronSupported(notebook.getConf());
-      }
+    String defaultInterpreterGroup = (String) message.get("defaultInterpreterGroup");
+    String noteName = (String) message.get("name");
+    getNotebookService().createNote(defaultInterpreterGroup, noteName, getServiceContext(message),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            addConnectionToNote(note.getId(), conn);
+            conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
 
-      note.persist(subject);
-      addConnectionToNote(note.getId(), conn);
-      conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
-    } catch (IOException e) {
-      LOG.error("Exception from createNote", e);
-      conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
-          "Oops! There is something wrong with the notebook file system. "
-              + "Please check the logs for more details.")));
-      return;
-    }
-    broadcastNoteList(subject, userAndRoles);
+          @Override
+          public void onFailure(Exception ex, ServiceContext context) throws IOException {
+            super.onFailure(ex, context);
+            conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
+                "Oops! There is something wrong with the notebook file system. "
+                    + "Please check the logs for more details.")));
+          }
+        });
   }
 
-  private void removeNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void deleteNote(NotebookSocket conn,
+                          Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
     if (noteId == null) {
       return;
     }
-
-    if (!hasParagraphOwnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "remove")) {
-      return;
-    }
-
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    notebook.removeNote(noteId, subject);
-    removeNote(noteId);
-    broadcastNoteList(subject, userAndRoles);
+    getNotebookService().removeNote(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<String>(conn) {
+          @Override
+          public void onSuccess(String message, ServiceContext context) throws IOException {
+            super.onSuccess(message, context);
+            removeConn(noteId);
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
   }
 
   private void removeFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+                            Message fromMessage) throws IOException {
     String folderId = (String) fromMessage.get("id");
     if (folderId == null) {
       return;
@@ -1101,13 +1038,13 @@ public class NotebookServer extends WebSocketServlet
     AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
     for (Note note : notes) {
       notebook.removeNote(note.getId(), subject);
-      removeNote(note.getId());
+      removeConn(note.getId());
     }
     broadcastNoteList(subject, userAndRoles);
   }
 
   private void moveNoteToTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+                               Message fromMessage) throws SchedulerException, IOException {
     String noteId = (String) fromMessage.get("id");
     if (noteId == null) {
       return;
@@ -1121,15 +1058,16 @@ public class NotebookServer extends WebSocketServlet
       notebook.removeCron(note.getId());
     }
 
-    if (note != null && !note.isTrash()){
+    if (note != null && !note.isTrash()) {
       fromMessage.put("name", Folder.TRASH_FOLDER_ID + "/" + note.getName());
-      renameNote(conn, userAndRoles, notebook, fromMessage, "move");
+      renameNote(conn, fromMessage);
       notebook.moveNoteToTrash(note.getId());
     }
   }
 
   private void moveFolderToTrash(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws SchedulerException, IOException {
+                                 Notebook notebook, Message fromMessage)
+      throws SchedulerException, IOException {
     String folderId = (String) fromMessage.get("id");
     if (folderId == null) {
       return;
@@ -1138,14 +1076,14 @@ public class NotebookServer extends WebSocketServlet
     Folder folder = notebook.getFolder(folderId);
     if (folder != null && !folder.isTrash()) {
       String trashFolderId = Folder.TRASH_FOLDER_ID + "/" + folderId;
-      if (notebook.hasFolder(trashFolderId)){
+      if (notebook.hasFolder(trashFolderId)) {
         DateTime currentDate = new DateTime();
         DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
         trashFolderId += Folder.TRASH_FOLDER_CONFLICT_INFIX + formatter.print(currentDate);
       }
 
       List<Note> noteList = folder.getNotesRecursively();
-      for (Note note: noteList) {
+      for (Note note : noteList) {
         Map<String, Object> config = note.getConfig();
         if (config.get("cron") != null) {
           notebook.removeCron(note.getId());
@@ -1157,30 +1095,20 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void restoreNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+  private void restoreNote(NotebookSocket conn,
+                           Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("id");
-
     if (noteId == null) {
       return;
     }
 
-    Note note = notebook.getNote(noteId);
-
-    //restore cron
-    Map<String, Object> config = note.getConfig();
-    if (config.get("cron") != null) {
-      notebook.refreshCron(note.getId());
-    }
+    getNotebookService().restoreNote(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn));
 
-    if (note != null && note.isTrash()) {
-      fromMessage.put("name", note.getName().replaceFirst(Folder.TRASH_FOLDER_ID + "/", ""));
-      renameNote(conn, userAndRoles, notebook, fromMessage, "restore");
-    }
   }
 
   private void restoreFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+                             Message fromMessage) throws SchedulerException, IOException {
     String folderId = (String) fromMessage.get("id");
 
     if (folderId == null) {
@@ -1211,7 +1139,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void restoreAll(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+                          Message fromMessage) throws IOException {
     Folder trashFolder = notebook.getFolder(Folder.TRASH_FOLDER_ID);
     if (trashFolder != null) {
       fromMessage.data = new HashMap<>();
@@ -1222,57 +1150,42 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void emptyTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws SchedulerException, IOException {
+                          Message fromMessage) throws SchedulerException, IOException {
     fromMessage.data = new HashMap<>();
     fromMessage.put("id", Folder.TRASH_FOLDER_ID);
     removeFolder(conn, userAndRoles, notebook, fromMessage);
   }
 
-  private void updateParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private void updateParagraph(NotebookSocket conn,
+                               Message fromMessage) throws IOException {
     String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
     }
-
-    Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
-    Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
     String noteId = getOpenNoteId(conn);
     if (noteId == null) {
       noteId = (String) fromMessage.get("noteId");
     }
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    final Note note = notebook.getNote(noteId);
-    Paragraph p = note.getParagraph(paragraphId);
-
-    p.settings.setParams(params);
-    p.setConfig(config);
-    p.setTitle((String) fromMessage.get("title"));
-    p.setText((String) fromMessage.get("paragraph"));
-
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    if (note.isPersonalizedMode()) {
-      p = p.getUserParagraph(subject.getUser());
-      p.settings.setParams(params);
-      p.setConfig(config);
-      p.setTitle((String) fromMessage.get("title"));
-      p.setText((String) fromMessage.get("paragraph"));
-    }
-
-    note.persist(subject);
+    String title = (String) fromMessage.get("title");
+    String text = (String) fromMessage.get("paragraph");
+    Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
+    Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
 
-    if (note.isPersonalizedMode()) {
-      Map<String, Paragraph> userParagraphMap =
-          note.getParagraph(paragraphId).getUserParagraphMap();
-      broadcastParagraphs(userParagraphMap, p);
-    } else {
-      broadcastParagraph(note, p);
-    }
+    getNotebookService().updateParagraph(noteId, paragraphId, title, text, params, config,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            if (p.getNote().isPersonalizedMode()) {
+              Map<String, Paragraph> userParagraphMap =
+                  p.getNote().getParagraph(paragraphId).getUserParagraphMap();
+              broadcastParagraphs(userParagraphMap, p);
+            } else {
+              broadcastParagraph(p.getNote(), p);
+            }
+          }
+        });
   }
 
   private void patchParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
@@ -1327,142 +1240,140 @@ public class NotebookServer extends WebSocketServlet
     paragraphText = (String) dmp.patchApply(patches, paragraphText)[0];
     p.setText(paragraphText);
     Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", patchText)
-                                                     .put("paragraphId", p.getId());
+        .put("paragraphId", p.getId());
     broadcastExcept(note.getId(), message, conn);
   }
 
-  private void cloneNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void cloneNote(NotebookSocket conn,
+                         Message fromMessage) throws IOException {
+
     String noteId = getOpenNoteId(conn);
     String name = (String) fromMessage.get("name");
-    Note newNote = notebook.cloneNote(noteId, name, new AuthenticationInfo(fromMessage.principal));
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    addConnectionToNote(newNote.getId(), (NotebookSocket) conn);
-    conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
-    broadcastNoteList(subject, userAndRoles);
+    getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note newNote, ServiceContext context) throws IOException {
+            super.onSuccess(newNote, context);
+            addConnectionToNote(newNote.getId(), conn);
+            conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
+            broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+          }
+        });
   }
 
-  private void clearAllParagraphOutput(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private void clearAllParagraphOutput(NotebookSocket conn,
+                                       Message fromMessage) throws IOException {
     final String noteId = (String) fromMessage.get("id");
     if (StringUtils.isBlank(noteId)) {
       return;
     }
+    getNotebookService().clearAllParagraphOutput(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            broadcastNote(note);
+          }
+        });
+  }
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "clear output")) {
-      return;
-    }
+  protected Note importNote(NotebookSocket conn, Message fromMessage) throws IOException {
+    String noteName = (String) ((Map) fromMessage.get("note")).get("name");
+    String noteJson = gson.toJson(fromMessage.get("note"));
+    Note note = getNotebookService().importNote(noteName, noteJson, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            try {
+              broadcastNote(note);
+              broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+            } catch (NullPointerException e) {
+              // TODO(zjffdu) remove this try catch. This is only for test of
+              // NotebookServerTest#testImportNotebook
+            }
+          }
+        });
 
-    Note note = notebook.getNote(noteId);
-    note.clearAllParagraphOutput();
-    broadcastNote(note);
-  }
-
-  protected Note importNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    Note note = null;
-    if (fromMessage != null) {
-      String noteName = (String) ((Map) fromMessage.get("note")).get("name");
-      String noteJson = gson.toJson(fromMessage.get("note"));
-      AuthenticationInfo subject;
-      if (fromMessage.principal != null) {
-        subject = new AuthenticationInfo(fromMessage.principal);
-      } else {
-        subject = new AuthenticationInfo("anonymous");
-      }
-      note = notebook.importNote(noteJson, noteName, subject);
-      note.persist(subject);
-      broadcastNote(note);
-      broadcastNoteList(subject, userAndRoles);
-    }
     return note;
   }
 
-  private void removeParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void removeParagraph(NotebookSocket conn,
+                               Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
     }
     String noteId = getOpenNoteId(conn);
-
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    final Note note = notebook.getNote(noteId);
-
-    // Don't allow removing paragraph when there is only one paragraph in the Notebook
-    if (note.getParagraphCount() > 1) {
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      Paragraph para = note.removeParagraph(subject.getUser(), paragraphId);
-      note.persist(subject);
-      if (para != null) {
-        broadcast(note.getId(), new Message(OP.PARAGRAPH_REMOVED).
-            put("id", para.getId()));
-      }
-    }
+    getNotebookService().removeParagraph(noteId, paragraphId,
+        getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn){
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            super.onSuccess(p, context);
+            broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).
+                put("id", p.getId()));
+          }
+        });
   }
 
-  private void clearParagraphOutput(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private void clearParagraphOutput(NotebookSocket conn,
+                                    Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
     }
-
     String noteId = getOpenNoteId(conn);
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    final Note note = notebook.getNote(noteId);
-    if (note.isPersonalizedMode()) {
-      String user = fromMessage.principal;
-      Paragraph p = note.clearPersonalizedParagraphOutput(paragraphId, user);
-      unicastParagraph(note, p, user);
-    } else {
-      note.clearParagraphOutput(paragraphId);
-      Paragraph paragraph = note.getParagraph(paragraphId);
-      broadcastParagraph(note, paragraph);
-    }
+    getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            super.onSuccess(p, context);
+            if (p.getNote().isPersonalizedMode()) {
+              unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
+            } else {
+              broadcastParagraph(p.getNote(), p);
+            }
+          }
+        });
   }
 
-  private void completion(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void completion(NotebookSocket conn,
+                          Message fromMessage) throws IOException {
+    String noteId = getOpenNoteId(conn);
     String paragraphId = (String) fromMessage.get("id");
     String buffer = (String) fromMessage.get("buf");
     int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString());
-    Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
-    if (paragraphId == null) {
-      conn.send(serializeMessage(resp));
-      return;
-    }
+    getNotebookService().completion(noteId, paragraphId, buffer, cursor,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<List<InterpreterCompletion>>(conn) {
+          @Override
+          public void onSuccess(List<InterpreterCompletion> completions, ServiceContext context)
+              throws IOException {
+            super.onSuccess(completions, context);
+            Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
+            resp.put("completions", completions);
+            conn.send(serializeMessage(resp));
+          }
 
-    final Note note = notebook.getNote(getOpenNoteId(conn));
-    List<InterpreterCompletion> candidates;
-    try {
-      candidates = note.completion(paragraphId, buffer, cursor);
-    } catch (RuntimeException e) {
-      LOG.info("Fail to get completion", e);
-      candidates = new ArrayList<>();
-    }
-    resp.put("completions", candidates);
-    conn.send(serializeMessage(resp));
+          @Override
+          public void onFailure(Exception ex, ServiceContext context) throws IOException {
+            super.onFailure(ex, context);
+            Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
+            resp.put("completions", new ArrayList<>());
+            conn.send(serializeMessage(resp));
+          }
+        });
   }
 
   /**
    * When angular object updated from client.
    *
-   * @param conn the web socket.
-   * @param notebook the notebook.
+   * @param conn        the web socket.
+   * @param notebook    the notebook.
    * @param fromMessage the message.
    */
   private void angularObjectUpdated(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) {
+                                    Notebook notebook, Message fromMessage) {
     String noteId = (String) fromMessage.get("noteId");
     String paragraphId = (String) fromMessage.get("paragraphId");
     String interpreterGroupId = (String) fromMessage.get("interpreterGroupId");
@@ -1547,7 +1458,7 @@ public class NotebookServer extends WebSocketServlet
    * and a paragraph id.
    */
   protected void angularObjectClientBind(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws Exception {
+                                         Notebook notebook, Message fromMessage) throws Exception {
     String noteId = fromMessage.getType("noteId");
     String varName = fromMessage.getType("name");
     Object varValue = fromMessage.get("value");
@@ -1581,7 +1492,8 @@ public class NotebookServer extends WebSocketServlet
    * and an optional list of paragraph id(s).
    */
   protected void angularObjectClientUnbind(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws Exception {
+                                           Notebook notebook, Message fromMessage)
+      throws Exception {
     String noteId = fromMessage.getType("noteId");
     String varName = fromMessage.getType("name");
     String paragraphId = fromMessage.getType("paragraphId");
@@ -1609,7 +1521,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private InterpreterGroup findInterpreterGroupForParagraph(Note note, String paragraphId)
-          throws Exception {
+      throws Exception {
     final Paragraph paragraph = note.getParagraph(paragraphId);
     if (paragraph == null) {
       throw new IllegalArgumentException("Unknown paragraph with id : " + paragraphId);
@@ -1618,8 +1530,10 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId, String varName,
-          Object varValue, RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId,
-          NotebookSocket conn) {
+                                                 Object varValue,
+                                                 RemoteAngularObjectRegistry remoteRegistry,
+                                                 String interpreterGroupId,
+                                                 NotebookSocket conn) {
     final AngularObject ao =
         remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId);
 
@@ -1629,8 +1543,9 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, String varName,
-          RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId,
-          NotebookSocket conn) {
+                                               RemoteAngularObjectRegistry remoteRegistry,
+                                               String interpreterGroupId,
+                                               NotebookSocket conn) {
     final AngularObject ao =
         remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId);
     this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao)
@@ -1639,8 +1554,9 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName,
-          Object varValue, AngularObjectRegistry registry, String interpreterGroupId,
-          NotebookSocket conn) {
+                                            Object varValue, AngularObjectRegistry registry,
+                                            String interpreterGroupId,
+                                            NotebookSocket conn) {
     AngularObject angularObject = registry.get(varName, noteId, paragraphId);
     if (angularObject == null) {
       angularObject = registry.add(varName, varValue, noteId, paragraphId);
@@ -1655,7 +1571,8 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void removeAngularObjectFromLocalRepo(String noteId, String paragraphId, String varName,
-          AngularObjectRegistry registry, String interpreterGroupId, NotebookSocket conn) {
+                                                AngularObjectRegistry registry,
+                                                String interpreterGroupId, NotebookSocket conn) {
     final AngularObject removed = registry.remove(varName, noteId, paragraphId);
     if (removed != null) {
       this.broadcastExcept(noteId,
@@ -1665,8 +1582,8 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void moveParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void moveParagraph(NotebookSocket conn,
+                             Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
@@ -1674,31 +1591,22 @@ public class NotebookServer extends WebSocketServlet
 
     final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString());
     String noteId = getOpenNoteId(conn);
-    final Note note = notebook.getNote(noteId);
-
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    note.moveParagraph(paragraphId, newIndex);
-    note.persist(subject);
-    broadcast(note.getId(),
-        new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex));
+    getNotebookService().moveParagraph(noteId, paragraphId, newIndex,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
+            super.onSuccess(result, context);
+            broadcast(result.getNote().getId(),
+                new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex));
+          }
+        });
   }
 
-  private String insertParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private String insertParagraph(NotebookSocket conn,
+                                 Message fromMessage) throws IOException {
     final int index = (int) Double.parseDouble(fromMessage.get("index").toString());
     String noteId = getOpenNoteId(conn);
-    final Note note = notebook.getNote(noteId);
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return null;
-    }
     Map<String, Object> config;
     if (fromMessage.get("config") != null) {
       config = (Map<String, Object>) fromMessage.get("config");
@@ -1706,85 +1614,59 @@ public class NotebookServer extends WebSocketServlet
       config = new HashMap<>();
     }
 
-    Paragraph newPara = note.insertNewParagraph(index, subject);
-    newPara.setConfig(config);
-    note.persist(subject);
-    broadcastNewParagraph(note, newPara);
+    Paragraph newPara = getNotebookService().insertParagraph(noteId, index, config,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            super.onSuccess(p, context);
+            broadcastNewParagraph(p.getNote(), p);
+          }
+        });
 
     return newPara.getId();
   }
 
-  private void copyParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
-    String newParaId = insertParagraph(conn, userAndRoles, notebook, fromMessage);
+  private void copyParagraph(NotebookSocket conn,
+                             Message fromMessage) throws IOException {
+    String newParaId = insertParagraph(conn, fromMessage);
 
     if (newParaId == null) {
       return;
     }
     fromMessage.put("id", newParaId);
 
-    updateParagraph(conn, userAndRoles, notebook, fromMessage);
+    updateParagraph(conn, fromMessage);
   }
 
-  private void cancelParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
     }
 
     String noteId = getOpenNoteId(conn);
-
-    if (!hasParagraphRunnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    final Note note = notebook.getNote(noteId);
-    Paragraph p = note.getParagraph(paragraphId);
-    p.abort();
+    getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<>(conn));
   }
 
-  private void runAllParagraphs(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+  private void runAllParagraphs(NotebookSocket conn,
+                                Message fromMessage) throws IOException {
     final String noteId = (String) fromMessage.get("noteId");
     if (StringUtils.isBlank(noteId)) {
       return;
     }
-
-    if (!hasParagraphRunnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "run all paragraphs")) {
-      return;
-    }
-
     List<Map<String, Object>> paragraphs =
         gson.fromJson(String.valueOf(fromMessage.data.get("paragraphs")),
-            new TypeToken<List<Map<String, Object>>>() {}.getType());
-
-    for (Map<String, Object> raw : paragraphs) {
-      String paragraphId = (String) raw.get("id");
-      if (paragraphId == null) {
-        continue;
-      }
-
-      String text = (String) raw.get("paragraph");
-      String title = (String) raw.get("title");
-      Map<String, Object> params = (Map<String, Object>) raw.get("params");
-      Map<String, Object> config = (Map<String, Object>) raw.get("config");
-
-      Note note = notebook.getNote(noteId);
-      Paragraph p = setParagraphUsingMessage(note, fromMessage,
-          paragraphId, text, title, params, config);
+            new TypeToken<List<Map<String, Object>>>() {
+            }.getType());
 
-      if (p.isEnabled() && !persistAndExecuteSingleParagraph(conn, note, p, true)) {
-        // stop execution when one paragraph fails.
-        break;
-      }
-    }
+    getNotebookService().runAllParagraphs(noteId, paragraphs, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn));
   }
 
   private void broadcastSpellExecution(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook, Message fromMessage) throws IOException {
+                                       Notebook notebook, Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
       return;
@@ -1837,39 +1719,40 @@ public class NotebookServer extends WebSocketServlet
         new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn);
   }
 
-  private void runParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void runParagraph(NotebookSocket conn,
+                            Message fromMessage) throws IOException {
     final String paragraphId = (String) fromMessage.get("id");
     if (paragraphId == null) {
+      //TODO(zjffdu) it is possible ?
       return;
     }
 
     String noteId = getOpenNoteId(conn);
-
-    if (!hasParagraphRunnerPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "write")) {
-      return;
-    }
-
-    // 1. clear paragraph only if personalized,
-    // otherwise this will be handed in `onOutputClear`
-    final Note note = notebook.getNote(noteId);
-    if (note.isPersonalizedMode()) {
-      String user = fromMessage.principal;
-      Paragraph p = note.clearPersonalizedParagraphOutput(paragraphId, user);
-      unicastParagraph(note, p, user);
-    }
-
-    // 2. set paragraph values
     String text = (String) fromMessage.get("paragraph");
     String title = (String) fromMessage.get("title");
     Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
     Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
+    getNotebookService().runParagraph(noteId, paragraphId, title, text, params, config, false,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Paragraph>(conn) {
+          @Override
+          public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+            if (p.getNote().isPersonalizedMode()) {
+              Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId,
+                  context.getAutheInfo().getUser());
+              unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
+            }
 
-    Paragraph p = setParagraphUsingMessage(note, fromMessage, paragraphId,
-        text, title, params, config);
-
-    persistAndExecuteSingleParagraph(conn, note, p, false);
+            // if it's the last paragraph and not empty, let's add a new one
+            boolean isTheLastParagraph = p.getNote().isLastParagraph(paragraphId);
+            if (!(Strings.isNullOrEmpty(p.getText()) ||
+                Strings.isNullOrEmpty(p.getScriptText())) &&
+                isTheLastParagraph) {
+              Paragraph newPara = p.getNote().addNewParagraph(p.getAuthenticationInfo());
+              broadcastNewParagraph(p.getNote(), newPara);
+            }
+          }
+        });
   }
 
   private void addNewParagraphIfLastParagraphIsExecuted(Note note, Paragraph p) {
@@ -1887,7 +1770,7 @@ public class NotebookServer extends WebSocketServlet
    * @return false if failed to save a note
    */
   private boolean persistNoteWithAuthInfo(NotebookSocket conn, Note note, Paragraph p)
-          throws IOException {
+      throws IOException {
     try {
       note.persist(p.getAuthenticationInfo());
       return true;
@@ -1901,28 +1784,9 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private boolean persistAndExecuteSingleParagraph(NotebookSocket conn, Note note, Paragraph p,
-          boolean blocking) throws IOException {
-    addNewParagraphIfLastParagraphIsExecuted(note, p);
-    if (!persistNoteWithAuthInfo(conn, note, p)) {
-      return false;
-    }
-
-    try {
-      return note.run(p.getId(), blocking);
-    } catch (Exception ex) {
-      LOG.error("Exception from run", ex);
-      if (p != null) {
-        p.setReturn(new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), ex);
-        p.setStatus(Status.ERROR);
-        broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", p));
-      }
-      return false;
-    }
-  }
-
   private Paragraph setParagraphUsingMessage(Note note, Message fromMessage, String paragraphId,
-          String text, String title, Map<String, Object> params, Map<String, Object> config) {
+                                             String text, String title, Map<String, Object> params,
+                                             Map<String, Object> config) {
     Paragraph p = note.getParagraph(paragraphId);
     p.setText(text);
     p.setTitle(title);
@@ -1945,7 +1809,7 @@ public class NotebookServer extends WebSocketServlet
   }
 
   private void sendAllConfigurations(NotebookSocket conn, HashSet<String> userAndRoles,
-          Notebook notebook) throws IOException {
+                                     Notebook notebook) throws IOException {
     ZeppelinConfiguration conf = notebook.getConf();
 
     Map<String, String> configurations =
@@ -1962,97 +1826,94 @@ public class NotebookServer extends WebSocketServlet
         new Message(OP.CONFIGURATIONS_INFO).put("configurations", configurations)));
   }
 
-  private void checkpointNote(NotebookSocket conn, Notebook notebook, Message fromMessage)
-          throws IOException {
+  private void checkpointNote(NotebookSocket conn, Message fromMessage)
+      throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String commitMessage = (String) fromMessage.get("commitMessage");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    Revision revision = notebook.checkpointNote(noteId, commitMessage, subject);
-    if (!Revision.isEmpty(revision)) {
-      List<Revision> revisions = notebook.listRevisionHistory(noteId, subject);
-      conn.send(
-          serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions)));
-    } else {
-      conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
-          "Couldn't checkpoint note revision: possibly storage doesn't support versioning. "
-              + "Please check the logs for more details.")));
-    }
+
+    getNotebookService().checkpointNote(noteId, commitMessage, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Revision>(conn) {
+          @Override
+          public void onSuccess(Revision revision, ServiceContext context) throws IOException {
+            super.onSuccess(revision, context);
+            if (!Revision.isEmpty(revision)) {
+              List<Revision> revisions =
+                  notebook().listRevisionHistory(noteId, context.getAutheInfo());
+              conn.send(
+                  serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList",
+                      revisions)));
+            } else {
+              conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
+                  "Couldn't checkpoint note revision: possibly storage doesn't support versioning. "
+                      + "Please check the logs for more details.")));
+            }
+          }
+        });
   }
 
-  private void listRevisionHistory(NotebookSocket conn, Notebook notebook, Message fromMessage)
-          throws IOException {
+  private void listRevisionHistory(NotebookSocket conn, Message fromMessage)
+      throws IOException {
     String noteId = (String) fromMessage.get("noteId");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    List<Revision> revisions = notebook.listRevisionHistory(noteId, subject);
-
-    conn.send(
-        serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions)));
+    getNotebookService().listRevisionHistory(noteId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<List<Revision>>(conn) {
+          @Override
+          public void onSuccess(List<Revision> revisions, ServiceContext context)
+              throws IOException {
+            super.onSuccess(revisions, context);
+            conn.send(serializeMessage(
+                new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions)));
+          }
+        });
   }
 
-  private void setNoteRevision(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void setNoteRevision(NotebookSocket conn,
+                               Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String revisionId = (String) fromMessage.get("revisionId");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "update")) {
-      return;
-    }
-
-    Note headNote = null;
-    boolean setRevisionStatus;
-    try {
-      headNote = notebook.setNoteRevision(noteId, revisionId, subject);
-      setRevisionStatus = headNote != null;
-    } catch (Exception e) {
-      setRevisionStatus = false;
-      LOG.error("Failed to set given note revision", e);
-    }
-    if (setRevisionStatus) {
-      notebook.loadNoteFromRepo(noteId, subject);
-    }
-
-    conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", setRevisionStatus)));
-
-    if (setRevisionStatus) {
-      Note reloadedNote = notebook.getNote(headNote.getId());
-      broadcastNote(reloadedNote);
-    } else {
-      conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
-          "Couldn't set note to the given revision. "
-              + "Please check the logs for more details.")));
-    }
+    getNotebookService().setNoteRevision(noteId, revisionId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            Note reloadedNote = notebook().loadNoteFromRepo(noteId, context.getAutheInfo());
+            conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", true)));
+            broadcastNote(reloadedNote);
+          }
+        });
   }
 
-  private void getNoteByRevision(NotebookSocket conn, Notebook notebook, Message fromMessage)
+  private void getNoteByRevision(NotebookSocket conn, Message fromMessage)
       throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String revisionId = (String) fromMessage.get("revisionId");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    Note revisionNote = notebook.getNoteByRevision(noteId, revisionId, subject);
-    conn.send(serializeMessage(
-        new Message(OP.NOTE_REVISION).put("noteId", noteId).put("revisionId", revisionId)
-            .put("note", revisionNote)));
+    getNotebookService().getNotebyRevision(noteId, revisionId, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            conn.send(serializeMessage(
+                new Message(OP.NOTE_REVISION).put("noteId", noteId).put("revisionId", revisionId)
+                    .put("note", note)));
+          }
+        });
   }
 
-  private void getNoteByRevisionForCompare(NotebookSocket conn, Notebook notebook,
-      Message fromMessage) throws IOException {
+  private void getNoteByRevisionForCompare(NotebookSocket conn,
+                                           Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String revisionId = (String) fromMessage.get("revisionId");
-
     String position = (String) fromMessage.get("position");
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    Note revisionNote;
-    if (revisionId.equals("Head")) {
-      revisionNote = notebook.getNote(noteId);
-    } else {
-      revisionNote = notebook.getNoteByRevision(noteId, revisionId, subject);
-    }
-
-    conn.send(serializeMessage(
-        new Message(OP.NOTE_REVISION_FOR_COMPARE).put("noteId", noteId)
-            .put("revisionId", revisionId).put("position", position).put("note", revisionNote)));
+    getNotebookService().getNoteByRevisionForCompare(noteId, revisionId,
+        getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            super.onSuccess(note, context);
+            conn.send(serializeMessage(
+                new Message(OP.NOTE_REVISION_FOR_COMPARE).put("noteId", noteId)
+                    .put("revisionId", revisionId).put("position", position).put("note", note)));
+          }
+        });
   }
 
   /**
@@ -2074,7 +1935,7 @@ public class NotebookServer extends WebSocketServlet
    */
   @Override
   public void onOutputUpdated(String noteId, String paragraphId, int index,
-      InterpreterResult.Type type, String output) {
+                              InterpreterResult.Type type, String output) {
     Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId)
         .put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output);
     Note note = notebook().getNote(noteId);
@@ -2105,7 +1966,7 @@ public class NotebookServer extends WebSocketServlet
    */
   @Override
   public void onOutputAppend(String noteId, String paragraphId, int index, String appId,
-      String output) {
+                             String output) {
     Message msg =
         new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
             .put("index", index).put("appId", appId).put("data", output);
@@ -2117,7 +1978,7 @@ public class NotebookServer extends WebSocketServlet
    */
   @Override
   public void onOutputUpdated(String noteId, String paragraphId, int index, String appId,
-      InterpreterResult.Type type, String output) {
+                              InterpreterResult.Type type, String output) {
     Message msg =
         new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
             .put("index", index).put("type", type).put("appId", appId).put("data", output);
@@ -2322,7 +2183,7 @@ public class NotebookServer extends WebSocketServlet
         if (job.getStatus() == Status.FINISHED) {
           LOG.info("Job {} is finished successfully, status: {}", job.getId(), job.getStatus());
         } else {
-          LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}" , job.getId(),
+          LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", job.getId(),
               job.getStatus(), job.getException(), job.getReturn());
         }
 
@@ -2484,7 +2345,7 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-  private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject)
+  private void getInterpreterSettings(NotebookSocket conn)
       throws IOException {
     List<InterpreterSetting> availableSettings = notebook().getInterpreterSettingManager().get();
     conn.send(serializeMessage(
@@ -2525,7 +2386,7 @@ public class NotebookServer extends WebSocketServlet
 
   private void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serialized) {
     synchronized (connectedSockets) {
-      for (NotebookSocket conn: connectedSockets) {
+      for (NotebookSocket conn : connectedSockets) {
         if (exclude != null && exclude.equals(conn)) {
           continue;
         }
@@ -2555,7 +2416,7 @@ public class NotebookServer extends WebSocketServlet
 
   @Override
   public void onParaInfosReceived(String noteId, String paragraphId,
-          String interpreterSettingId, Map<String, String> metaInfos) {
+                                  String interpreterSettingId, Map<String, String> metaInfos) {
     Note note = notebook().getNote(noteId);
     if (note != null) {
       Paragraph paragraph = note.getParagraph(paragraphId);
@@ -2601,53 +2462,39 @@ public class NotebookServer extends WebSocketServlet
     setting.clearNoteIdAndParaMap();
   }
 
-  public void broadcastNoteForms(Note note) {
+  private void broadcastNoteForms(Note note) {
     GUI formsSettings = new GUI();
     formsSettings.setForms(note.getNoteForms());
     formsSettings.setParams(note.getNoteParams());
-
     broadcast(note.getId(), new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings));
   }
 
-  private void saveNoteForms(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void saveNoteForms(NotebookSocket conn,
+                             Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     Map<String, Object> noteParams = (Map<String, Object>) fromMessage.get("noteParams");
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "update")) {
-      return;
-    }
-
-    Note note = notebook.getNote(noteId);
-    if (note != null) {
-      note.setNoteParams(noteParams);
-
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      note.persist(subject);
-      broadcastNoteForms(note);
-    }
+    getNotebookService().saveNoteForms(noteId, noteParams, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            broadcastNoteForms(note);
+          }
+        });
   }
 
-  private void removeNoteForms(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
-          Message fromMessage) throws IOException {
+  private void removeNoteForms(NotebookSocket conn,
+                               Message fromMessage) throws IOException {
     String noteId = (String) fromMessage.get("noteId");
     String formName = (String) fromMessage.get("formName");
 
-    if (!hasParagraphWriterPermission(conn, notebook, noteId,
-        userAndRoles, fromMessage.principal, "update")) {
-      return;
-    }
-
-    Note note = notebook.getNote(noteId);
-    if (note != null) {
-      note.getNoteForms().remove(formName);
-      note.getNoteParams().remove(formName);
-
-      AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-      note.persist(subject);
-      broadcastNoteForms(note);
-    }
+    getNotebookService().removeNoteForms(noteId, formName, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<Note>(conn) {
+          @Override
+          public void onSuccess(Note note, ServiceContext context) throws IOException {
+            broadcastNoteForms(note);
+          }
+        });
   }
 
   @Override
@@ -2665,4 +2512,43 @@ public class NotebookServer extends WebSocketServlet
     m.data.put("notice", message);
     broadcast(m);
   }
+
+  private ServiceContext getServiceContext(Message message) {
+    AuthenticationInfo authInfo =
+        new AuthenticationInfo(message.principal, message.roles, message.ticket);
+    Set<String> userAndRoles = new HashSet<>();
+    userAndRoles.add(message.principal);
+    if (message.roles != null && !message.roles.equals("")) {
+      HashSet<String> roles =
+          gson.fromJson(message.roles, new TypeToken<HashSet<String>>() {
+          }.getType());
+      if (roles != null) {
+        userAndRoles.addAll(roles);
+      }
+    }
+    return new ServiceContext(authInfo, userAndRoles);
+  }
+
+  private class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> {
+
+    private NotebookSocket conn;
+
+    WebSocketServiceCallback(NotebookSocket conn) {
+      this.conn = conn;
+    }
+
+    @Override
+    public void onFailure(Exception ex, ServiceContext context) throws IOException {
+      super.onFailure(ex, context);
+      if (ex instanceof ForbiddenException) {
+        Type type = new TypeToken<Map<String, String>>(){}.getType();
+        Map<String, String> jsonObject =
+            gson.fromJson(((ForbiddenException) ex).getResponse().getEntity().toString(), type);
+        conn.send(serializeMessage(new Message(OP.AUTH_INFO)
+            .put("info", jsonObject.get("message"))));
+      } else {
+        conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", ex.getMessage())));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
index 66e4038..65740ff 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
@@ -77,4 +77,9 @@ public class NotebookSocket extends WebSocketAdapter {
   public void setUser(String user) {
     this.user = user;
   }
+
+  @Override
+  public String toString() { // renders as "<remote host>:<remote port>" of the request
+    return request.getRemoteHost() + ":" + request.getRemotePort();
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java
deleted file mode 100644
index ddfe7c4..0000000
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.socket;
-
-/** This will be used by some services to pass messages to frontend via WebSocket */
-public interface ServiceCallback {
-  void onStart(String message);
-
-  void onSuccess(String message);
-
-  void onFailure(String message);
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index ce867ee..62a3782 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -221,8 +221,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
   @Test
   public void testDeleteNoteBadId() throws IOException {
     LOG.info("testDeleteNoteBadId");
-    testDeleteNote("2AZFXEX97");
-    testDeleteNote("bad_ID");
+    testDeleteNotExistNote("bad_ID");
   }
 
   @Test
@@ -317,6 +316,23 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
     }
   }
 
+  /**
+   * Deletes a note id that is expected not to exist and asserts a "not found" response.
+   *
+   * @param noteId id appended to the /notebook/ path of the DELETE request
+   * @throws IOException propagated from the HTTP request or from reading the response body
+   */
+  private void testDeleteNotExistNote(String noteId) throws IOException {
+    DeleteMethod delete = httpDelete("/notebook/" + noteId);
+    try {
+      LOG.info("testDeleteNotExistNote delete response\n" + delete.getResponseBodyAsString());
+      assertThat("Test delete method:", delete, isNotFound());
+    } finally {
+      // Release the connection even when the assertion above fails.
+      delete.releaseConnection();
+    }
+  }
+
   @Test
   public void testCloneNote() throws IOException, IllegalArgumentException {
     LOG.info("testCloneNote");

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
index ab74012..cc4b1df 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
@@ -33,7 +33,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.rest.message.InterpreterInstallationRequest;
-import org.apache.zeppelin.socket.ServiceCallback;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -120,19 +119,19 @@ public class InterpreterServiceTest {
         new InterpreterInstallationRequest(interpreterName, artifactName),
         dependencyResolver,
         specificInterpreterPath,
-        new ServiceCallback() {
+        new SimpleServiceCallback<String>() {
           @Override
-          public void onStart(String message) {
+          public void onStart(String message, ServiceContext context) {
             assertEquals("Starting to download " + interpreterName + " interpreter", message);
           }
 
           @Override
-          public void onSuccess(String message) {
+          public void onSuccess(String message, ServiceContext context) {
             assertEquals(interpreterName + " downloaded", message);
           }
 
           @Override
-          public void onFailure(String message) {
+          public void onFailure(Exception ex, ServiceContext context) {
             fail();
           }
         });