You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/08/22 03:31:15 UTC
zeppelin git commit: ZEPPELIN-3737. Wrap JobManager page related
stuff into class JobManagerService
Repository: zeppelin
Updated Branches:
refs/heads/master bfa84f5b8 -> 001c621c7
ZEPPELIN-3737. Wrap JobManager page related stuff into class JobManagerService
### What is this PR for?
This is a refactoring PR which moves all JobManager page related code into the class JobManagerService.
### What type of PR is it?
[Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3737
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3154 from zjffdu/ZEPPELIN-3737 and squashes the following commits:
8e334e81b [Jeff Zhang] ZEPPELIN-3737. Wrap JobManager page related stuff into class JobManagerService
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/001c621c
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/001c621c
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/001c621c
Branch: refs/heads/master
Commit: 001c621c773db837b4c90bbf7868c94f284247ce
Parents: bfa84f5
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Aug 20 17:32:26 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Aug 22 11:31:10 2018 +0800
----------------------------------------------------------------------
.../apache/zeppelin/rest/NotebookRestApi.java | 20 +-
.../zeppelin/service/JobManagerService.java | 161 +++++++++++++++
.../apache/zeppelin/socket/NotebookServer.java | 168 ++++++++-------
.../org/apache/zeppelin/notebook/Notebook.java | 204 -------------------
4 files changed, 260 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index f831f3f..8411263 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -41,6 +41,7 @@ import org.apache.zeppelin.rest.message.RunParagraphWithParametersRequest;
import org.apache.zeppelin.rest.message.UpdateParagraphRequest;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.server.JsonResponse;
+import org.apache.zeppelin.service.JobManagerService;
import org.apache.zeppelin.service.NotebookService;
import org.apache.zeppelin.service.ServiceContext;
import org.apache.zeppelin.socket.NotebookServer;
@@ -83,12 +84,14 @@ public class NotebookRestApi extends AbstractRestApi {
private SearchService noteSearchService;
private NotebookAuthorization notebookAuthorization;
private NotebookService notebookService;
+ private JobManagerService jobManagerService;
@Inject
public NotebookRestApi(Notebook notebook, NotebookServer notebookServer, SearchService search) {
this.notebook = notebook;
this.notebookServer = notebookServer;
this.notebookService = new NotebookService(notebook);
+ this.jobManagerService = new JobManagerService(notebook);
this.noteSearchService = search;
this.notebookAuthorization = notebook.getNotebookAuthorization();
this.zConf = notebook.getConf();
@@ -919,15 +922,11 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getJobListforNote() throws IOException, IllegalArgumentException {
LOG.info("Get note jobs for job manager");
-
- AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
- List<Map<String, Object>> noteJobs = notebook
- .getJobListByUnixTime(false, 0, subject);
+ List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService
+ .getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>());
Map<String, Object> response = new HashMap<>();
-
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", noteJobs);
-
return new JsonResponse<>(Status.OK, response).build();
}
@@ -946,15 +945,12 @@ public class NotebookRestApi extends AbstractRestApi {
public Response getUpdatedJobListforNote(@PathParam("lastUpdateUnixtime") long lastUpdateUnixTime)
throws IOException, IllegalArgumentException {
LOG.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime);
-
- List<Map<String, Object>> noteJobs;
- AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
- noteJobs = notebook.getJobListByUnixTime(false, lastUpdateUnixTime, subject);
+ List<JobManagerService.NoteJobInfo> noteJobs =
+ jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(),
+ new RestServiceCallback<>());
Map<String, Object> response = new HashMap<>();
-
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", noteJobs);
-
return new JsonResponse<>(Status.OK, response).build();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java
new file mode 100644
index 0000000..374d8ff
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.scheduler.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Service class for the JobManager page.
+ *
+ * <p>Builds {@link NoteJobInfo} snapshots from the notes held by the {@link Notebook} and
+ * hands them to the supplied {@link ServiceCallback}.
+ */
+public class JobManagerService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JobManagerService.class);
+
+  private final Notebook notebook;
+
+  public JobManagerService(Notebook notebook) {
+    this.notebook = notebook;
+  }
+
+  /**
+   * Get the job info of a single note.
+   *
+   * @param noteId id of the note to describe
+   * @param context service context, passed through to the callback unchanged
+   * @param callback receives the resulting list through {@code onSuccess}
+   * @return a list holding the job info of the note; empty when
+   *     {@code notebook.getNote(noteId)} returns {@code null}
+   * @throws IOException if the callback throws it
+   */
+  public List<NoteJobInfo> getNoteJobInfo(String noteId,
+                                          ServiceContext context,
+                                          ServiceCallback<List<NoteJobInfo>> callback)
+      throws IOException {
+    List<NoteJobInfo> notesJobInfo = new ArrayList<>();
+    Note jobNote = notebook.getNote(noteId);
+    // A missing note produces an empty list rather than a NullPointerException
+    // inside the NoteJobInfo constructor.
+    if (jobNote != null) {
+      notesJobInfo.add(new NoteJobInfo(jobNote));
+    }
+    callback.onSuccess(notesJobInfo, context);
+    return notesJobInfo;
+  }
+
+  /**
+   * Get the NoteJobInfo of every note whose last run time is after lastUpdateServerUnixTime.
+   *
+   * @param lastUpdateServerUnixTime lower bound (exclusive), in epoch milliseconds
+   * @param context service context, passed through to the callback unchanged
+   * @param callback receives the resulting list through {@code onSuccess}
+   * @return the job info of all matching notes, possibly empty
+   * @throws IOException if the callback throws it
+   */
+  public List<NoteJobInfo> getNoteJobInfoByUnixTime(long lastUpdateServerUnixTime,
+                                                    ServiceContext context,
+                                                    ServiceCallback<List<NoteJobInfo>> callback)
+      throws IOException {
+    List<Note> notes = notebook.getAllNotes();
+    List<NoteJobInfo> notesJobInfo = new ArrayList<>();
+    for (Note note : notes) {
+      NoteJobInfo noteJobInfo = new NoteJobInfo(note);
+      if (noteJobInfo.unixTimeLastRun > lastUpdateServerUnixTime) {
+        notesJobInfo.add(noteJobInfo);
+      }
+    }
+    callback.onSuccess(notesJobInfo, context);
+    return notesJobInfo;
+  }
+
+  /**
+   * Report a note as removed: passes the callback a single NoteJobInfo that carries only
+   * the note id and the removed flag.
+   *
+   * @param noteId id of the removed note
+   * @param context service context, passed through to the callback unchanged
+   * @param callback receives the one-element list through {@code onSuccess}
+   * @throws IOException if the callback throws it
+   */
+  public void removeNoteJobInfo(String noteId,
+                                ServiceContext context,
+                                ServiceCallback<List<NoteJobInfo>> callback) throws IOException {
+    List<NoteJobInfo> notesJobInfo = new ArrayList<>();
+    notesJobInfo.add(new NoteJobInfo(noteId, true));
+    callback.onSuccess(notesJobInfo, context);
+  }
+
+  /**
+   * Pick the timestamp (epoch milliseconds) that stands for the last run of a paragraph:
+   * the finish time when it is terminated and has one, the current time while it is
+   * running, otherwise its creation time.
+   */
+  private static long getUnixTimeLastRunParagraph(Paragraph paragraph) {
+    if (paragraph.isTerminated() && paragraph.getDateFinished() != null) {
+      return paragraph.getDateFinished().getTime();
+    } else if (paragraph.isRunning()) {
+      return new Date().getTime();
+    } else {
+      return paragraph.getDateCreated().getTime();
+    }
+  }
+
+  /**
+   * Job view of one paragraph: id, display name and status.
+   */
+  public static class ParagraphJobInfo {
+    // NOTE(review): these fields are never read in this class; presumably they are
+    // picked up reflectively when the object is serialized - confirm before renaming.
+    private String id;
+    private String name;
+    private Job.Status status;
+
+    public ParagraphJobInfo(Paragraph p) {
+      this.id = p.getId();
+      // Fall back to the paragraph id when there is no usable title.
+      if (StringUtils.isBlank(p.getTitle())) {
+        this.name = p.getId();
+      } else {
+        this.name = p.getTitle();
+      }
+      this.status = p.getStatus();
+    }
+  }
+
+  /**
+   * Job view of one note, including the job view of each of its paragraphs.
+   */
+  public static class NoteJobInfo {
+    private String noteId;
+    private String noteName;
+    private String noteType;
+    private String interpreter;
+    private boolean isRunningJob;
+    private boolean isRemoved = false;
+    private long unixTimeLastRun;
+    private List<ParagraphJobInfo> paragraphs;
+
+    public NoteJobInfo(Note note) {
+      boolean isNoteRunning = false;
+      long lastRunningUnixTime = 0;
+      this.noteId = note.getId();
+      this.noteName = note.getName();
+      // set note type ( cron or normal )
+      if (isCron(note)) {
+        this.noteType = "cron";
+      } else {
+        this.noteType = "normal";
+      }
+      this.interpreter = note.getDefaultInterpreterGroup();
+
+      // set paragraphs
+      this.paragraphs = new ArrayList<>();
+      for (Paragraph paragraph : note.getParagraphs()) {
+        // check paragraph's status.
+        if (paragraph.getStatus().isRunning()) {
+          isNoteRunning = true;
+        }
+        // get data for the job manager.
+        ParagraphJobInfo paragraphItem = new ParagraphJobInfo(paragraph);
+        // Keep the most recent time over all paragraphs; a plain assignment would let
+        // the last paragraph in the note hide a more recent run of an earlier one.
+        lastRunningUnixTime =
+            Math.max(lastRunningUnixTime, getUnixTimeLastRunParagraph(paragraph));
+        paragraphs.add(paragraphItem);
+      }
+
+      this.isRunningJob = isNoteRunning;
+      this.unixTimeLastRun = lastRunningUnixTime;
+    }
+
+    /**
+     * A note counts as a cron note when its config holds a non-blank "cron" entry.
+     */
+    private boolean isCron(Note note) {
+      // A single null-safe lookup: an absent key and a null value both mean "not cron".
+      Object cron = note.getConfig().get("cron");
+      return cron != null && !StringUtils.isBlank(cron.toString());
+    }
+
+    public NoteJobInfo(String noteId, boolean isRemoved) {
+      this.noteId = noteId;
+      this.isRemoved = isRemoved;
+      // An empty list instead of null, so code reading the paragraphs never sees null.
+      this.paragraphs = new ArrayList<>();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 35da481..16719f3 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -57,6 +57,7 @@ import org.apache.zeppelin.rest.exception.ForbiddenException;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.service.ConfigurationService;
+import org.apache.zeppelin.service.JobManagerService;
import org.apache.zeppelin.service.NotebookService;
import org.apache.zeppelin.service.ServiceContext;
import org.apache.zeppelin.service.SimpleServiceCallback;
@@ -115,11 +116,11 @@ public class NotebookServer extends WebSocketServlet
/**
* Job manager service type.
*/
- protected enum JobManagerService {
+ protected enum JobManagerServiceType {
JOB_MANAGER_PAGE("JOB_MANAGER_PAGE");
private String serviceTypeKey;
- JobManagerService(String serviceType) {
+ JobManagerServiceType(String serviceType) {
this.serviceTypeKey = serviceType;
}
@@ -146,6 +147,7 @@ public class NotebookServer extends WebSocketServlet
private NotebookService notebookService;
private ConfigurationService configurationService;
+ private JobManagerService jobManagerService;
private ExecutorService executorService = Executors.newFixedThreadPool(10);
@@ -175,6 +177,13 @@ public class NotebookServer extends WebSocketServlet
return this.configurationService;
}
+ public synchronized JobManagerService getJobManagerService() {
+ if (this.jobManagerService == null) {
+ this.jobManagerService = new JobManagerService(notebook());
+ }
+ return this.jobManagerService;
+ }
+
@Override
public void configure(WebSocketServletFactory factory) {
factory.setCreator(new NotebookWebSocketCreator(this));
@@ -609,36 +618,49 @@ public class NotebookServer extends WebSocketServlet
}
public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
- addConnectionToNote(JobManagerService.JOB_MANAGER_PAGE.getKey(), conn);
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- List<Map<String, Object>> noteJobs = notebook().getJobListByUnixTime(false, 0, subject);
- Map<String, Object> response = new HashMap<>();
-
- response.put("lastResponseUnixTime", System.currentTimeMillis());
- response.put("jobs", noteJobs);
+ addConnectionToNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
+ getJobManagerService().getNoteJobInfoByUnixTime(0, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) {
+ @Override
+ public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
+ ServiceContext context) throws IOException {
+ super.onSuccess(notesJobInfo, context);
+ Map<String, Object> response = new HashMap<>();
+ response.put("lastResponseUnixTime", System.currentTimeMillis());
+ response.put("jobs", notesJobInfo);
+ conn.send(serializeMessage(new Message(OP.LIST_NOTE_JOBS).put("noteJobs", response)));
+ }
- conn.send(serializeMessage(new Message(OP.LIST_NOTE_JOBS).put("noteJobs", response)));
+ @Override
+ public void onFailure(Exception ex, ServiceContext context) throws IOException {
+ LOG.warn(ex.getMessage());
+ }
+ });
}
public void broadcastUpdateNoteJobInfo(long lastUpdateUnixTime) throws IOException {
- List<Map<String, Object>> noteJobs = new LinkedList<>();
- Notebook notebookObject = notebook();
- List<Map<String, Object>> jobNotes;
- if (notebookObject != null) {
- jobNotes = notebook().getJobListByUnixTime(false, lastUpdateUnixTime, null);
- noteJobs = jobNotes == null ? noteJobs : jobNotes;
- }
-
- Map<String, Object> response = new HashMap<>();
- response.put("lastResponseUnixTime", System.currentTimeMillis());
- response.put("jobs", noteJobs != null ? noteJobs : new LinkedList<>());
+ getJobManagerService().getNoteJobInfoByUnixTime(lastUpdateUnixTime, null,
+ new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(null) {
+ @Override
+ public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
+ ServiceContext context) throws IOException {
+ super.onSuccess(notesJobInfo, context);
+ Map<String, Object> response = new HashMap<>();
+ response.put("lastResponseUnixTime", System.currentTimeMillis());
+ response.put("jobs", notesJobInfo);
+ broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+ new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+ }
- broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
- new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+ @Override
+ public void onFailure(Exception ex, ServiceContext context) throws IOException {
+ LOG.warn(ex.getMessage());
+ }
+ });
}
public void unsubscribeNoteJobInfo(NotebookSocket conn) {
- removeConnectionFromNote(JobManagerService.JOB_MANAGER_PAGE.getKey(), conn);
+ removeConnectionFromNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
}
public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) throws IOException {
@@ -2033,7 +2055,7 @@ public class NotebookServer extends WebSocketServlet
/**
* Notebook Information Change event.
*/
- public static class NotebookInformationListener implements NotebookEventListener {
+ public class NotebookInformationListener implements NotebookEventListener {
private NotebookServer notebookServer;
public NotebookInformationListener(NotebookServer notebookServer) {
@@ -2043,9 +2065,10 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onParagraphRemove(Paragraph p) {
try {
- notebookServer.broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);
- } catch (IOException ioe) {
- LOG.error("can not broadcast for job manager {}", ioe.getMessage());
+ getJobManagerService().getNoteJobInfoByUnixTime(System.currentTimeMillis() - 5000, null,
+ new JobManagerServiceCallback());
+ } catch (IOException e) {
+ LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
}
}
@@ -2053,72 +2076,63 @@ public class NotebookServer extends WebSocketServlet
public void onNoteRemove(Note note) {
try {
notebookServer.broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);
- } catch (IOException ioe) {
- LOG.error("can not broadcast for job manager {}", ioe.getMessage());
+ } catch (IOException e) {
+ LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
}
- List<Map<String, Object>> notesInfo = new LinkedList<>();
- Map<String, Object> info = new HashMap<>();
- info.put("noteId", note.getId());
- // set paragraphs
- List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
-
- // notebook json object root information.
- info.put("isRunningJob", false);
- info.put("unixTimeLastRun", 0);
- info.put("isRemoved", true);
- info.put("paragraphs", paragraphsInfo);
- notesInfo.add(info);
-
- Map<String, Object> response = new HashMap<>();
- response.put("lastResponseUnixTime", System.currentTimeMillis());
- response.put("jobs", notesInfo);
-
- notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
- new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+ try {
+ getJobManagerService().removeNoteJobInfo(note.getId(), null,
+ new JobManagerServiceCallback());
+ } catch (IOException e) {
+ LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
+ }
}
@Override
public void onParagraphCreate(Paragraph p) {
- Notebook notebook = notebookServer.notebook();
- List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(p.getId());
- Map<String, Object> response = new HashMap<>();
- response.put("lastResponseUnixTime", System.currentTimeMillis());
- response.put("jobs", notebookJobs);
-
- notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
- new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+ try {
+ notebookServer.getJobManagerService().getNoteJobInfo(p.getNote().getId(), null,
+ new JobManagerServiceCallback());
+ } catch (IOException e) {
+ LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
+ }
}
@Override
public void onNoteCreate(Note note) {
- Notebook notebook = notebookServer.notebook();
- List<Map<String, Object>> notebookJobs = notebook.getJobListByNoteId(note.getId());
- Map<String, Object> response = new HashMap<>();
- response.put("lastResponseUnixTime", System.currentTimeMillis());
- response.put("jobs", notebookJobs);
-
- notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
- new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+ try {
+ notebookServer.getJobManagerService().getNoteJobInfo(note.getId(), null,
+ new JobManagerServiceCallback());
+ } catch (IOException e) {
+ LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
+ }
}
@Override
public void onParagraphStatusChange(Paragraph p, Status status) {
- Notebook notebook = notebookServer.notebook();
- List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(p.getId());
-
- Map<String, Object> response = new HashMap<>();
- response.put("lastResponseUnixTime", System.currentTimeMillis());
- response.put("jobs", notebookJobs);
-
- notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(),
- new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+ try {
+ notebookServer.getJobManagerService().getNoteJobInfo(p.getNote().getId(), null,
+ new JobManagerServiceCallback());
+ } catch (IOException e) {
+ LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
+ }
}
+ private class JobManagerServiceCallback
+ extends SimpleServiceCallback<List<JobManagerService.NoteJobInfo>> {
+ @Override
+ public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
+ ServiceContext context) throws IOException {
+ super.onSuccess(notesJobInfo, context);
+ Map<String, Object> response = new HashMap<>();
+ response.put("lastResponseUnixTime", System.currentTimeMillis());
+ response.put("jobs", notesJobInfo);
+ broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+ new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
+ }
+ }
}
-
-
@Override
public void onProgressUpdate(Paragraph p, int progress) {
broadcast(p.getNote().getId(),
@@ -2458,7 +2472,7 @@ public class NotebookServer extends WebSocketServlet
return new ServiceContext(authInfo, userAndRoles);
}
- private class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> {
+ public class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> {
private NotebookSocket conn;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 7cf0f54..b7dcdc3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -613,210 +613,6 @@ public class Notebook implements NoteEventListener {
}
}
- private Map<String, Object> getParagraphForJobManagerItem(Paragraph paragraph) {
- Map<String, Object> paragraphItem = new HashMap<>();
-
- // set paragraph id
- paragraphItem.put("id", paragraph.getId());
-
- // set paragraph name
- String paragraphName = paragraph.getTitle();
- if (paragraphName != null) {
- paragraphItem.put("name", paragraphName);
- } else {
- paragraphItem.put("name", paragraph.getId());
- }
-
- // set status for paragraph.
- paragraphItem.put("status", paragraph.getStatus().toString());
-
- return paragraphItem;
- }
-
- private long getUnixTimeLastRunParagraph(Paragraph paragraph) {
-
- Date lastRunningDate;
- long lastRunningUnixTime;
-
- Date paragaraphDate = paragraph.getDateStarted();
- // diff started time <-> finishied time
- if (paragaraphDate == null) {
- paragaraphDate = paragraph.getDateFinished();
- } else {
- if (paragraph.getDateFinished() != null && paragraph.getDateFinished()
- .after(paragaraphDate)) {
- paragaraphDate = paragraph.getDateFinished();
- }
- }
-
- // finished time and started time is not exists.
- if (paragaraphDate == null) {
- paragaraphDate = paragraph.getDateCreated();
- }
-
- // set last update unixtime(ms).
- lastRunningDate = paragaraphDate;
-
- lastRunningUnixTime = lastRunningDate.getTime();
-
- return lastRunningUnixTime;
- }
-
- public List<Map<String, Object>> getJobListByParagraphId(String paragraphId) {
- String gotNoteId = null;
- List<Note> notes = getAllNotes();
- for (Note note : notes) {
- Paragraph p = note.getParagraph(paragraphId);
- if (p != null) {
- gotNoteId = note.getId();
- }
- }
- return getJobListByNoteId(gotNoteId);
- }
-
- public List<Map<String, Object>> getJobListByNoteId(String noteId) {
- final String CRON_TYPE_NOTE_KEYWORD = "cron";
- long lastRunningUnixTime = 0;
- boolean isNoteRunning = false;
- Note jobNote = getNote(noteId);
- List<Map<String, Object>> notesInfo = new LinkedList<>();
- if (jobNote == null) {
- return notesInfo;
- }
-
- Map<String, Object> info = new HashMap<>();
-
- info.put("noteId", jobNote.getId());
- String noteName = jobNote.getName();
- if (noteName != null && !noteName.equals("")) {
- info.put("noteName", jobNote.getName());
- } else {
- info.put("noteName", "Note " + jobNote.getId());
- }
- // set note type ( cron or normal )
- if (jobNote.getConfig().containsKey(CRON_TYPE_NOTE_KEYWORD) && !jobNote.getConfig()
- .get(CRON_TYPE_NOTE_KEYWORD).equals("")) {
- info.put("noteType", "cron");
- } else {
- info.put("noteType", "normal");
- }
-
- // set paragraphs
- List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
- for (Paragraph paragraph : jobNote.getParagraphs()) {
- // check paragraph's status.
- if (paragraph.getStatus().isRunning()) {
- isNoteRunning = true;
- }
-
- // get data for the job manager.
- Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph);
- lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph);
-
- paragraphsInfo.add(paragraphItem);
- }
-
- // set interpreter bind type
- String interpreterGroupName = null;
- if (interpreterSettingManager.getInterpreterSettings(jobNote.getId()) != null
- && interpreterSettingManager.getInterpreterSettings(jobNote.getId()).size() >= 1) {
- interpreterGroupName =
- interpreterSettingManager.getInterpreterSettings(jobNote.getId()).get(0).getName();
- }
-
- // note json object root information.
- info.put("interpreter", interpreterGroupName);
- info.put("isRunningJob", isNoteRunning);
- info.put("unixTimeLastRun", lastRunningUnixTime);
- info.put("paragraphs", paragraphsInfo);
- notesInfo.add(info);
-
- return notesInfo;
- };
-
- public List<Map<String, Object>> getJobListByUnixTime(boolean needsReload,
- long lastUpdateServerUnixTime, AuthenticationInfo subject) {
- final String CRON_TYPE_NOTE_KEYWORD = "cron";
-
- if (needsReload) {
- try {
- reloadAllNotes(subject);
- } catch (IOException e) {
- logger.error("Fail to reload notes from repository");
- }
- }
-
- List<Note> notes = getAllNotes();
- List<Map<String, Object>> notesInfo = new LinkedList<>();
- for (Note note : notes) {
- boolean isNoteRunning = false;
- boolean isUpdateNote = false;
- long lastRunningUnixTime = 0;
- Map<String, Object> info = new HashMap<>();
-
- // set note ID
- info.put("noteId", note.getId());
-
- // set note Name
- String noteName = note.getName();
- if (noteName != null && !noteName.equals("")) {
- info.put("noteName", note.getName());
- } else {
- info.put("noteName", "Note " + note.getId());
- }
-
- // set note type ( cron or normal )
- if (note.getConfig().containsKey(CRON_TYPE_NOTE_KEYWORD) && !note.getConfig()
- .get(CRON_TYPE_NOTE_KEYWORD).equals("")) {
- info.put("noteType", "cron");
- } else {
- info.put("noteType", "normal");
- }
-
- // set paragraphs
- List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
- for (Paragraph paragraph : note.getParagraphs()) {
- // check paragraph's status.
- if (paragraph.getStatus().isRunning()) {
- isNoteRunning = true;
- isUpdateNote = true;
- }
-
- // get data for the job manager.
- Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph);
- lastRunningUnixTime = Math.max(getUnixTimeLastRunParagraph(paragraph), lastRunningUnixTime);
-
- // is update note for last server update time.
- if (lastRunningUnixTime > lastUpdateServerUnixTime) {
- isUpdateNote = true;
- }
- paragraphsInfo.add(paragraphItem);
- }
-
- // set interpreter bind type
- String interpreterGroupName = null;
- if (interpreterSettingManager.getInterpreterSettings(note.getId()) != null
- && interpreterSettingManager.getInterpreterSettings(note.getId()).size() >= 1) {
- interpreterGroupName =
- interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getName();
- }
-
- // not update and not running -> pass
- if (!isUpdateNote && !isNoteRunning) {
- continue;
- }
-
- // note json object root information.
- info.put("interpreter", interpreterGroupName);
- info.put("isRunningJob", isNoteRunning);
- info.put("unixTimeLastRun", lastRunningUnixTime);
- info.put("paragraphs", paragraphsInfo);
- notesInfo.add(info);
- }
-
- return notesInfo;
- }
-
/**
* Cron task for the note.
*/