You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/07/08 06:25:14 UTC
[zeppelin] branch master updated: [ZEPPELIN-4928] Refactoring of
NotebookRestApi
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new b1373e0 [ZEPPELIN-4928] Refactoring of NotebookRestApi
b1373e0 is described below
commit b1373e01ab04d12762f26e7f2543db15bd693544
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Jun 30 14:07:10 2020 +0800
[ZEPPELIN-4928] Refactoring of NotebookRestApi
### What is this PR for?
Several improvements:
* Move the note and paragraph job status generation out of `Note` into the new `NoteJobStatus` & `ParagraphJobStatus` message classes
* Throw an exception when a note is run while it is still in the running state
### What type of PR is it?
[Improvement | Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4928
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3839 from zjffdu/ZEPPELIN-4928 and squashes the following commits:
550ff8561 [Jeff Zhang] [ZEPPELIN-4928] Refactoring of NotebookRestApi
---
.../org/apache/zeppelin/rest/NotebookRestApi.java | 125 ++++++++++++---------
.../zeppelin/rest/message/NoteJobStatus.java | 50 +++++++++
.../zeppelin/rest/message/ParagraphJobStatus.java | 66 +++++++++++
.../apache/zeppelin/service/NotebookService.java | 30 ++---
.../apache/zeppelin/rest/ZeppelinRestApiTest.java | 22 ++--
.../interpreter/ManagedInterpreterGroup.java | 8 +-
.../java/org/apache/zeppelin/notebook/Note.java | 43 +------
7 files changed, 225 insertions(+), 119 deletions(-)
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index b4f136c..13c1b99 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -56,6 +56,8 @@ import org.apache.zeppelin.rest.exception.ParagraphNotFoundException;
import org.apache.zeppelin.rest.message.CronRequest;
import org.apache.zeppelin.rest.message.NewNoteRequest;
import org.apache.zeppelin.rest.message.NewParagraphRequest;
+import org.apache.zeppelin.rest.message.NoteJobStatus;
+import org.apache.zeppelin.rest.message.ParagraphJobStatus;
import org.apache.zeppelin.rest.message.RenameNoteRequest;
import org.apache.zeppelin.rest.message.ParametersRequest;
import org.apache.zeppelin.rest.message.UpdateParagraphRequest;
@@ -78,8 +80,8 @@ import org.slf4j.LoggerFactory;
@Produces("application/json")
@Singleton
public class NotebookRestApi extends AbstractRestApi {
- private static final Logger LOG = LoggerFactory.getLogger(NotebookRestApi.class);
- private static Gson gson = new Gson();
+ private static final Logger LOGGER = LoggerFactory.getLogger(NotebookRestApi.class);
+ private static final Gson GSON = new Gson();
private ZeppelinConfiguration zConf;
private Notebook notebook;
@@ -133,7 +135,7 @@ public class NotebookRestApi extends AbstractRestApi {
}
private String ownerPermissionError(Set<String> current, Set<String> allowed) {
- LOG.info("Cannot change permissions. Connection owners {}. Allowed owners {}",
+ LOGGER.info("Cannot change permissions. Connection owners {}. Allowed owners {}",
current.toString(), allowed.toString());
return "Insufficient privileges to change permissions.\n\n" +
"Allowed owners: " + allowed.toString() + "\n\n" +
@@ -156,7 +158,7 @@ public class NotebookRestApi extends AbstractRestApi {
private void checkIfUserIsAnon(String errorMsg) {
boolean isAuthenticated = authenticationService.isAuthenticated();
if (isAuthenticated && authenticationService.getPrincipal().equals("anonymous")) {
- LOG.info("Anonymous user cannot set any permissions for this note.");
+ LOGGER.info("Anonymous user cannot set any permissions for this note.");
throw new ForbiddenException(errorMsg);
}
}
@@ -217,7 +219,7 @@ public class NotebookRestApi extends AbstractRestApi {
private void checkIfNoteSupportsCron(Note note) {
if (!note.isCronSupported(notebook.getConf())) {
- LOG.error("Cron is not enabled from Zeppelin server");
+ LOGGER.error("Cron is not enabled from Zeppelin server");
throw new ForbiddenException("Cron is not enabled from Zeppelin server");
}
}
@@ -247,18 +249,19 @@ public class NotebookRestApi extends AbstractRestApi {
ownerPermissionError(userAndRoles, authorizationService.getOwners(noteId)));
HashMap<String, HashSet<String>> permMap =
- gson.fromJson(req, new TypeToken<HashMap<String, HashSet<String>>>() {
+ GSON.fromJson(req, new TypeToken<HashMap<String, HashSet<String>>>() {
}.getType());
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
- LOG.info("Set permissions {} {} {} {} {} {}", noteId, principal, permMap.get("owners"),
- permMap.get("readers"), permMap.get("runners"), permMap.get("writers"));
-
HashSet<String> readers = permMap.get("readers");
HashSet<String> runners = permMap.get("runners");
HashSet<String> owners = permMap.get("owners");
HashSet<String> writers = permMap.get("writers");
+
+ LOGGER.info("Set permissions to note: {} with current user:{}, owners:{}, readers:{}, runners:{}, writers:{}",
+ noteId, principal, owners, readers, runners, writers);
+
// Set readers, if runners, writers and owners is empty -> set to user requesting the change
if (readers != null && !readers.isEmpty()) {
if (runners.isEmpty()) {
@@ -291,7 +294,7 @@ public class NotebookRestApi extends AbstractRestApi {
authorizationService.setRunners(noteId, runners);
authorizationService.setWriters(noteId, writers);
authorizationService.setOwners(noteId, owners);
- LOG.debug("After set permissions {} {} {} {}", authorizationService.getOwners(noteId),
+ LOGGER.debug("After set permissions {} {} {} {}", authorizationService.getOwners(noteId),
authorizationService.getReaders(noteId), authorizationService.getRunners(noteId),
authorizationService.getWriters(noteId));
AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal());
@@ -301,6 +304,12 @@ public class NotebookRestApi extends AbstractRestApi {
return new JsonResponse<>(Status.OK).build();
}
+ /**
+ * Return noteinfo list for the current user who has reader permission.
+ *
+ * @return
+ * @throws IOException
+ */
@GET
@ZeppelinApi
public Response getNoteList() throws IOException {
@@ -309,6 +318,13 @@ public class NotebookRestApi extends AbstractRestApi {
return new JsonResponse<>(Status.OK, "", notesInfo).build();
}
+ /**
+ * Get note of this specified noteId.
+ *
+ * @param noteId
+ * @return
+ * @throws IOException
+ */
@GET
@Path("{noteId}")
@ZeppelinApi
@@ -319,7 +335,7 @@ public class NotebookRestApi extends AbstractRestApi {
}
/**
- * export note REST API.
+ * Export note REST API.
*
* @param noteId ID of Note
* @return note JSON with status.OK
@@ -335,7 +351,8 @@ public class NotebookRestApi extends AbstractRestApi {
}
/**
- * import new note REST API.
+ * Import new note REST API.
+ * TODO(zjffdu) support to import jupyter note.
*
* @param noteJson - note Json
* @return JSON with new note ID
@@ -351,7 +368,7 @@ public class NotebookRestApi extends AbstractRestApi {
}
/**
- * Create new note REST API.
+ * Create new note REST API with note json.
*
* @param message - JSON with new note name
* @return JSON with new note ID
@@ -361,7 +378,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response createNote(String message) throws IOException {
String user = authenticationService.getPrincipal();
- LOG.info("Create new note by JSON {}", message);
+ LOGGER.info("Create new note by JSON {}", message);
NewNoteRequest request = NewNoteRequest.fromJson(message);
Note note = notebookService.createNote(
request.getName(),
@@ -389,7 +406,7 @@ public class NotebookRestApi extends AbstractRestApi {
@Path("{noteId}")
@ZeppelinApi
public Response deleteNote(@PathParam("noteId") String noteId) throws IOException {
- LOG.info("Delete note {} ", noteId);
+ LOGGER.info("Delete note {} ", noteId);
notebookService.removeNote(noteId,
getServiceContext(),
new RestServiceCallback<String>() {
@@ -416,7 +433,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response cloneNote(@PathParam("noteId") String noteId, String message)
throws IOException, IllegalArgumentException {
- LOG.info("clone note by JSON {}", message);
+ LOGGER.info("Clone note by JSON {}", message);
checkIfUserCanWrite(noteId, "Insufficient privileges you cannot clone this note");
NewNoteRequest request = NewNoteRequest.fromJson(message);
String newNoteName = null;
@@ -447,11 +464,11 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response renameNote(@PathParam("noteId") String noteId,
String message) throws IOException {
- LOG.info("rename note by JSON {}", message);
- RenameNoteRequest request = gson.fromJson(message, RenameNoteRequest.class);
+ LOGGER.info("Rename note by JSON {}", message);
+ RenameNoteRequest request = GSON.fromJson(message, RenameNoteRequest.class);
String newName = request.getName();
if (newName.isEmpty()) {
- LOG.warn("Trying to rename notebook {} with empty name parameter", noteId);
+ LOGGER.warn("Trying to rename notebook {} with empty name parameter", noteId);
throw new BadRequestException("name can not be empty");
}
notebookService.renameNote(noteId, request.getName(), false, getServiceContext(),
@@ -478,7 +495,7 @@ public class NotebookRestApi extends AbstractRestApi {
public Response insertParagraph(@PathParam("noteId") String noteId, String message)
throws IOException {
String user = authenticationService.getPrincipal();
- LOG.info("insert paragraph {} {}", noteId, message);
+ LOGGER.info("Insert paragraph {} {}", noteId, message);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -511,7 +528,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getParagraph(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId) throws IOException {
- LOG.info("get paragraph {} {}", noteId, paragraphId);
+ LOGGER.info("Get paragraph {} {}", noteId, paragraphId);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -523,7 +540,7 @@ public class NotebookRestApi extends AbstractRestApi {
}
/**
- * Update paragraph.
+ * Update paragraph. Only update title and text is supported.
*
* @param message json containing the "text" and optionally the "title" of the paragraph, e.g.
* {"text" : "updated text", "title" : "Updated title" }
@@ -535,7 +552,7 @@ public class NotebookRestApi extends AbstractRestApi {
@PathParam("paragraphId") String paragraphId,
String message) throws IOException {
String user = authenticationService.getPrincipal();
- LOG.info("{} will update paragraph {} {}", user, noteId, paragraphId);
+ LOGGER.info("{} will update paragraph {} {}", user, noteId, paragraphId);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -543,7 +560,7 @@ public class NotebookRestApi extends AbstractRestApi {
Paragraph p = note.getParagraph(paragraphId);
checkIfParagraphIsNotNull(p);
- UpdateParagraphRequest updatedParagraph = gson.fromJson(message, UpdateParagraphRequest.class);
+ UpdateParagraphRequest updatedParagraph = GSON.fromJson(message, UpdateParagraphRequest.class);
p.setText(updatedParagraph.getText());
if (updatedParagraph.getTitle() != null) {
@@ -556,6 +573,15 @@ public class NotebookRestApi extends AbstractRestApi {
return new JsonResponse<>(Status.OK, "").build();
}
+ /**
+ * Update paragraph config rest api.
+ *
+ * @param noteId
+ * @param paragraphId
+ * @param message
+ * @return
+ * @throws IOException
+ */
@PUT
@Path("{noteId}/paragraph/{paragraphId}/config")
@ZeppelinApi
@@ -563,7 +589,7 @@ public class NotebookRestApi extends AbstractRestApi {
@PathParam("paragraphId") String paragraphId,
String message) throws IOException {
String user = authenticationService.getPrincipal();
- LOG.info("{} will update paragraph config {} {}", user, noteId, paragraphId);
+ LOGGER.info("{} will update paragraph config {} {}", user, noteId, paragraphId);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -571,7 +597,7 @@ public class NotebookRestApi extends AbstractRestApi {
Paragraph p = note.getParagraph(paragraphId);
checkIfParagraphIsNotNull(p);
- Map<String, Object> newConfig = gson.fromJson(message, HashMap.class);
+ Map<String, Object> newConfig = GSON.fromJson(message, HashMap.class);
configureParagraph(p, newConfig, user);
AuthenticationInfo subject = new AuthenticationInfo(user);
notebook.saveNote(note, subject);
@@ -592,7 +618,7 @@ public class NotebookRestApi extends AbstractRestApi {
@PathParam("paragraphId") String paragraphId,
@PathParam("newIndex") String newIndex)
throws IOException {
- LOG.info("move paragraph {} {} {}", noteId, paragraphId, newIndex);
+ LOGGER.info("Move paragraph {} {} {}", noteId, paragraphId, newIndex);
notebookService.moveParagraph(noteId, paragraphId, Integer.parseInt(newIndex),
getServiceContext(),
new RestServiceCallback<Paragraph>() {
@@ -617,7 +643,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response deleteParagraph(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId) throws IOException {
- LOG.info("delete paragraph {} {}", noteId, paragraphId);
+ LOGGER.info("Delete paragraph {} {}", noteId, paragraphId);
notebookService.removeParagraph(noteId, paragraphId, getServiceContext(),
new RestServiceCallback<Paragraph>() {
@Override
@@ -640,7 +666,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response clearAllParagraphOutput(@PathParam("noteId") String noteId)
throws IOException {
- LOG.info("clear all paragraph output of note {}", noteId);
+ LOGGER.info("Clear all paragraph output of note {}", noteId);
notebookService.clearAllParagraphOutput(noteId, getServiceContext(),
new RestServiceCallback<>());
return new JsonResponse(Status.OK, "").build();
@@ -678,7 +704,7 @@ public class NotebookRestApi extends AbstractRestApi {
params = request.getParams();
}
- LOG.info("Run note jobs, noteId: {}, blocking: {}, isolated: {}, params: {}", noteId, blocking, isolated, params);
+ LOGGER.info("Run note jobs, noteId: {}, blocking: {}, isolated: {}, params: {}", noteId, blocking, isolated, params);
Note note = notebook.getNote(noteId);
AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal());
subject.setRoles(new LinkedList<>(authenticationService.getAssociatedRoles()));
@@ -690,7 +716,7 @@ public class NotebookRestApi extends AbstractRestApi {
note.runAll(subject, blocking, isolated, params);
return new JsonResponse<>(Status.OK).build();
} catch (Exception ex) {
- LOG.error("Exception from run", ex);
+ LOGGER.error("Exception from run", ex);
return new JsonResponse<>(Status.EXPECTATION_FAILED, ex.getMessage()).build();
}
}
@@ -708,7 +734,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response stopNoteJobs(@PathParam("noteId") String noteId)
throws IOException, IllegalArgumentException {
- LOG.info("stop note jobs {} ", noteId);
+ LOGGER.info("Stop note jobs {} ", noteId);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
checkIfUserCanRun(noteId, "Insufficient privileges you cannot stop this job for this note");
@@ -734,12 +760,12 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getNoteJobStatus(@PathParam("noteId") String noteId)
throws IOException, IllegalArgumentException {
- LOG.info("get note job status.");
+ LOGGER.info("Get note job status.");
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
checkIfUserCanRead(noteId, "Insufficient privileges you cannot get job status");
- return new JsonResponse<>(Status.OK, null, note.generateParagraphsInfo()).build();
+ return new JsonResponse<>(Status.OK, null, new NoteJobStatus(note)).build();
}
/**
@@ -757,7 +783,7 @@ public class NotebookRestApi extends AbstractRestApi {
public Response getNoteParagraphJobStatus(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId)
throws IOException, IllegalArgumentException {
- LOG.info("get note paragraph job status.");
+ LOGGER.info("Get note paragraph job status.");
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
checkIfUserCanRead(noteId, "Insufficient privileges you cannot get job status");
@@ -765,8 +791,7 @@ public class NotebookRestApi extends AbstractRestApi {
Paragraph paragraph = note.getParagraph(paragraphId);
checkIfParagraphIsNotNull(paragraph);
- return new JsonResponse<>(Status.OK, null, note.generateSingleParagraphInfo(paragraphId)).
- build();
+ return new JsonResponse<>(Status.OK, null, new ParagraphJobStatus(paragraph)).build();
}
/**
@@ -784,7 +809,7 @@ public class NotebookRestApi extends AbstractRestApi {
public Response runParagraph(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId, String message)
throws IOException, IllegalArgumentException {
- LOG.info("run paragraph job asynchronously {} {} {}", noteId, paragraphId, message);
+ LOGGER.info("Run paragraph job asynchronously {} {} {}", noteId, paragraphId, message);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -821,7 +846,7 @@ public class NotebookRestApi extends AbstractRestApi {
@PathParam("paragraphId") String paragraphId,
String message)
throws IOException, IllegalArgumentException {
- LOG.info("run paragraph synchronously {} {} {}", noteId, paragraphId, message);
+ LOGGER.info("Run paragraph synchronously {} {} {}", noteId, paragraphId, message);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -866,7 +891,7 @@ public class NotebookRestApi extends AbstractRestApi {
public Response cancelParagraph(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId)
throws IOException, IllegalArgumentException {
- LOG.info("stop paragraph job {} ", noteId);
+ LOGGER.info("stop paragraph job {} ", noteId);
notebookService.cancelParagraph(noteId, paragraphId, getServiceContext(),
new RestServiceCallback<Paragraph>());
return new JsonResponse<>(Status.OK).build();
@@ -885,7 +910,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response registerCronJob(@PathParam("noteId") String noteId, String message)
throws IOException, IllegalArgumentException {
- LOG.info("Register cron job note={} request cron msg={}", noteId, message);
+ LOGGER.info("Register cron job note={} request cron msg={}", noteId, message);
CronRequest request = CronRequest.fromJson(message);
@@ -920,7 +945,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response removeCronJob(@PathParam("noteId") String noteId)
throws IOException, IllegalArgumentException {
- LOG.info("Remove cron job note {}", noteId);
+ LOGGER.info("Remove cron job note {}", noteId);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -950,7 +975,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getCronJob(@PathParam("noteId") String noteId)
throws IOException, IllegalArgumentException {
- LOG.info("Get cron job note {}", noteId);
+ LOGGER.info("Get cron job note {}", noteId);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -974,7 +999,7 @@ public class NotebookRestApi extends AbstractRestApi {
@Path("jobmanager/")
@ZeppelinApi
public Response getJobListforNote() throws IOException, IllegalArgumentException {
- LOG.info("Get note jobs for job manager");
+ LOGGER.info("Get note jobs for job manager");
List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService
.getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>());
Map<String, Object> response = new HashMap<>();
@@ -997,7 +1022,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getUpdatedJobListforNote(@PathParam("lastUpdateUnixtime") long lastUpdateUnixTime)
throws IOException, IllegalArgumentException {
- LOG.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime);
+ LOGGER.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime);
List<JobManagerService.NoteJobInfo> noteJobs =
jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(),
new RestServiceCallback<>());
@@ -1014,7 +1039,7 @@ public class NotebookRestApi extends AbstractRestApi {
@Path("search")
@ZeppelinApi
public Response search(@QueryParam("q") String queryTerm) {
- LOG.info("Searching notes for: {}", queryTerm);
+ LOGGER.info("Searching notes for: {}", queryTerm);
String principal = authenticationService.getPrincipal();
Set<String> roles = authenticationService.getAssociatedRoles();
HashSet<String> userAndRoles = new HashSet<>();
@@ -1032,7 +1057,7 @@ public class NotebookRestApi extends AbstractRestApi {
i--;
}
}
- LOG.info("{} notes found", notesFound.size());
+ LOGGER.info("{} notes found", notesFound.size());
return new JsonResponse<>(Status.OK, notesFound).build();
}
@@ -1053,7 +1078,7 @@ public class NotebookRestApi extends AbstractRestApi {
}
private void initParagraph(Paragraph p, NewParagraphRequest request, String user) {
- LOG.info("Init Paragraph for user {}", user);
+ LOGGER.info("Init Paragraph for user {}", user);
checkIfParagraphIsNotNull(p);
p.setTitle(request.getTitle());
p.setText(request.getText());
@@ -1064,9 +1089,9 @@ public class NotebookRestApi extends AbstractRestApi {
}
private void configureParagraph(Paragraph p, Map<String, Object> newConfig, String user) {
- LOG.info("Configure Paragraph for user {}", user);
+ LOGGER.info("Configure Paragraph for user {}", user);
if (newConfig == null || newConfig.isEmpty()) {
- LOG.warn("{} is trying to update paragraph {} of note {} with empty config",
+ LOGGER.warn("{} is trying to update paragraph {} of note {} with empty config",
user, p.getId(), p.getNote().getId());
throw new BadRequestException("paragraph config cannot be empty");
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NoteJobStatus.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NoteJobStatus.java
new file mode 100644
index 0000000..27d9026
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NoteJobStatus.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest.message;
+
+import com.google.gson.Gson;
+import com.google.gson.annotations.SerializedName;
+import org.apache.zeppelin.notebook.Note;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class NoteJobStatus {
+ private static final Gson GSON = new Gson();
+
+ private String id;
+ private boolean isRunning;
+ @SerializedName("paragraphs")
+ private List<ParagraphJobStatus> paragraphJobStatusList;
+
+ public NoteJobStatus(Note note) {
+ this.id = note.getId();
+ this.isRunning = note.isRunning();
+ this.paragraphJobStatusList = note.getParagraphs().stream()
+ .map(p -> new ParagraphJobStatus(p))
+ .collect(Collectors.toList());
+ }
+
+ public List<ParagraphJobStatus> getParagraphJobStatusList() {
+ return paragraphJobStatusList;
+ }
+
+ public static NoteJobStatus fromJson(String json) {
+ return GSON.fromJson(json, NoteJobStatus.class);
+ }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/ParagraphJobStatus.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/ParagraphJobStatus.java
new file mode 100644
index 0000000..8249524
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/ParagraphJobStatus.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest.message;
+
+import org.apache.zeppelin.notebook.Paragraph;
+
+public class ParagraphJobStatus {
+ private String id;
+ private String status;
+ private String started;
+ private String finished;
+ private String progress;
+
+ public ParagraphJobStatus(Paragraph p) {
+ this.id = p.getId();
+ this.status = p.getStatus().toString();
+ if (p.getDateStarted() != null) {
+ this.started = p.getDateStarted().toString();
+ }
+ if (p.getDateFinished() != null) {
+ this.finished = p.getDateFinished().toString();
+ }
+ if (p.getStatus().isRunning()) {
+ this.progress = String.valueOf(p.progress());
+ } else if (p.isTerminated()){
+ this.progress = String.valueOf(100);
+ } else {
+ this.progress = String.valueOf(0);
+ }
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public String getStarted() {
+ return started;
+ }
+
+ public String getFinished() {
+ return finished;
+ }
+
+ public String getProgress() {
+ return progress;
+ }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index c73eaf5..a36a8d0 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -393,10 +393,10 @@ public class NotebookService {
return false;
}
- note.setRunning(true);
- try {
- if (paragraphs != null) {
- // run note via the data passed from frontend
+ if (paragraphs != null) {
+ // run note via the data passed from frontend
+ try {
+ note.setRunning(true);
for (Map<String, Object> raw : paragraphs) {
String paragraphId = (String) raw.get("id");
if (paragraphId == null) {
@@ -418,18 +418,18 @@ public class NotebookService {
throw new IOException("Fail to run paragraph json: " + raw);
}
}
- } else {
- try {
- // run note directly when parameter `paragraphs` is null.
- note.runAll(context.getAutheInfo(), true, false, new HashMap<>());
- return true;
- } catch (Exception e) {
- LOGGER.warn("Fail to run note: " + note.getName(), e);
- return false;
- }
+ } finally {
+ note.setRunning(false);
+ }
+ } else {
+ try {
+ // run note directly when parameter `paragraphs` is null.
+ note.runAll(context.getAutheInfo(), true, false, new HashMap<>());
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn("Fail to run note: " + note.getName(), e);
+ return false;
}
- } finally {
- note.setRunning(false);
}
return true;
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index 65faed8..6e87196 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -34,6 +34,7 @@ import org.apache.commons.httpclient.methods.PutMethod;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.notebook.AuthorizationService;
import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.message.NoteJobStatus;
import org.apache.zeppelin.service.AuthenticationService;
import org.apache.zeppelin.utils.TestUtils;
import org.junit.AfterClass;
@@ -521,10 +522,9 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
Map<String, Object> resp = gson.fromJson(responseBody,
new TypeToken<Map<String, Object>>() {}.getType());
- List<Map<String, Object>> paragraphs = (List<Map<String, Object>>) resp.get("body");
- assertEquals(1, paragraphs.size());
- assertTrue(paragraphs.get(0).containsKey("progress"));
- int progress = Integer.parseInt((String) paragraphs.get(0).get("progress"));
+ NoteJobStatus noteJobStatus = NoteJobStatus.fromJson(gson.toJson(resp.get("body")));
+ assertEquals(1, noteJobStatus.getParagraphJobStatusList().size());
+ int progress = Integer.parseInt(noteJobStatus.getParagraphJobStatusList().get(0).getProgress());
assertTrue(progress >= 0 && progress <= 100);
// wait until job is finished or timeout.
@@ -652,7 +652,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
config.put("enabled", true);
paragraph.setConfig(config);
- note.runAll(AuthenticationInfo.ANONYMOUS, false, false, new HashMap<>());
+ note.runAll(AuthenticationInfo.ANONYMOUS, true, true, new HashMap<>());
String jsonRequest = "{\"cron\":\"* * * * * ?\" }";
// right cron expression.
@@ -664,7 +664,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_CRON_FOLDERS.getVarName(), "/System");
note.setName("System/test2");
- note.runAll(AuthenticationInfo.ANONYMOUS, false, false, new HashMap<>());
+ note.runAll(AuthenticationInfo.ANONYMOUS, true, true, new HashMap<>());
postCron = httpPost("/notebook/cron/" + note.getId(), jsonRequest);
assertThat("", postCron, isAllowed());
postCron.releaseConnection();
@@ -687,7 +687,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
}
@Test
- public void testRegressionZEPPELIN_527() throws IOException {
+ public void testRegressionZEPPELIN_527() throws Exception {
Note note = null;
try {
note = TestUtils.getInstance(Notebook.class).createNote("note1_testRegressionZEPPELIN_527", anonymous);
@@ -695,16 +695,16 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
note.setName("note for run test");
Paragraph paragraph = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
paragraph.setText("%spark\nval param = z.input(\"param\").toString\nprintln(param)");
-
+ note.runAll(AuthenticationInfo.ANONYMOUS, true, false, new HashMap<>());
TestUtils.getInstance(Notebook.class).saveNote(note, anonymous);
GetMethod getNoteJobs = httpGet("/notebook/job/" + note.getId());
assertThat("test note jobs run:", getNoteJobs, isAllowed());
Map<String, Object> resp = gson.fromJson(getNoteJobs.getResponseBodyAsString(),
new TypeToken<Map<String, Object>>() {}.getType());
- List<Map<String, String>> body = (List<Map<String, String>>) resp.get("body");
- assertFalse(body.get(0).containsKey("started"));
- assertFalse(body.get(0).containsKey("finished"));
+ NoteJobStatus noteJobStatus = NoteJobStatus.fromJson(gson.toJson(resp.get("body")));
+ assertNotNull(noteJobStatus.getParagraphJobStatusList().get(0).getStarted());
+ assertNotNull(noteJobStatus.getParagraphJobStatusList().get(0).getFinished());
getNoteJobs.releaseConnection();
} finally {
//cleanup
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index a27677f..a1a478e 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -147,9 +147,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
if (Boolean.parseBoolean(
interpreter.getProperty("zeppelin.interpreter.close.cancel_job", "true"))) {
for (final Job job : scheduler.getAllJobs()) {
- job.abort();
- job.setStatus(Job.Status.ABORT);
- LOGGER.info("Job " + job.getJobName() + " aborted ");
+ if (!job.isTerminated()) {
+ job.abort();
+ job.setStatus(Job.Status.ABORT);
+ LOGGER.info("Job " + job.getJobName() + " aborted ");
+ }
}
} else {
LOGGER.info("Keep job running while closing interpreter: " + interpreter.getClassName());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 882b368..447bd2d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -703,46 +703,6 @@ public class Note implements JsonSerializable {
}
}
- public List<Map<String, String>> generateParagraphsInfo() {
- List<Map<String, String>> paragraphsInfo = new LinkedList<>();
- synchronized (paragraphs) {
- for (Paragraph p : paragraphs) {
- Map<String, String> info = populateParagraphInfo(p);
- paragraphsInfo.add(info);
- }
- }
- return paragraphsInfo;
- }
-
- public Map<String, String> generateSingleParagraphInfo(String paragraphId) {
- synchronized (paragraphs) {
- for (Paragraph p : paragraphs) {
- if (p.getId().equals(paragraphId)) {
- return populateParagraphInfo(p);
- }
- }
- return new HashMap<>();
- }
- }
-
- private Map<String, String> populateParagraphInfo(Paragraph p) {
- Map<String, String> info = new HashMap<>();
- info.put("id", p.getId());
- info.put("status", p.getStatus().toString());
- if (p.getDateStarted() != null) {
- info.put("started", p.getDateStarted().toString());
- }
- if (p.getDateFinished() != null) {
- info.put("finished", p.getDateFinished().toString());
- }
- if (p.getStatus().isRunning()) {
- info.put("progress", String.valueOf(p.progress()));
- } else {
- info.put("progress", String.valueOf(100));
- }
- return info;
- }
-
private void setParagraphMagic(Paragraph p, int index) {
if (paragraphs.size() > 0) {
String replName;
@@ -771,6 +731,9 @@ public class Note implements JsonSerializable {
boolean blocking,
boolean isolated,
Map<String, Object> params) throws Exception {
+ if (isRunning()) {
+ throw new Exception("Unable to run note:" + id + " because it is still in RUNNING state.");
+ }
setIsolatedMode(isolated);
setRunning(true);
setStartTime(DATE_TIME_FORMATTER.format(LocalDateTime.now()));