You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/08/22 03:31:15 UTC

zeppelin git commit: ZEPPELIN-3737. Wrap JobManager page related stuff into class JobManagerService

Repository: zeppelin
Updated Branches:
  refs/heads/master bfa84f5b8 -> 001c621c7


ZEPPELIN-3737. Wrap JobManager page related stuff into class JobManagerService

### What is this PR for?

This is a refactoring PR which moves all JobManager page related code into the class JobManagerService.

### What type of PR is it?
[Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3737

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #3154 from zjffdu/ZEPPELIN-3737 and squashes the following commits:

8e334e81b [Jeff Zhang] ZEPPELIN-3737. Wrap JobManager page related stuff into class JobManagerService


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/001c621c
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/001c621c
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/001c621c

Branch: refs/heads/master
Commit: 001c621c773db837b4c90bbf7868c94f284247ce
Parents: bfa84f5
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Aug 20 17:32:26 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Aug 22 11:31:10 2018 +0800

----------------------------------------------------------------------
 .../apache/zeppelin/rest/NotebookRestApi.java   |  20 +-
 .../zeppelin/service/JobManagerService.java     | 161 +++++++++++++++
 .../apache/zeppelin/socket/NotebookServer.java  | 168 ++++++++-------
 .../org/apache/zeppelin/notebook/Notebook.java  | 204 -------------------
 4 files changed, 260 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index f831f3f..8411263 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -41,6 +41,7 @@ import org.apache.zeppelin.rest.message.RunParagraphWithParametersRequest;
 import org.apache.zeppelin.rest.message.UpdateParagraphRequest;
 import org.apache.zeppelin.search.SearchService;
 import org.apache.zeppelin.server.JsonResponse;
+import org.apache.zeppelin.service.JobManagerService;
 import org.apache.zeppelin.service.NotebookService;
 import org.apache.zeppelin.service.ServiceContext;
 import org.apache.zeppelin.socket.NotebookServer;
@@ -83,12 +84,14 @@ public class NotebookRestApi extends AbstractRestApi {
   private SearchService noteSearchService;
   private NotebookAuthorization notebookAuthorization;
   private NotebookService notebookService;
+  private JobManagerService jobManagerService;
 
   @Inject
   public NotebookRestApi(Notebook notebook, NotebookServer notebookServer, SearchService search) {
     this.notebook = notebook;
     this.notebookServer = notebookServer;
     this.notebookService = new NotebookService(notebook);
+    this.jobManagerService = new JobManagerService(notebook);
     this.noteSearchService = search;
     this.notebookAuthorization = notebook.getNotebookAuthorization();
     this.zConf = notebook.getConf();
@@ -919,15 +922,11 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response getJobListforNote() throws IOException, IllegalArgumentException {
     LOG.info("Get note jobs for job manager");
-
-    AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-    List<Map<String, Object>> noteJobs = notebook
-        .getJobListByUnixTime(false, 0, subject);
+    List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService
+        .getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>());
     Map<String, Object> response = new HashMap<>();
-
     response.put("lastResponseUnixTime", System.currentTimeMillis());
     response.put("jobs", noteJobs);
-
     return new JsonResponse<>(Status.OK, response).build();
   }
 
@@ -946,15 +945,12 @@ public class NotebookRestApi extends AbstractRestApi {
   public Response getUpdatedJobListforNote(@PathParam("lastUpdateUnixtime") long lastUpdateUnixTime)
       throws IOException, IllegalArgumentException {
     LOG.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime);
-
-    List<Map<String, Object>> noteJobs;
-    AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-    noteJobs = notebook.getJobListByUnixTime(false, lastUpdateUnixTime, subject);
+    List<JobManagerService.NoteJobInfo> noteJobs =
+        jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(),
+            new RestServiceCallback<>());
     Map<String, Object> response = new HashMap<>();
-
     response.put("lastResponseUnixTime", System.currentTimeMillis());
     response.put("jobs", noteJobs);
-
     return new JsonResponse<>(Status.OK, response).build();
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java
new file mode 100644
index 0000000..374d8ff
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.scheduler.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Service class for the JobManager page: builds the per-note job summaries
+ * ({@link NoteJobInfo}) and hands them to a {@link ServiceCallback}.
+ */
+public class JobManagerService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JobManagerService.class);
+
+  private final Notebook notebook;
+
+  public JobManagerService(Notebook notebook) {
+    this.notebook = notebook;
+  }
+
+  /**
+   * Get the job info of a single note. When notebook.getNote(noteId) returns null the
+   * callback receives an empty list instead of this method failing on a null note.
+   */
+  public List<NoteJobInfo> getNoteJobInfo(String noteId, ServiceContext context,
+      ServiceCallback<List<NoteJobInfo>> callback) throws IOException {
+    List<NoteJobInfo> notesJobInfo = new ArrayList<>();
+    Note jobNote = notebook.getNote(noteId);
+    if (jobNote != null) {
+      notesJobInfo.add(new NoteJobInfo(jobNote));
+    }
+    callback.onSuccess(notesJobInfo, context);
+    return notesJobInfo;
+  }
+
+  /**
+   * Get the NoteJobInfo of every note whose last-run time is after lastUpdateServerUnixTime.
+   */
+  public List<NoteJobInfo> getNoteJobInfoByUnixTime(long lastUpdateServerUnixTime,
+      ServiceContext context, ServiceCallback<List<NoteJobInfo>> callback) throws IOException {
+    List<NoteJobInfo> notesJobInfo = new ArrayList<>();
+    for (Note note : notebook.getAllNotes()) {
+      NoteJobInfo noteJobInfo = new NoteJobInfo(note);
+      if (noteJobInfo.unixTimeLastRun > lastUpdateServerUnixTime) {
+        notesJobInfo.add(noteJobInfo);
+      }
+    }
+    callback.onSuccess(notesJobInfo, context);
+    return notesJobInfo;
+  }
+
+  /** Report a note as removed: the callback receives one NoteJobInfo flagged isRemoved. */
+  public void removeNoteJobInfo(String noteId, ServiceContext context,
+      ServiceCallback<List<NoteJobInfo>> callback) throws IOException {
+    List<NoteJobInfo> notesJobInfo = new ArrayList<>();
+    notesJobInfo.add(new NoteJobInfo(noteId, true));
+    callback.onSuccess(notesJobInfo, context);
+  }
+
+  /**
+   * Last-run time (ms) of a paragraph: its finish time when terminated with a finish date,
+   * the current time while it is running, otherwise its creation time.
+   */
+  private static long getUnixTimeLastRunParagraph(Paragraph paragraph) {
+    if (paragraph.isTerminated() && paragraph.getDateFinished() != null) {
+      return paragraph.getDateFinished().getTime();
+    } else if (paragraph.isRunning()) {
+      return new Date().getTime();
+    } else {
+      return paragraph.getDateCreated().getTime();
+    }
+  }
+
+  /** Job summary of one paragraph. */
+  public static class ParagraphJobInfo {
+    private String id;
+    private String name;
+    private Job.Status status;
+
+    public ParagraphJobInfo(Paragraph p) {
+      this.id = p.getId();
+      // fall back to the paragraph id when the title is blank
+      this.name = StringUtils.isBlank(p.getTitle()) ? p.getId() : p.getTitle();
+      this.status = p.getStatus();
+    }
+  }
+
+  /** Job summary of one note and its paragraphs. */
+  public static class NoteJobInfo {
+    private String noteId;
+    private String noteName;
+    private String noteType;
+    private String interpreter;
+    private boolean isRunningJob;
+    private boolean isRemoved = false;
+    private long unixTimeLastRun;
+    private List<ParagraphJobInfo> paragraphs;
+
+    public NoteJobInfo(Note note) {
+      boolean isNoteRunning = false;
+      long lastRunningUnixTime = 0;
+      this.noteId = note.getId();
+      this.noteName = note.getName();
+      // set note type ( cron or normal )
+      this.noteType = isCron(note) ? "cron" : "normal";
+      this.interpreter = note.getDefaultInterpreterGroup();
+
+      // set paragraphs
+      this.paragraphs = new ArrayList<>();
+      for (Paragraph paragraph : note.getParagraphs()) {
+        // check paragraph's status.
+        if (paragraph.getStatus().isRunning()) {
+          isNoteRunning = true;
+        }
+        paragraphs.add(new ParagraphJobInfo(paragraph));
+        // keep the most recent run over ALL paragraphs, not just the last one iterated
+        lastRunningUnixTime =
+            Math.max(lastRunningUnixTime, getUnixTimeLastRunParagraph(paragraph));
+      }
+
+      this.isRunningJob = isNoteRunning;
+      this.unixTimeLastRun = lastRunningUnixTime;
+    }
+
+    private boolean isCron(Note note) {
+      Object cron = note.getConfig().get("cron");
+      return cron != null && !StringUtils.isBlank(cron.toString());
+    }
+
+    public NoteJobInfo(String noteId, boolean isRemoved) {
+      this.noteId = noteId;
+      this.isRemoved = isRemoved;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 35da481..16719f3 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -57,6 +57,7 @@ import org.apache.zeppelin.rest.exception.ForbiddenException;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.apache.zeppelin.service.ConfigurationService;
+import org.apache.zeppelin.service.JobManagerService;
 import org.apache.zeppelin.service.NotebookService;
 import org.apache.zeppelin.service.ServiceContext;
 import org.apache.zeppelin.service.SimpleServiceCallback;
@@ -115,11 +116,11 @@ public class NotebookServer extends WebSocketServlet
   /**
    * Job manager service type.
    */
-  protected enum JobManagerService {
+  protected enum JobManagerServiceType {
     JOB_MANAGER_PAGE("JOB_MANAGER_PAGE");
     private String serviceTypeKey;
 
-    JobManagerService(String serviceType) {
+    JobManagerServiceType(String serviceType) {
       this.serviceTypeKey = serviceType;
     }
 
@@ -146,6 +147,7 @@ public class NotebookServer extends WebSocketServlet
 
   private NotebookService notebookService;
   private ConfigurationService configurationService;
+  private JobManagerService jobManagerService;
 
   private ExecutorService executorService = Executors.newFixedThreadPool(10);
 
@@ -175,6 +177,13 @@ public class NotebookServer extends WebSocketServlet
     return this.configurationService;
   }
 
+  public synchronized JobManagerService getJobManagerService() {
+    if (this.jobManagerService == null) {
+      this.jobManagerService = new JobManagerService(notebook());
+    }
+    return this.jobManagerService;
+  }
+
   @Override
   public void configure(WebSocketServletFactory factory) {
     factory.setCreator(new NotebookWebSocketCreator(this));
@@ -609,36 +618,49 @@ public class NotebookServer extends WebSocketServlet
   }
 
   public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
-    addConnectionToNote(JobManagerService.JOB_MANAGER_PAGE.getKey(), conn);
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    List<Map<String, Object>> noteJobs = notebook().getJobListByUnixTime(false, 0, subject);
-    Map<String, Object> response = new HashMap<>();
-
-    response.put("lastResponseUnixTime", System.currentTimeMillis());
-    response.put("jobs", noteJobs);
+    addConnectionToNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
+    getJobManagerService().getNoteJobInfoByUnixTime(0, getServiceContext(fromMessage),
+        new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) {
+          @Override
+          public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
+                                ServiceContext context) throws IOException {
+            super.onSuccess(notesJobInfo, context);
+            Map<String, Object> response = new HashMap<>();
+            response.put("lastResponseUnixTime", System.currentTimeMillis());
+            response.put("jobs", notesJobInfo);
+            conn.send(serializeMessage(new Message(OP.LIST_NOTE_JOBS).put("noteJobs", response)));
+          }
 
-    conn.send(serializeMessage(new Message(OP.LIST_NOTE_JOBS).put("noteJobs", response)));
+          @Override
+          public void onFailure(Exception ex, ServiceContext context) throws IOException {
+            LOG.warn(ex.getMessage());
+          }
+        });
   }
 
   public void broadcastUpdateNoteJobInfo(long lastUpdateUnixTime) throws IOException {
-    List<Map<String, Object>> noteJobs = new LinkedList<>();
-    Notebook notebookObject = notebook();
-    List<Map<String, Object>> jobNotes;
-    if (notebookObject != null) {
-      jobNotes = notebook().getJobListByUnixTime(false, lastUpdateUnixTime, null);
-      noteJobs = jobNotes == null ? noteJobs : jobNotes;
-    }
-
-    Map<String, Object> response = new HashMap<>();
-    response.put("lastResponseUnixTime", System.currentTimeMillis());
-    response.put("jobs", noteJobs != null ? noteJobs : new LinkedList<>());
+    getJobManagerService().getNoteJobInfoByUnixTime(lastUpdateUnixTime, null,
+        new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(null) {
+          @Override
+          public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
+                                ServiceContext context) throws IOException {
+            super.onSuccess(notesJobInfo, context);
+            Map<String, Object> response = new HashMap<>();
+            response.put("lastResponseUnixTime", System.currentTimeMillis());
+            response.put("jobs", notesJobInfo);
+            broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+                new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+          }
 
-    broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
-        new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+          @Override
+          public void onFailure(Exception ex, ServiceContext context) throws IOException {
+            LOG.warn(ex.getMessage());
+          }
+        });
   }
 
   public void unsubscribeNoteJobInfo(NotebookSocket conn) {
-    removeConnectionFromNote(JobManagerService.JOB_MANAGER_PAGE.getKey(), conn);
+    removeConnectionFromNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
   }
 
   public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) throws IOException {
@@ -2033,7 +2055,7 @@ public class NotebookServer extends WebSocketServlet
   /**
    * Notebook Information Change event.
    */
-  public static class NotebookInformationListener implements NotebookEventListener {
+  public class NotebookInformationListener implements NotebookEventListener {
     private NotebookServer notebookServer;
 
     public NotebookInformationListener(NotebookServer notebookServer) {
@@ -2043,9 +2065,10 @@ public class NotebookServer extends WebSocketServlet
     @Override
     public void onParagraphRemove(Paragraph p) {
       try {
-        notebookServer.broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);
-      } catch (IOException ioe) {
-        LOG.error("can not broadcast for job manager {}", ioe.getMessage());
+        getJobManagerService().getNoteJobInfoByUnixTime(System.currentTimeMillis() - 5000, null,
+            new JobManagerServiceCallback());
+      } catch (IOException e) {
+        LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
       }
     }
 
@@ -2053,72 +2076,63 @@ public class NotebookServer extends WebSocketServlet
     public void onNoteRemove(Note note) {
       try {
         notebookServer.broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);
-      } catch (IOException ioe) {
-        LOG.error("can not broadcast for job manager {}", ioe.getMessage());
+      } catch (IOException e) {
+        LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
       }
 
-      List<Map<String, Object>> notesInfo = new LinkedList<>();
-      Map<String, Object> info = new HashMap<>();
-      info.put("noteId", note.getId());
-      // set paragraphs
-      List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
-
-      // notebook json object root information.
-      info.put("isRunningJob", false);
-      info.put("unixTimeLastRun", 0);
-      info.put("isRemoved", true);
-      info.put("paragraphs", paragraphsInfo);
-      notesInfo.add(info);
-
-      Map<String, Object> response = new HashMap<>();
-      response.put("lastResponseUnixTime", System.currentTimeMillis());
-      response.put("jobs", notesInfo);
-
-      notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
-          new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+      try {
+        getJobManagerService().removeNoteJobInfo(note.getId(), null,
+            new JobManagerServiceCallback());
+      } catch (IOException e) {
+        LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
+      }
     }
 
     @Override
     public void onParagraphCreate(Paragraph p) {
-      Notebook notebook = notebookServer.notebook();
-      List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(p.getId());
-      Map<String, Object> response = new HashMap<>();
-      response.put("lastResponseUnixTime", System.currentTimeMillis());
-      response.put("jobs", notebookJobs);
-
-      notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
-          new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+      try {
+        notebookServer.getJobManagerService().getNoteJobInfo(p.getNote().getId(), null,
+            new JobManagerServiceCallback());
+      } catch (IOException e) {
+        LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
+      }
     }
 
     @Override
     public void onNoteCreate(Note note) {
-      Notebook notebook = notebookServer.notebook();
-      List<Map<String, Object>> notebookJobs = notebook.getJobListByNoteId(note.getId());
-      Map<String, Object> response = new HashMap<>();
-      response.put("lastResponseUnixTime", System.currentTimeMillis());
-      response.put("jobs", notebookJobs);
-
-      notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
-          new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+      try {
+        notebookServer.getJobManagerService().getNoteJobInfo(note.getId(), null,
+            new JobManagerServiceCallback());
+      } catch (IOException e) {
+        LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
+      }
     }
 
     @Override
     public void onParagraphStatusChange(Paragraph p, Status status) {
-      Notebook notebook = notebookServer.notebook();
-      List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(p.getId());
-
-      Map<String, Object> response = new HashMap<>();
-      response.put("lastResponseUnixTime", System.currentTimeMillis());
-      response.put("jobs", notebookJobs);
-
-      notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
-          new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+      try {
+        notebookServer.getJobManagerService().getNoteJobInfo(p.getNote().getId(), null,
+            new JobManagerServiceCallback());
+      } catch (IOException e) {
+        LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
+      }
     }
 
+    private class JobManagerServiceCallback
+        extends SimpleServiceCallback<List<JobManagerService.NoteJobInfo>> {
+      @Override
+      public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
+                            ServiceContext context) throws IOException {
+        super.onSuccess(notesJobInfo, context);
+        Map<String, Object> response = new HashMap<>();
+        response.put("lastResponseUnixTime", System.currentTimeMillis());
+        response.put("jobs", notesJobInfo);
+        broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+            new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+      }
+    }
   }
 
-
-
   @Override
   public void onProgressUpdate(Paragraph p, int progress) {
     broadcast(p.getNote().getId(),
@@ -2458,7 +2472,7 @@ public class NotebookServer extends WebSocketServlet
     return new ServiceContext(authInfo, userAndRoles);
   }
 
-  private class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> {
+  public class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> {
 
     private NotebookSocket conn;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 7cf0f54..b7dcdc3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -613,210 +613,6 @@ public class Notebook implements NoteEventListener {
     }
   }
 
-  private Map<String, Object> getParagraphForJobManagerItem(Paragraph paragraph) {
-    Map<String, Object> paragraphItem = new HashMap<>();
-
-    // set paragraph id
-    paragraphItem.put("id", paragraph.getId());
-
-    // set paragraph name
-    String paragraphName = paragraph.getTitle();
-    if (paragraphName != null) {
-      paragraphItem.put("name", paragraphName);
-    } else {
-      paragraphItem.put("name", paragraph.getId());
-    }
-
-    // set status for paragraph.
-    paragraphItem.put("status", paragraph.getStatus().toString());
-
-    return paragraphItem;
-  }
-
-  private long getUnixTimeLastRunParagraph(Paragraph paragraph) {
-
-    Date lastRunningDate;
-    long lastRunningUnixTime;
-
-    Date paragaraphDate = paragraph.getDateStarted();
-    // diff started time <-> finishied time
-    if (paragaraphDate == null) {
-      paragaraphDate = paragraph.getDateFinished();
-    } else {
-      if (paragraph.getDateFinished() != null && paragraph.getDateFinished()
-          .after(paragaraphDate)) {
-        paragaraphDate = paragraph.getDateFinished();
-      }
-    }
-
-    // finished time and started time is not exists.
-    if (paragaraphDate == null) {
-      paragaraphDate = paragraph.getDateCreated();
-    }
-
-    // set last update unixtime(ms).
-    lastRunningDate = paragaraphDate;
-
-    lastRunningUnixTime = lastRunningDate.getTime();
-
-    return lastRunningUnixTime;
-  }
-
-  public List<Map<String, Object>> getJobListByParagraphId(String paragraphId) {
-    String gotNoteId = null;
-    List<Note> notes = getAllNotes();
-    for (Note note : notes) {
-      Paragraph p = note.getParagraph(paragraphId);
-      if (p != null) {
-        gotNoteId = note.getId();
-      }
-    }
-    return getJobListByNoteId(gotNoteId);
-  }
-
-  public List<Map<String, Object>> getJobListByNoteId(String noteId) {
-    final String CRON_TYPE_NOTE_KEYWORD = "cron";
-    long lastRunningUnixTime = 0;
-    boolean isNoteRunning = false;
-    Note jobNote = getNote(noteId);
-    List<Map<String, Object>> notesInfo = new LinkedList<>();
-    if (jobNote == null) {
-      return notesInfo;
-    }
-
-    Map<String, Object> info = new HashMap<>();
-
-    info.put("noteId", jobNote.getId());
-    String noteName = jobNote.getName();
-    if (noteName != null && !noteName.equals("")) {
-      info.put("noteName", jobNote.getName());
-    } else {
-      info.put("noteName", "Note " + jobNote.getId());
-    }
-    // set note type ( cron or normal )
-    if (jobNote.getConfig().containsKey(CRON_TYPE_NOTE_KEYWORD) && !jobNote.getConfig()
-            .get(CRON_TYPE_NOTE_KEYWORD).equals("")) {
-      info.put("noteType", "cron");
-    } else {
-      info.put("noteType", "normal");
-    }
-
-    // set paragraphs
-    List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
-    for (Paragraph paragraph : jobNote.getParagraphs()) {
-      // check paragraph's status.
-      if (paragraph.getStatus().isRunning()) {
-        isNoteRunning = true;
-      }
-
-      // get data for the job manager.
-      Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph);
-      lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph);
-
-      paragraphsInfo.add(paragraphItem);
-    }
-
-    // set interpreter bind type
-    String interpreterGroupName = null;
-    if (interpreterSettingManager.getInterpreterSettings(jobNote.getId()) != null
-            && interpreterSettingManager.getInterpreterSettings(jobNote.getId()).size() >= 1) {
-      interpreterGroupName =
-          interpreterSettingManager.getInterpreterSettings(jobNote.getId()).get(0).getName();
-    }
-
-    // note json object root information.
-    info.put("interpreter", interpreterGroupName);
-    info.put("isRunningJob", isNoteRunning);
-    info.put("unixTimeLastRun", lastRunningUnixTime);
-    info.put("paragraphs", paragraphsInfo);
-    notesInfo.add(info);
-
-    return notesInfo;
-  };
-
-  public List<Map<String, Object>> getJobListByUnixTime(boolean needsReload,
-      long lastUpdateServerUnixTime, AuthenticationInfo subject) {
-    final String CRON_TYPE_NOTE_KEYWORD = "cron";
-
-    if (needsReload) {
-      try {
-        reloadAllNotes(subject);
-      } catch (IOException e) {
-        logger.error("Fail to reload notes from repository");
-      }
-    }
-
-    List<Note> notes = getAllNotes();
-    List<Map<String, Object>> notesInfo = new LinkedList<>();
-    for (Note note : notes) {
-      boolean isNoteRunning = false;
-      boolean isUpdateNote = false;
-      long lastRunningUnixTime = 0;
-      Map<String, Object> info = new HashMap<>();
-
-      // set note ID
-      info.put("noteId", note.getId());
-
-      // set note Name
-      String noteName = note.getName();
-      if (noteName != null && !noteName.equals("")) {
-        info.put("noteName", note.getName());
-      } else {
-        info.put("noteName", "Note " + note.getId());
-      }
-
-      // set note type ( cron or normal )
-      if (note.getConfig().containsKey(CRON_TYPE_NOTE_KEYWORD) && !note.getConfig()
-          .get(CRON_TYPE_NOTE_KEYWORD).equals("")) {
-        info.put("noteType", "cron");
-      } else {
-        info.put("noteType", "normal");
-      }
-
-      // set paragraphs
-      List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
-      for (Paragraph paragraph : note.getParagraphs()) {
-        // check paragraph's status.
-        if (paragraph.getStatus().isRunning()) {
-          isNoteRunning = true;
-          isUpdateNote = true;
-        }
-
-        // get data for the job manager.
-        Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph);
-        lastRunningUnixTime = Math.max(getUnixTimeLastRunParagraph(paragraph), lastRunningUnixTime);
-
-        // is update note for last server update time.
-        if (lastRunningUnixTime > lastUpdateServerUnixTime) {
-          isUpdateNote = true;
-        }
-        paragraphsInfo.add(paragraphItem);
-      }
-
-      // set interpreter bind type
-      String interpreterGroupName = null;
-      if (interpreterSettingManager.getInterpreterSettings(note.getId()) != null
-          && interpreterSettingManager.getInterpreterSettings(note.getId()).size() >= 1) {
-        interpreterGroupName =
-            interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getName();
-      }
-
-      // not update and not running -> pass
-      if (!isUpdateNote && !isNoteRunning) {
-        continue;
-      }
-
-      // note json object root information.
-      info.put("interpreter", interpreterGroupName);
-      info.put("isRunningJob", isNoteRunning);
-      info.put("unixTimeLastRun", lastRunningUnixTime);
-      info.put("paragraphs", paragraphsInfo);
-      notesInfo.add(info);
-    }
-
-    return notesInfo;
-  }
-
   /**
    * Cron task for the note.
    */