You are viewing a plain text version of this content. (The canonical link that originally followed this sentence was not preserved in the plain-text conversion.)
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/08/10 06:54:38 UTC
[2/3] zeppelin git commit: ZEPPELIN-3681. Introduce NotebookService
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 78ff078..a376623 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -54,9 +54,13 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.notebook.socket.WatcherMessage;
+import org.apache.zeppelin.rest.exception.ForbiddenException;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
+import org.apache.zeppelin.service.NotebookService;
+import org.apache.zeppelin.service.ServiceContext;
+import org.apache.zeppelin.service.SimpleServiceCallback;
import org.apache.zeppelin.ticket.TicketContainer;
import org.apache.zeppelin.types.InterpreterSettingsList;
import org.apache.zeppelin.user.AuthenticationInfo;
@@ -75,6 +79,7 @@ import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
+import java.lang.reflect.Type;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.text.ParseException;
@@ -102,11 +107,11 @@ import java.util.regex.Pattern;
*/
public class NotebookServer extends WebSocketServlet
implements NotebookSocketListener,
- JobListenerFactory,
- AngularObjectRegistryListener,
- RemoteInterpreterProcessListener,
- ApplicationEventListener,
- NotebookServerMBean {
+ JobListenerFactory,
+ AngularObjectRegistryListener,
+ RemoteInterpreterProcessListener,
+ ApplicationEventListener,
+ NotebookServerMBean {
/**
* Job manager service type.
@@ -127,8 +132,8 @@ public class NotebookServer extends WebSocketServlet
private HashSet<String> collaborativeModeList = new HashSet<>();
private Boolean collaborativeModeEnable = ZeppelinConfiguration
- .create()
- .isZeppelinNotebookCollaborativeModeEnable();
+ .create()
+ .isZeppelinNotebookCollaborativeModeEnable();
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
private static Gson gson = new GsonBuilder()
.setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
@@ -140,6 +145,8 @@ public class NotebookServer extends WebSocketServlet
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
+ private NotebookService notebookService;
+
private ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
@@ -154,6 +161,13 @@ public class NotebookServer extends WebSocketServlet
return ZeppelinServer.notebook;
}
+ private synchronized NotebookService getNotebookService() {
+ if (this.notebookService == null) {
+ this.notebookService = new NotebookService(notebook());
+ }
+ return this.notebookService;
+ }
+
@Override
public void configure(WebSocketServletFactory factory) {
factory.setCreator(new NotebookWebSocketCreator(this));
@@ -174,8 +188,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onOpen(NotebookSocket conn) {
- LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
- conn.getRequest().getRemotePort());
+ LOG.info("New connection from {}", conn);
connectedSockets.add(conn);
}
@@ -184,12 +197,13 @@ public class NotebookServer extends WebSocketServlet
Notebook notebook = notebook();
try {
Message messagereceived = deserializeMessage(msg);
- LOG.debug("RECEIVE << " + messagereceived.op +
- ", RECEIVE PRINCIPAL << " + messagereceived.principal +
- ", RECEIVE TICKET << " + messagereceived.ticket +
- ", RECEIVE ROLES << " + messagereceived.roles +
- ", RECEIVE DATA << " + messagereceived.data);
-
+ if (messagereceived.op != OP.PING) {
+ LOG.debug("RECEIVE: " + messagereceived.op +
+ ", RECEIVE PRINCIPAL: " + messagereceived.principal +
+ ", RECEIVE TICKET: " + messagereceived.ticket +
+ ", RECEIVE ROLES: " + messagereceived.roles +
+ ", RECEIVE DATA: " + messagereceived.data);
+ }
if (LOG.isTraceEnabled()) {
LOG.trace("RECEIVE MSG = " + messagereceived);
}
@@ -237,22 +251,22 @@ public class NotebookServer extends WebSocketServlet
// Lets be elegant here
switch (messagereceived.op) {
case LIST_NOTES:
- unicastNoteList(conn, subject, userAndRoles);
+ listNotes(conn, messagereceived);
break;
case RELOAD_NOTES_FROM_REPO:
broadcastReloadedNoteList(subject, userAndRoles);
break;
case GET_HOME_NOTE:
- sendHomeNote(conn, userAndRoles, notebook, messagereceived);
+ getHomeNote(conn, messagereceived);
break;
case GET_NOTE:
- sendNote(conn, userAndRoles, notebook, messagereceived);
+ getNote(conn, messagereceived);
break;
case NEW_NOTE:
- createNote(conn, userAndRoles, notebook, messagereceived);
+ createNote(conn, messagereceived);
break;
case DEL_NOTE:
- removeNote(conn, userAndRoles, notebook, messagereceived);
+ deleteNote(conn, messagereceived);
break;
case REMOVE_FOLDER:
removeFolder(conn, userAndRoles, notebook, messagereceived);
@@ -270,55 +284,55 @@ public class NotebookServer extends WebSocketServlet
restoreFolder(conn, userAndRoles, notebook, messagereceived);
break;
case RESTORE_NOTE:
- restoreNote(conn, userAndRoles, notebook, messagereceived);
+ restoreNote(conn, messagereceived);
break;
case RESTORE_ALL:
restoreAll(conn, userAndRoles, notebook, messagereceived);
break;
case CLONE_NOTE:
- cloneNote(conn, userAndRoles, notebook, messagereceived);
+ cloneNote(conn, messagereceived);
break;
case IMPORT_NOTE:
- importNote(conn, userAndRoles, notebook, messagereceived);
+ importNote(conn, messagereceived);
break;
case COMMIT_PARAGRAPH:
- updateParagraph(conn, userAndRoles, notebook, messagereceived);
+ updateParagraph(conn, messagereceived);
break;
case RUN_PARAGRAPH:
- runParagraph(conn, userAndRoles, notebook, messagereceived);
+ runParagraph(conn, messagereceived);
break;
case PARAGRAPH_EXECUTED_BY_SPELL:
broadcastSpellExecution(conn, userAndRoles, notebook, messagereceived);
break;
case RUN_ALL_PARAGRAPHS:
- runAllParagraphs(conn, userAndRoles, notebook, messagereceived);
+ runAllParagraphs(conn, messagereceived);
break;
case CANCEL_PARAGRAPH:
- cancelParagraph(conn, userAndRoles, notebook, messagereceived);
+ cancelParagraph(conn, messagereceived);
break;
case MOVE_PARAGRAPH:
- moveParagraph(conn, userAndRoles, notebook, messagereceived);
+ moveParagraph(conn, messagereceived);
break;
case INSERT_PARAGRAPH:
- insertParagraph(conn, userAndRoles, notebook, messagereceived);
+ insertParagraph(conn, messagereceived);
break;
case COPY_PARAGRAPH:
- copyParagraph(conn, userAndRoles, notebook, messagereceived);
+ copyParagraph(conn, messagereceived);
break;
case PARAGRAPH_REMOVE:
- removeParagraph(conn, userAndRoles, notebook, messagereceived);
+ removeParagraph(conn, messagereceived);
break;
case PARAGRAPH_CLEAR_OUTPUT:
- clearParagraphOutput(conn, userAndRoles, notebook, messagereceived);
+ clearParagraphOutput(conn, messagereceived);
break;
case PARAGRAPH_CLEAR_ALL_OUTPUT:
- clearAllParagraphOutput(conn, userAndRoles, notebook, messagereceived);
+ clearAllParagraphOutput(conn, messagereceived);
break;
case NOTE_UPDATE:
- updateNote(conn, userAndRoles, notebook, messagereceived);
+ updateNote(conn, messagereceived);
break;
case NOTE_RENAME:
- renameNote(conn, userAndRoles, notebook, messagereceived);
+ renameNote(conn, messagereceived);
break;
case FOLDER_RENAME:
renameFolder(conn, userAndRoles, notebook, messagereceived);
@@ -327,7 +341,7 @@ public class NotebookServer extends WebSocketServlet
updatePersonalizedMode(conn, userAndRoles, notebook, messagereceived);
break;
case COMPLETION:
- completion(conn, userAndRoles, notebook, messagereceived);
+ completion(conn, messagereceived);
break;
case PING:
break; //do nothing
@@ -344,19 +358,19 @@ public class NotebookServer extends WebSocketServlet
sendAllConfigurations(conn, userAndRoles, notebook);
break;
case CHECKPOINT_NOTE:
- checkpointNote(conn, notebook, messagereceived);
+ checkpointNote(conn, messagereceived);
break;
case LIST_REVISION_HISTORY:
- listRevisionHistory(conn, notebook, messagereceived);
+ listRevisionHistory(conn, messagereceived);
break;
case SET_NOTE_REVISION:
- setNoteRevision(conn, userAndRoles, notebook, messagereceived);
+ setNoteRevision(conn, messagereceived);
break;
case NOTE_REVISION:
- getNoteByRevision(conn, notebook, messagereceived);
+ getNoteByRevision(conn, messagereceived);
break;
case NOTE_REVISION_FOR_COMPARE:
- getNoteByRevisionForCompare(conn, notebook, messagereceived);
+ getNoteByRevisionForCompare(conn, messagereceived);
break;
case LIST_NOTE_JOBS:
unicastNoteJobInfo(conn, messagereceived);
@@ -371,16 +385,16 @@ public class NotebookServer extends WebSocketServlet
getEditorSetting(conn, messagereceived);
break;
case GET_INTERPRETER_SETTINGS:
- getInterpreterSettings(conn, subject);
+ getInterpreterSettings(conn);
break;
case WATCHER:
switchConnectionToWatcher(conn, messagereceived);
break;
case SAVE_NOTE_FORMS:
- saveNoteForms(conn, userAndRoles, notebook, messagereceived);
+ saveNoteForms(conn, messagereceived);
break;
case REMOVE_NOTE_FORMS:
- removeNoteForms(conn, userAndRoles, notebook, messagereceived);
+ removeNoteForms(conn, messagereceived);
break;
case PATCH_PARAGRAPH:
patchParagraph(conn, userAndRoles, notebook, messagereceived);
@@ -395,14 +409,14 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onClose(NotebookSocket conn, int code, String reason) {
- LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest().getRemoteAddr(),
- conn.getRequest().getRemotePort(), code, reason);
+ LOG.info("Closed connection to {} ({}) {}", conn, code, reason);
removeConnectionFromAllNote(conn);
connectedSockets.remove(conn);
removeUserConnection(conn.getUser(), conn);
}
private void removeUserConnection(String user, NotebookSocket conn) {
+ LOG.debug("Remove user connection {} for user: {}", conn, user);
if (userConnectedSockets.containsKey(user)) {
userConnectedSockets.get(user).remove(conn);
} else {
@@ -411,6 +425,7 @@ public class NotebookServer extends WebSocketServlet
}
private void addUserConnection(String user, NotebookSocket conn) {
+ LOG.debug("Add user connection {} for user: {}", conn, user);
conn.setUser(user);
if (userConnectedSockets.containsKey(user)) {
userConnectedSockets.get(user).add(conn);
@@ -430,6 +445,7 @@ public class NotebookServer extends WebSocketServlet
}
private void addConnectionToNote(String noteId, NotebookSocket socket) {
+ LOG.debug("Add connection {} to note: {}", socket, noteId);
synchronized (noteSocketMap) {
removeConnectionFromAllNote(socket); // make sure a socket relates only a
// single note.
@@ -446,6 +462,7 @@ public class NotebookServer extends WebSocketServlet
}
private void removeConnectionFromNote(String noteId, NotebookSocket socket) {
+ LOG.debug("Remove connection {} from note: {}", socket, noteId);
synchronized (noteSocketMap) {
List<NotebookSocket> socketList = noteSocketMap.get(noteId);
if (socketList != null) {
@@ -455,7 +472,7 @@ public class NotebookServer extends WebSocketServlet
}
}
- private void removeNote(String noteId) {
+ private void removeConn(String noteId) {
synchronized (noteSocketMap) {
List<NotebookSocket> socketList = noteSocketMap.remove(noteId);
}
@@ -485,7 +502,7 @@ public class NotebookServer extends WebSocketServlet
message.put("status", collaborativeStatusNew);
if (collaborativeStatusNew) {
HashSet<String> userList = new HashSet<>();
- for (NotebookSocket noteSocket: socketList) {
+ for (NotebookSocket noteSocket : socketList) {
userList.add(noteSocket.getUser());
}
message.put("users", userList);
@@ -626,7 +643,8 @@ public class NotebookServer extends WebSocketServlet
}
public List<Map<String, String>> generateNotesInfo(boolean needsReload,
- AuthenticationInfo subject, Set<String> userAndRoles) {
+ AuthenticationInfo subject,
+ Set<String> userAndRoles) {
Notebook notebook = notebook();
ZeppelinConfiguration conf = notebook.getConf();
@@ -690,7 +708,7 @@ public class NotebookServer extends WebSocketServlet
}
public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
- Paragraph defaultParagraph) {
+ Paragraph defaultParagraph) {
if (null != userParagraphMap) {
for (String user : userParagraphMap.keySet()) {
multicastToUser(user,
@@ -706,7 +724,7 @@ public class NotebookServer extends WebSocketServlet
new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex));
}
- public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
+ public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
if (subject == null) {
subject = new AuthenticationInfo(StringUtils.EMPTY);
}
@@ -717,10 +735,16 @@ public class NotebookServer extends WebSocketServlet
broadcastNoteListExcept(notesInfo, subject);
}
- public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject,
- HashSet<String> userAndRoles) {
- List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles);
- unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
+ public void listNotes(NotebookSocket conn, Message message) throws IOException {
+ getNotebookService().listNotes(false, getServiceContext(message),
+ new WebSocketServiceCallback<List<Map<String, String>>>(conn) {
+ @Override
+ public void onSuccess(List<Map<String, String>> notesInfo,
+ ServiceContext context) throws IOException {
+ super.onSuccess(notesInfo, context);
+ unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
+ }
+ });
}
public void broadcastReloadedNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
@@ -736,7 +760,7 @@ public class NotebookServer extends WebSocketServlet
}
private void broadcastNoteListExcept(List<Map<String, String>> notesInfo,
- AuthenticationInfo subject) {
+ AuthenticationInfo subject) {
Set<String> userAndRoles;
NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
for (String user : userConnectedSockets.keySet()) {
@@ -752,7 +776,7 @@ public class NotebookServer extends WebSocketServlet
}
void permissionError(NotebookSocket conn, String op, String userName, Set<String> userAndRoles,
- Set<String> allowed) throws IOException {
+ Set<String> allowed) throws IOException {
LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed);
conn.send(serializeMessage(new Message(OP.AUTH_INFO).put("info",
@@ -761,32 +785,18 @@ public class NotebookServer extends WebSocketServlet
.toString())));
}
- /**
- * @return false if user doesn't have reader permission for this paragraph
- */
- private boolean hasParagraphReaderPermission(NotebookSocket conn, Notebook notebook,
- String noteId, HashSet<String> userAndRoles, String principal, String op)
- throws IOException {
- NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
- if (!notebookAuthorization.isReader(noteId, userAndRoles)) {
- permissionError(conn, op, principal, userAndRoles,
- notebookAuthorization.getOwners(noteId));
- return false;
- }
-
- return true;
- }
/**
* @return false if user doesn't have runner permission for this paragraph
*/
private boolean hasParagraphRunnerPermission(NotebookSocket conn, Notebook notebook,
- String noteId, HashSet<String> userAndRoles, String principal, String op)
- throws IOException {
+ String noteId, HashSet<String> userAndRoles,
+ String principal, String op)
+ throws IOException {
NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
if (!notebookAuthorization.isRunner(noteId, userAndRoles)) {
permissionError(conn, op, principal, userAndRoles,
- notebookAuthorization.getOwners(noteId));
+ notebookAuthorization.getOwners(noteId));
return false;
}
@@ -797,8 +807,9 @@ public class NotebookServer extends WebSocketServlet
* @return false if user doesn't have writer permission for this paragraph
*/
private boolean hasParagraphWriterPermission(NotebookSocket conn, Notebook notebook,
- String noteId, HashSet<String> userAndRoles, String principal, String op)
- throws IOException {
+ String noteId, HashSet<String> userAndRoles,
+ String principal, String op)
+ throws IOException {
NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
if (!notebookAuthorization.isWriter(noteId, userAndRoles)) {
permissionError(conn, op, principal, userAndRoles,
@@ -813,7 +824,8 @@ public class NotebookServer extends WebSocketServlet
* @return false if user doesn't have owner permission for this paragraph
*/
private boolean hasParagraphOwnerPermission(NotebookSocket conn, Notebook notebook, String noteId,
- HashSet<String> userAndRoles, String principal, String op) throws IOException {
+ HashSet<String> userAndRoles, String principal,
+ String op) throws IOException {
NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization();
if (!notebookAuthorization.isOwner(noteId, userAndRoles)) {
permissionError(conn, op, principal, userAndRoles,
@@ -824,66 +836,45 @@ public class NotebookServer extends WebSocketServlet
return true;
}
- private void sendNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
- LOG.info("New operation from {} : {} : {} : {} : {}", conn.getRequest().getRemoteAddr(),
- conn.getRequest().getRemotePort(), fromMessage.principal, fromMessage.op,
- fromMessage.get("id"));
-
+ private void getNote(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("id");
if (noteId == null) {
return;
}
-
- String user = fromMessage.principal;
-
- Note note = notebook.getNote(noteId);
-
- if (note != null) {
- if (!hasParagraphReaderPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "read")) {
- return;
- }
-
- addConnectionToNote(note.getId(), conn);
-
- if (note.isPersonalizedMode()) {
- note = note.getUserNote(user);
- }
- conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
- sendAllAngularObjects(note, user, conn);
- } else {
- conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
- }
+ getNotebookService().getNote(noteId, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ addConnectionToNote(note.getId(), conn);
+ conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
+ sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
+ }
+ });
}
- private void sendHomeNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
- String noteId = notebook.getConf().getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN);
- String user = fromMessage.principal;
-
- Note note = null;
- if (noteId != null) {
- note = notebook.getNote(noteId);
- }
-
- if (note != null) {
- if (!hasParagraphReaderPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "read")) {
- return;
- }
+ private void getHomeNote(NotebookSocket conn,
+ Message fromMessage) throws IOException {
- addConnectionToNote(note.getId(), conn);
- conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
- sendAllAngularObjects(note, user, conn);
- } else {
- removeConnectionFromAllNote(conn);
- conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
- }
+ getNotebookService().getHomeNote(getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ super.onSuccess(note, context);
+ if (note != null) {
+ addConnectionToNote(note.getId(), conn);
+ conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
+ sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
+ } else {
+ removeConnectionFromAllNote(conn);
+ conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
+ }
+ }
+ });
}
- private void updateNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void updateNote(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("id");
String name = (String) fromMessage.get("name");
Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
@@ -894,35 +885,20 @@ public class NotebookServer extends WebSocketServlet
return;
}
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "update")) {
- return;
- }
-
- Note note = notebook.getNote(noteId);
- if (note != null) {
- if (!(Boolean) note.getConfig().get("isZeppelinNotebookCronEnable")) {
- if (config.get("cron") != null) {
- config.remove("cron");
- }
- }
- boolean cronUpdated = isCronUpdated(config, note.getConfig());
- note.setName(name);
- note.setConfig(config);
- if (cronUpdated) {
- notebook.refreshCron(note.getId());
- }
-
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- note.persist(subject);
- broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name).put("config", config)
- .put("info", note.getInfo()));
- broadcastNoteList(subject, userAndRoles);
- }
+ getNotebookService().updateNote(noteId, name, config, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name)
+ .put("config", config)
+ .put("info", note.getInfo()));
+ broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+ }
+ });
}
private void updatePersonalizedMode(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws IOException {
+ Notebook notebook, Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("id");
String personalized = (String) fromMessage.get("personalized");
boolean isPersonalized = personalized.equals("true") ? true : false;
@@ -945,44 +921,31 @@ public class NotebookServer extends WebSocketServlet
}
}
- private void renameNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
- renameNote(conn, userAndRoles, notebook, fromMessage, "rename");
- }
-
- private void renameNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage, String op) throws IOException {
+ private void renameNote(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("id");
String name = (String) fromMessage.get("name");
-
if (noteId == null) {
return;
}
-
- if (!hasParagraphOwnerPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "rename")) {
- return;
- }
-
- Note note = notebook.getNote(noteId);
- if (note != null) {
- note.setName(name);
- note.setCronSupported(notebook.getConf());
-
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- note.persist(subject);
- broadcastNote(note);
- broadcastNoteList(subject, userAndRoles);
- }
+ getNotebookService().renameNote(noteId, name, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ super.onSuccess(note, context);
+ broadcastNote(note);
+ broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+ }
+ });
}
private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ Message fromMessage) throws IOException {
renameFolder(conn, userAndRoles, notebook, fromMessage, "rename");
}
private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage, String op) throws IOException {
+ Message fromMessage, String op) throws IOException {
String oldFolderId = (String) fromMessage.get("id");
String newFolderId = (String) fromMessage.get("name");
@@ -1013,76 +976,50 @@ public class NotebookServer extends WebSocketServlet
}
}
- private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) {
- boolean cronUpdated = false;
- if (configA.get("cron") != null && configB.get("cron") != null && configA.get("cron")
- .equals(configB.get("cron"))) {
- cronUpdated = true;
- } else if (configA.get("cron") == null && configB.get("cron") == null) {
- cronUpdated = false;
- } else if (configA.get("cron") != null || configB.get("cron") != null) {
- cronUpdated = true;
- }
-
- return cronUpdated;
- }
-
- private void createNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message message) throws IOException {
- AuthenticationInfo subject = new AuthenticationInfo(message.principal);
+ private void createNote(NotebookSocket conn,
+ Message message) throws IOException {
- try {
- Note note = null;
- String defaultInterpreterSettingId = (String) message.get("defaultInterpreterGroup");
- if (defaultInterpreterSettingId != null) {
- note = notebook.createNote(defaultInterpreterSettingId, subject);
- } else {
- note = notebook.createNote(subject);
- }
-
- note.addNewParagraph(subject); // it's an empty note. so add one paragraph
- if (message != null) {
- String noteName = (String) message.get("name");
- if (StringUtils.isEmpty(noteName)) {
- noteName = "Note " + note.getId();
- }
- note.setName(noteName);
- note.setCronSupported(notebook.getConf());
- }
+ String defaultInterpreterGroup = (String) message.get("defaultInterpreterGroup");
+ String noteName = (String) message.get("name");
+ getNotebookService().createNote(defaultInterpreterGroup, noteName, getServiceContext(message),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ super.onSuccess(note, context);
+ addConnectionToNote(note.getId(), conn);
+ conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
+ broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+ }
- note.persist(subject);
- addConnectionToNote(note.getId(), conn);
- conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
- } catch (IOException e) {
- LOG.error("Exception from createNote", e);
- conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
- "Oops! There is something wrong with the notebook file system. "
- + "Please check the logs for more details.")));
- return;
- }
- broadcastNoteList(subject, userAndRoles);
+ @Override
+ public void onFailure(Exception ex, ServiceContext context) throws IOException {
+ super.onFailure(ex, context);
+ conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
+ "Oops! There is something wrong with the notebook file system. "
+ + "Please check the logs for more details.")));
+ }
+ });
}
- private void removeNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void deleteNote(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("id");
if (noteId == null) {
return;
}
-
- if (!hasParagraphOwnerPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "remove")) {
- return;
- }
-
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- notebook.removeNote(noteId, subject);
- removeNote(noteId);
- broadcastNoteList(subject, userAndRoles);
+ getNotebookService().removeNote(noteId, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<String>(conn) {
+ @Override
+ public void onSuccess(String message, ServiceContext context) throws IOException {
+ super.onSuccess(message, context);
+ removeConn(noteId);
+ broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+ }
+ });
}
private void removeFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ Message fromMessage) throws IOException {
String folderId = (String) fromMessage.get("id");
if (folderId == null) {
return;
@@ -1101,13 +1038,13 @@ public class NotebookServer extends WebSocketServlet
AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
for (Note note : notes) {
notebook.removeNote(note.getId(), subject);
- removeNote(note.getId());
+ removeConn(note.getId());
}
broadcastNoteList(subject, userAndRoles);
}
private void moveNoteToTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws SchedulerException, IOException {
+ Message fromMessage) throws SchedulerException, IOException {
String noteId = (String) fromMessage.get("id");
if (noteId == null) {
return;
@@ -1121,15 +1058,16 @@ public class NotebookServer extends WebSocketServlet
notebook.removeCron(note.getId());
}
- if (note != null && !note.isTrash()){
+ if (note != null && !note.isTrash()) {
fromMessage.put("name", Folder.TRASH_FOLDER_ID + "/" + note.getName());
- renameNote(conn, userAndRoles, notebook, fromMessage, "move");
+ renameNote(conn, fromMessage);
notebook.moveNoteToTrash(note.getId());
}
}
private void moveFolderToTrash(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws SchedulerException, IOException {
+ Notebook notebook, Message fromMessage)
+ throws SchedulerException, IOException {
String folderId = (String) fromMessage.get("id");
if (folderId == null) {
return;
@@ -1138,14 +1076,14 @@ public class NotebookServer extends WebSocketServlet
Folder folder = notebook.getFolder(folderId);
if (folder != null && !folder.isTrash()) {
String trashFolderId = Folder.TRASH_FOLDER_ID + "/" + folderId;
- if (notebook.hasFolder(trashFolderId)){
+ if (notebook.hasFolder(trashFolderId)) {
DateTime currentDate = new DateTime();
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
trashFolderId += Folder.TRASH_FOLDER_CONFLICT_INFIX + formatter.print(currentDate);
}
List<Note> noteList = folder.getNotesRecursively();
- for (Note note: noteList) {
+ for (Note note : noteList) {
Map<String, Object> config = note.getConfig();
if (config.get("cron") != null) {
notebook.removeCron(note.getId());
@@ -1157,30 +1095,20 @@ public class NotebookServer extends WebSocketServlet
}
}
- private void restoreNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws SchedulerException, IOException {
+ private void restoreNote(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("id");
-
if (noteId == null) {
return;
}
- Note note = notebook.getNote(noteId);
-
- //restore cron
- Map<String, Object> config = note.getConfig();
- if (config.get("cron") != null) {
- notebook.refreshCron(note.getId());
- }
+ getNotebookService().restoreNote(noteId, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn));
- if (note != null && note.isTrash()) {
- fromMessage.put("name", note.getName().replaceFirst(Folder.TRASH_FOLDER_ID + "/", ""));
- renameNote(conn, userAndRoles, notebook, fromMessage, "restore");
- }
}
private void restoreFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws SchedulerException, IOException {
+ Message fromMessage) throws SchedulerException, IOException {
String folderId = (String) fromMessage.get("id");
if (folderId == null) {
@@ -1211,7 +1139,7 @@ public class NotebookServer extends WebSocketServlet
}
private void restoreAll(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws SchedulerException, IOException {
+ Message fromMessage) throws IOException {
Folder trashFolder = notebook.getFolder(Folder.TRASH_FOLDER_ID);
if (trashFolder != null) {
fromMessage.data = new HashMap<>();
@@ -1222,57 +1150,42 @@ public class NotebookServer extends WebSocketServlet
}
private void emptyTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws SchedulerException, IOException {
+ Message fromMessage) throws SchedulerException, IOException {
fromMessage.data = new HashMap<>();
fromMessage.put("id", Folder.TRASH_FOLDER_ID);
removeFolder(conn, userAndRoles, notebook, fromMessage);
}
- private void updateParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws IOException {
+ private void updateParagraph(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
}
-
- Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
- Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
String noteId = getOpenNoteId(conn);
if (noteId == null) {
noteId = (String) fromMessage.get("noteId");
}
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "write")) {
- return;
- }
-
- final Note note = notebook.getNote(noteId);
- Paragraph p = note.getParagraph(paragraphId);
-
- p.settings.setParams(params);
- p.setConfig(config);
- p.setTitle((String) fromMessage.get("title"));
- p.setText((String) fromMessage.get("paragraph"));
-
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- if (note.isPersonalizedMode()) {
- p = p.getUserParagraph(subject.getUser());
- p.settings.setParams(params);
- p.setConfig(config);
- p.setTitle((String) fromMessage.get("title"));
- p.setText((String) fromMessage.get("paragraph"));
- }
-
- note.persist(subject);
+ String title = (String) fromMessage.get("title");
+ String text = (String) fromMessage.get("paragraph");
+ Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
+ Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
- if (note.isPersonalizedMode()) {
- Map<String, Paragraph> userParagraphMap =
- note.getParagraph(paragraphId).getUserParagraphMap();
- broadcastParagraphs(userParagraphMap, p);
- } else {
- broadcastParagraph(note, p);
- }
+ getNotebookService().updateParagraph(noteId, paragraphId, title, text, params, config,
+ getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Paragraph>(conn) {
+ @Override
+ public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+ if (p.getNote().isPersonalizedMode()) {
+ Map<String, Paragraph> userParagraphMap =
+ p.getNote().getParagraph(paragraphId).getUserParagraphMap();
+ broadcastParagraphs(userParagraphMap, p);
+ } else {
+ broadcastParagraph(p.getNote(), p);
+ }
+ }
+ });
}
private void patchParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
@@ -1327,142 +1240,140 @@ public class NotebookServer extends WebSocketServlet
paragraphText = (String) dmp.patchApply(patches, paragraphText)[0];
p.setText(paragraphText);
Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", patchText)
- .put("paragraphId", p.getId());
+ .put("paragraphId", p.getId());
broadcastExcept(note.getId(), message, conn);
}
- private void cloneNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void cloneNote(NotebookSocket conn,
+ Message fromMessage) throws IOException {
+
String noteId = getOpenNoteId(conn);
String name = (String) fromMessage.get("name");
- Note newNote = notebook.cloneNote(noteId, name, new AuthenticationInfo(fromMessage.principal));
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- addConnectionToNote(newNote.getId(), (NotebookSocket) conn);
- conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
- broadcastNoteList(subject, userAndRoles);
+ getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note newNote, ServiceContext context) throws IOException {
+ super.onSuccess(newNote, context);
+ addConnectionToNote(newNote.getId(), conn);
+ conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
+ broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+ }
+ });
}
- private void clearAllParagraphOutput(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws IOException {
+ private void clearAllParagraphOutput(NotebookSocket conn,
+ Message fromMessage) throws IOException {
final String noteId = (String) fromMessage.get("id");
if (StringUtils.isBlank(noteId)) {
return;
}
+ getNotebookService().clearAllParagraphOutput(noteId, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ super.onSuccess(note, context);
+ broadcastNote(note);
+ }
+ });
+ }
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "clear output")) {
- return;
- }
+ protected Note importNote(NotebookSocket conn, Message fromMessage) throws IOException {
+ String noteName = (String) ((Map) fromMessage.get("note")).get("name");
+ String noteJson = gson.toJson(fromMessage.get("note"));
+ Note note = getNotebookService().importNote(noteName, noteJson, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ super.onSuccess(note, context);
+ try {
+ broadcastNote(note);
+ broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+ } catch (NullPointerException e) {
+ // TODO(zjffdu) remove this try catch. This is only for test of
+ // NotebookServerTest#testImportNotebook
+ }
+ }
+ });
- Note note = notebook.getNote(noteId);
- note.clearAllParagraphOutput();
- broadcastNote(note);
- }
-
- protected Note importNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
- Note note = null;
- if (fromMessage != null) {
- String noteName = (String) ((Map) fromMessage.get("note")).get("name");
- String noteJson = gson.toJson(fromMessage.get("note"));
- AuthenticationInfo subject;
- if (fromMessage.principal != null) {
- subject = new AuthenticationInfo(fromMessage.principal);
- } else {
- subject = new AuthenticationInfo("anonymous");
- }
- note = notebook.importNote(noteJson, noteName, subject);
- note.persist(subject);
- broadcastNote(note);
- broadcastNoteList(subject, userAndRoles);
- }
return note;
}
- private void removeParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void removeParagraph(NotebookSocket conn,
+ Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
}
String noteId = getOpenNoteId(conn);
-
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "write")) {
- return;
- }
-
- final Note note = notebook.getNote(noteId);
-
- // Don't allow removing paragraph when there is only one paragraph in the Notebook
- if (note.getParagraphCount() > 1) {
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- Paragraph para = note.removeParagraph(subject.getUser(), paragraphId);
- note.persist(subject);
- if (para != null) {
- broadcast(note.getId(), new Message(OP.PARAGRAPH_REMOVED).
- put("id", para.getId()));
- }
- }
+ getNotebookService().removeParagraph(noteId, paragraphId,
+ getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn){
+ @Override
+ public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+ super.onSuccess(p, context);
+ broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).
+ put("id", p.getId()));
+ }
+ });
}
- private void clearParagraphOutput(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws IOException {
+ private void clearParagraphOutput(NotebookSocket conn,
+ Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
}
-
String noteId = getOpenNoteId(conn);
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "write")) {
- return;
- }
-
- final Note note = notebook.getNote(noteId);
- if (note.isPersonalizedMode()) {
- String user = fromMessage.principal;
- Paragraph p = note.clearPersonalizedParagraphOutput(paragraphId, user);
- unicastParagraph(note, p, user);
- } else {
- note.clearParagraphOutput(paragraphId);
- Paragraph paragraph = note.getParagraph(paragraphId);
- broadcastParagraph(note, paragraph);
- }
+ getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Paragraph>(conn) {
+ @Override
+ public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+ super.onSuccess(p, context);
+ if (p.getNote().isPersonalizedMode()) {
+ unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
+ } else {
+ broadcastParagraph(p.getNote(), p);
+ }
+ }
+ });
}
- private void completion(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void completion(NotebookSocket conn,
+ Message fromMessage) throws IOException {
+ String noteId = getOpenNoteId(conn);
String paragraphId = (String) fromMessage.get("id");
String buffer = (String) fromMessage.get("buf");
int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString());
- Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
- if (paragraphId == null) {
- conn.send(serializeMessage(resp));
- return;
- }
+ getNotebookService().completion(noteId, paragraphId, buffer, cursor,
+ getServiceContext(fromMessage),
+ new WebSocketServiceCallback<List<InterpreterCompletion>>(conn) {
+ @Override
+ public void onSuccess(List<InterpreterCompletion> completions, ServiceContext context)
+ throws IOException {
+ super.onSuccess(completions, context);
+ Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
+ resp.put("completions", completions);
+ conn.send(serializeMessage(resp));
+ }
- final Note note = notebook.getNote(getOpenNoteId(conn));
- List<InterpreterCompletion> candidates;
- try {
- candidates = note.completion(paragraphId, buffer, cursor);
- } catch (RuntimeException e) {
- LOG.info("Fail to get completion", e);
- candidates = new ArrayList<>();
- }
- resp.put("completions", candidates);
- conn.send(serializeMessage(resp));
+ @Override
+ public void onFailure(Exception ex, ServiceContext context) throws IOException {
+ super.onFailure(ex, context);
+ Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
+ resp.put("completions", new ArrayList<>());
+ conn.send(serializeMessage(resp));
+ }
+ });
}
/**
* When angular object updated from client.
*
- * @param conn the web socket.
- * @param notebook the notebook.
+ * @param conn the web socket.
+ * @param notebook the notebook.
* @param fromMessage the message.
*/
private void angularObjectUpdated(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) {
+ Notebook notebook, Message fromMessage) {
String noteId = (String) fromMessage.get("noteId");
String paragraphId = (String) fromMessage.get("paragraphId");
String interpreterGroupId = (String) fromMessage.get("interpreterGroupId");
@@ -1547,7 +1458,7 @@ public class NotebookServer extends WebSocketServlet
* and a paragraph id.
*/
protected void angularObjectClientBind(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws Exception {
+ Notebook notebook, Message fromMessage) throws Exception {
String noteId = fromMessage.getType("noteId");
String varName = fromMessage.getType("name");
Object varValue = fromMessage.get("value");
@@ -1581,7 +1492,8 @@ public class NotebookServer extends WebSocketServlet
* and an optional list of paragraph id(s).
*/
protected void angularObjectClientUnbind(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws Exception {
+ Notebook notebook, Message fromMessage)
+ throws Exception {
String noteId = fromMessage.getType("noteId");
String varName = fromMessage.getType("name");
String paragraphId = fromMessage.getType("paragraphId");
@@ -1609,7 +1521,7 @@ public class NotebookServer extends WebSocketServlet
}
private InterpreterGroup findInterpreterGroupForParagraph(Note note, String paragraphId)
- throws Exception {
+ throws Exception {
final Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph == null) {
throw new IllegalArgumentException("Unknown paragraph with id : " + paragraphId);
@@ -1618,8 +1530,10 @@ public class NotebookServer extends WebSocketServlet
}
private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId, String varName,
- Object varValue, RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId,
- NotebookSocket conn) {
+ Object varValue,
+ RemoteAngularObjectRegistry remoteRegistry,
+ String interpreterGroupId,
+ NotebookSocket conn) {
final AngularObject ao =
remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId);
@@ -1629,8 +1543,9 @@ public class NotebookServer extends WebSocketServlet
}
private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, String varName,
- RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId,
- NotebookSocket conn) {
+ RemoteAngularObjectRegistry remoteRegistry,
+ String interpreterGroupId,
+ NotebookSocket conn) {
final AngularObject ao =
remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId);
this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao)
@@ -1639,8 +1554,9 @@ public class NotebookServer extends WebSocketServlet
}
private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName,
- Object varValue, AngularObjectRegistry registry, String interpreterGroupId,
- NotebookSocket conn) {
+ Object varValue, AngularObjectRegistry registry,
+ String interpreterGroupId,
+ NotebookSocket conn) {
AngularObject angularObject = registry.get(varName, noteId, paragraphId);
if (angularObject == null) {
angularObject = registry.add(varName, varValue, noteId, paragraphId);
@@ -1655,7 +1571,8 @@ public class NotebookServer extends WebSocketServlet
}
private void removeAngularObjectFromLocalRepo(String noteId, String paragraphId, String varName,
- AngularObjectRegistry registry, String interpreterGroupId, NotebookSocket conn) {
+ AngularObjectRegistry registry,
+ String interpreterGroupId, NotebookSocket conn) {
final AngularObject removed = registry.remove(varName, noteId, paragraphId);
if (removed != null) {
this.broadcastExcept(noteId,
@@ -1665,8 +1582,8 @@ public class NotebookServer extends WebSocketServlet
}
}
- private void moveParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void moveParagraph(NotebookSocket conn,
+ Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
@@ -1674,31 +1591,22 @@ public class NotebookServer extends WebSocketServlet
final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString());
String noteId = getOpenNoteId(conn);
- final Note note = notebook.getNote(noteId);
-
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "write")) {
- return;
- }
-
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- note.moveParagraph(paragraphId, newIndex);
- note.persist(subject);
- broadcast(note.getId(),
- new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex));
+ getNotebookService().moveParagraph(noteId, paragraphId, newIndex,
+ getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Paragraph>(conn) {
+ @Override
+ public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
+ super.onSuccess(result, context);
+ broadcast(result.getNote().getId(),
+ new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex));
+ }
+ });
}
- private String insertParagraph(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws IOException {
+ private String insertParagraph(NotebookSocket conn,
+ Message fromMessage) throws IOException {
final int index = (int) Double.parseDouble(fromMessage.get("index").toString());
String noteId = getOpenNoteId(conn);
- final Note note = notebook.getNote(noteId);
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "write")) {
- return null;
- }
Map<String, Object> config;
if (fromMessage.get("config") != null) {
config = (Map<String, Object>) fromMessage.get("config");
@@ -1706,85 +1614,59 @@ public class NotebookServer extends WebSocketServlet
config = new HashMap<>();
}
- Paragraph newPara = note.insertNewParagraph(index, subject);
- newPara.setConfig(config);
- note.persist(subject);
- broadcastNewParagraph(note, newPara);
+ Paragraph newPara = getNotebookService().insertParagraph(noteId, index, config,
+ getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Paragraph>(conn) {
+ @Override
+ public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+ super.onSuccess(p, context);
+ broadcastNewParagraph(p.getNote(), p);
+ }
+ });
return newPara.getId();
}
- private void copyParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
- String newParaId = insertParagraph(conn, userAndRoles, notebook, fromMessage);
+ private void copyParagraph(NotebookSocket conn,
+ Message fromMessage) throws IOException {
+ String newParaId = insertParagraph(conn, fromMessage);
if (newParaId == null) {
return;
}
fromMessage.put("id", newParaId);
- updateParagraph(conn, userAndRoles, notebook, fromMessage);
+ updateParagraph(conn, fromMessage);
}
- private void cancelParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
}
String noteId = getOpenNoteId(conn);
-
- if (!hasParagraphRunnerPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "write")) {
- return;
- }
-
- final Note note = notebook.getNote(noteId);
- Paragraph p = note.getParagraph(paragraphId);
- p.abort();
+ getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<>(conn));
}
- private void runAllParagraphs(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws IOException {
+ private void runAllParagraphs(NotebookSocket conn,
+ Message fromMessage) throws IOException {
final String noteId = (String) fromMessage.get("noteId");
if (StringUtils.isBlank(noteId)) {
return;
}
-
- if (!hasParagraphRunnerPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "run all paragraphs")) {
- return;
- }
-
List<Map<String, Object>> paragraphs =
gson.fromJson(String.valueOf(fromMessage.data.get("paragraphs")),
- new TypeToken<List<Map<String, Object>>>() {}.getType());
-
- for (Map<String, Object> raw : paragraphs) {
- String paragraphId = (String) raw.get("id");
- if (paragraphId == null) {
- continue;
- }
-
- String text = (String) raw.get("paragraph");
- String title = (String) raw.get("title");
- Map<String, Object> params = (Map<String, Object>) raw.get("params");
- Map<String, Object> config = (Map<String, Object>) raw.get("config");
-
- Note note = notebook.getNote(noteId);
- Paragraph p = setParagraphUsingMessage(note, fromMessage,
- paragraphId, text, title, params, config);
+ new TypeToken<List<Map<String, Object>>>() {
+ }.getType());
- if (p.isEnabled() && !persistAndExecuteSingleParagraph(conn, note, p, true)) {
- // stop execution when one paragraph fails.
- break;
- }
- }
+ getNotebookService().runAllParagraphs(noteId, paragraphs, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Paragraph>(conn));
}
private void broadcastSpellExecution(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook, Message fromMessage) throws IOException {
+ Notebook notebook, Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
@@ -1837,39 +1719,40 @@ public class NotebookServer extends WebSocketServlet
new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn);
}
- private void runParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void runParagraph(NotebookSocket conn,
+ Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
+ //TODO(zjffdu) it is possible ?
return;
}
String noteId = getOpenNoteId(conn);
-
- if (!hasParagraphRunnerPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "write")) {
- return;
- }
-
- // 1. clear paragraph only if personalized,
- // otherwise this will be handed in `onOutputClear`
- final Note note = notebook.getNote(noteId);
- if (note.isPersonalizedMode()) {
- String user = fromMessage.principal;
- Paragraph p = note.clearPersonalizedParagraphOutput(paragraphId, user);
- unicastParagraph(note, p, user);
- }
-
- // 2. set paragraph values
String text = (String) fromMessage.get("paragraph");
String title = (String) fromMessage.get("title");
Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
+ getNotebookService().runParagraph(noteId, paragraphId, title, text, params, config, false,
+ getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Paragraph>(conn) {
+ @Override
+ public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+ if (p.getNote().isPersonalizedMode()) {
+ Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId,
+ context.getAutheInfo().getUser());
+ unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
+ }
- Paragraph p = setParagraphUsingMessage(note, fromMessage, paragraphId,
- text, title, params, config);
-
- persistAndExecuteSingleParagraph(conn, note, p, false);
+ // if it's the last paragraph and not empty, let's add a new one
+ boolean isTheLastParagraph = p.getNote().isLastParagraph(paragraphId);
+ if (!(Strings.isNullOrEmpty(p.getText()) ||
+ Strings.isNullOrEmpty(p.getScriptText())) &&
+ isTheLastParagraph) {
+ Paragraph newPara = p.getNote().addNewParagraph(p.getAuthenticationInfo());
+ broadcastNewParagraph(p.getNote(), newPara);
+ }
+ }
+ });
}
private void addNewParagraphIfLastParagraphIsExecuted(Note note, Paragraph p) {
@@ -1887,7 +1770,7 @@ public class NotebookServer extends WebSocketServlet
* @return false if failed to save a note
*/
private boolean persistNoteWithAuthInfo(NotebookSocket conn, Note note, Paragraph p)
- throws IOException {
+ throws IOException {
try {
note.persist(p.getAuthenticationInfo());
return true;
@@ -1901,28 +1784,9 @@ public class NotebookServer extends WebSocketServlet
}
}
- private boolean persistAndExecuteSingleParagraph(NotebookSocket conn, Note note, Paragraph p,
- boolean blocking) throws IOException {
- addNewParagraphIfLastParagraphIsExecuted(note, p);
- if (!persistNoteWithAuthInfo(conn, note, p)) {
- return false;
- }
-
- try {
- return note.run(p.getId(), blocking);
- } catch (Exception ex) {
- LOG.error("Exception from run", ex);
- if (p != null) {
- p.setReturn(new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), ex);
- p.setStatus(Status.ERROR);
- broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", p));
- }
- return false;
- }
- }
-
private Paragraph setParagraphUsingMessage(Note note, Message fromMessage, String paragraphId,
- String text, String title, Map<String, Object> params, Map<String, Object> config) {
+ String text, String title, Map<String, Object> params,
+ Map<String, Object> config) {
Paragraph p = note.getParagraph(paragraphId);
p.setText(text);
p.setTitle(title);
@@ -1945,7 +1809,7 @@ public class NotebookServer extends WebSocketServlet
}
private void sendAllConfigurations(NotebookSocket conn, HashSet<String> userAndRoles,
- Notebook notebook) throws IOException {
+ Notebook notebook) throws IOException {
ZeppelinConfiguration conf = notebook.getConf();
Map<String, String> configurations =
@@ -1962,97 +1826,94 @@ public class NotebookServer extends WebSocketServlet
new Message(OP.CONFIGURATIONS_INFO).put("configurations", configurations)));
}
- private void checkpointNote(NotebookSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
+ private void checkpointNote(NotebookSocket conn, Message fromMessage)
+ throws IOException {
String noteId = (String) fromMessage.get("noteId");
String commitMessage = (String) fromMessage.get("commitMessage");
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- Revision revision = notebook.checkpointNote(noteId, commitMessage, subject);
- if (!Revision.isEmpty(revision)) {
- List<Revision> revisions = notebook.listRevisionHistory(noteId, subject);
- conn.send(
- serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions)));
- } else {
- conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
- "Couldn't checkpoint note revision: possibly storage doesn't support versioning. "
- + "Please check the logs for more details.")));
- }
+
+ getNotebookService().checkpointNote(noteId, commitMessage, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Revision>(conn) {
+ @Override
+ public void onSuccess(Revision revision, ServiceContext context) throws IOException {
+ super.onSuccess(revision, context);
+ if (!Revision.isEmpty(revision)) {
+ List<Revision> revisions =
+ notebook().listRevisionHistory(noteId, context.getAutheInfo());
+ conn.send(
+ serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList",
+ revisions)));
+ } else {
+ conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
+ "Couldn't checkpoint note revision: possibly storage doesn't support versioning. "
+ + "Please check the logs for more details.")));
+ }
+ }
+ });
}
- private void listRevisionHistory(NotebookSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
+ private void listRevisionHistory(NotebookSocket conn, Message fromMessage)
+ throws IOException {
String noteId = (String) fromMessage.get("noteId");
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- List<Revision> revisions = notebook.listRevisionHistory(noteId, subject);
-
- conn.send(
- serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions)));
+ getNotebookService().listRevisionHistory(noteId, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<List<Revision>>(conn) {
+ @Override
+ public void onSuccess(List<Revision> revisions, ServiceContext context)
+ throws IOException {
+ super.onSuccess(revisions, context);
+ conn.send(serializeMessage(
+ new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions)));
+ }
+ });
}
- private void setNoteRevision(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void setNoteRevision(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("noteId");
String revisionId = (String) fromMessage.get("revisionId");
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "update")) {
- return;
- }
-
- Note headNote = null;
- boolean setRevisionStatus;
- try {
- headNote = notebook.setNoteRevision(noteId, revisionId, subject);
- setRevisionStatus = headNote != null;
- } catch (Exception e) {
- setRevisionStatus = false;
- LOG.error("Failed to set given note revision", e);
- }
- if (setRevisionStatus) {
- notebook.loadNoteFromRepo(noteId, subject);
- }
-
- conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", setRevisionStatus)));
-
- if (setRevisionStatus) {
- Note reloadedNote = notebook.getNote(headNote.getId());
- broadcastNote(reloadedNote);
- } else {
- conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info",
- "Couldn't set note to the given revision. "
- + "Please check the logs for more details.")));
- }
+ getNotebookService().setNoteRevision(noteId, revisionId, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ super.onSuccess(note, context);
+ Note reloadedNote = notebook().loadNoteFromRepo(noteId, context.getAutheInfo());
+ conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", true)));
+ broadcastNote(reloadedNote);
+ }
+ });
}
- private void getNoteByRevision(NotebookSocket conn, Notebook notebook, Message fromMessage)
+ private void getNoteByRevision(NotebookSocket conn, Message fromMessage)
throws IOException {
String noteId = (String) fromMessage.get("noteId");
String revisionId = (String) fromMessage.get("revisionId");
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- Note revisionNote = notebook.getNoteByRevision(noteId, revisionId, subject);
- conn.send(serializeMessage(
- new Message(OP.NOTE_REVISION).put("noteId", noteId).put("revisionId", revisionId)
- .put("note", revisionNote)));
+ getNotebookService().getNotebyRevision(noteId, revisionId, getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ super.onSuccess(note, context);
+ conn.send(serializeMessage(
+ new Message(OP.NOTE_REVISION).put("noteId", noteId).put("revisionId", revisionId)
+ .put("note", note)));
+ }
+ });
}
- private void getNoteByRevisionForCompare(NotebookSocket conn, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void getNoteByRevisionForCompare(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("noteId");
String revisionId = (String) fromMessage.get("revisionId");
-
String position = (String) fromMessage.get("position");
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- Note revisionNote;
- if (revisionId.equals("Head")) {
- revisionNote = notebook.getNote(noteId);
- } else {
- revisionNote = notebook.getNoteByRevision(noteId, revisionId, subject);
- }
-
- conn.send(serializeMessage(
- new Message(OP.NOTE_REVISION_FOR_COMPARE).put("noteId", noteId)
- .put("revisionId", revisionId).put("position", position).put("note", revisionNote)));
+ getNotebookService().getNoteByRevisionForCompare(noteId, revisionId,
+ getServiceContext(fromMessage),
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ super.onSuccess(note, context);
+ conn.send(serializeMessage(
+ new Message(OP.NOTE_REVISION_FOR_COMPARE).put("noteId", noteId)
+ .put("revisionId", revisionId).put("position", position).put("note", note)));
+ }
+ });
}
/**
@@ -2074,7 +1935,7 @@ public class NotebookServer extends WebSocketServlet
*/
@Override
public void onOutputUpdated(String noteId, String paragraphId, int index,
- InterpreterResult.Type type, String output) {
+ InterpreterResult.Type type, String output) {
Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId)
.put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output);
Note note = notebook().getNote(noteId);
@@ -2105,7 +1966,7 @@ public class NotebookServer extends WebSocketServlet
*/
@Override
public void onOutputAppend(String noteId, String paragraphId, int index, String appId,
- String output) {
+ String output) {
Message msg =
new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
.put("index", index).put("appId", appId).put("data", output);
@@ -2117,7 +1978,7 @@ public class NotebookServer extends WebSocketServlet
*/
@Override
public void onOutputUpdated(String noteId, String paragraphId, int index, String appId,
- InterpreterResult.Type type, String output) {
+ InterpreterResult.Type type, String output) {
Message msg =
new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
.put("index", index).put("type", type).put("appId", appId).put("data", output);
@@ -2322,7 +2183,7 @@ public class NotebookServer extends WebSocketServlet
if (job.getStatus() == Status.FINISHED) {
LOG.info("Job {} is finished successfully, status: {}", job.getId(), job.getStatus());
} else {
- LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}" , job.getId(),
+ LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", job.getId(),
job.getStatus(), job.getException(), job.getReturn());
}
@@ -2484,7 +2345,7 @@ public class NotebookServer extends WebSocketServlet
}
}
- private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject)
+ private void getInterpreterSettings(NotebookSocket conn)
throws IOException {
List<InterpreterSetting> availableSettings = notebook().getInterpreterSettingManager().get();
conn.send(serializeMessage(
@@ -2525,7 +2386,7 @@ public class NotebookServer extends WebSocketServlet
private void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serialized) {
synchronized (connectedSockets) {
- for (NotebookSocket conn: connectedSockets) {
+ for (NotebookSocket conn : connectedSockets) {
if (exclude != null && exclude.equals(conn)) {
continue;
}
@@ -2555,7 +2416,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onParaInfosReceived(String noteId, String paragraphId,
- String interpreterSettingId, Map<String, String> metaInfos) {
+ String interpreterSettingId, Map<String, String> metaInfos) {
Note note = notebook().getNote(noteId);
if (note != null) {
Paragraph paragraph = note.getParagraph(paragraphId);
@@ -2601,53 +2462,39 @@ public class NotebookServer extends WebSocketServlet
setting.clearNoteIdAndParaMap();
}
- public void broadcastNoteForms(Note note) {
+ private void broadcastNoteForms(Note note) { // visibility narrowed from public to private by this change
GUI formsSettings = new GUI(); // carrier for the note-level forms and params copied below
formsSettings.setForms(note.getNoteForms());
formsSettings.setParams(note.getNoteParams());
-
broadcast(note.getId(), new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings)); // broadcast keyed by the note id, payload under "formsData"
}
- private void saveNoteForms(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void saveNoteForms(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("noteId");
Map<String, Object> noteParams = (Map<String, Object>) fromMessage.get("noteParams"); // unchecked cast of the message payload
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "update")) {
- return;
- }
-
- Note note = notebook.getNote(noteId);
- if (note != null) {
- note.setNoteParams(noteParams);
-
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- note.persist(subject);
- broadcastNoteForms(note);
- }
+ getNotebookService().saveNoteForms(noteId, noteParams, getServiceContext(fromMessage), // NOTE(review): inline permission check and persist are removed; presumably done by the service - confirm
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ broadcastNoteForms(note); // NOTE(review): super.onSuccess is not invoked here, unlike the revision callbacks in this file - confirm intended
+ }
+ });
}
- private void removeNoteForms(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook,
- Message fromMessage) throws IOException {
+ private void removeNoteForms(NotebookSocket conn,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("noteId");
String formName = (String) fromMessage.get("formName"); // name of the form to remove, as sent by the client
- if (!hasParagraphWriterPermission(conn, notebook, noteId,
- userAndRoles, fromMessage.principal, "update")) {
- return;
- }
-
- Note note = notebook.getNote(noteId);
- if (note != null) {
- note.getNoteForms().remove(formName);
- note.getNoteParams().remove(formName);
-
- AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
- note.persist(subject);
- broadcastNoteForms(note);
- }
+ getNotebookService().removeNoteForms(noteId, formName, getServiceContext(fromMessage), // NOTE(review): inline permission check, removal and persist are gone; presumably done by the service - confirm
+ new WebSocketServiceCallback<Note>(conn) {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ broadcastNoteForms(note); // NOTE(review): super.onSuccess is not invoked here, unlike the revision callbacks in this file - confirm intended
+ }
+ });
}
@Override
@@ -2665,4 +2512,43 @@ public class NotebookServer extends WebSocketServlet
m.data.put("notice", message);
broadcast(m);
}
+
+ /**
+  * Builds the {@link ServiceContext} for an incoming websocket message.
+  *
+  * <p>The context carries an {@link AuthenticationInfo} created from the message's principal,
+  * roles and ticket, plus a set containing the principal and, when the roles string is
+  * non-empty, every role parsed from it as a JSON set of strings.
+  *
+  * @param message message whose {@code principal}, {@code roles} and {@code ticket} are read
+  * @return the service context for this message
+  */
+ private ServiceContext getServiceContext(Message message) {
+   AuthenticationInfo authInfo =
+       new AuthenticationInfo(message.principal, message.roles, message.ticket);
+   Set<String> userAndRoles = new HashSet<>();
+   userAndRoles.add(message.principal);
+
+   // Nothing to parse: the context only contains the principal.
+   if (message.roles == null || message.roles.isEmpty()) {
+     return new ServiceContext(authInfo, userAndRoles);
+   }
+
+   Type rolesType = new TypeToken<HashSet<String>>() {
+   }.getType();
+   HashSet<String> parsedRoles = gson.fromJson(message.roles, rolesType);
+   if (parsedRoles != null) {
+     userAndRoles.addAll(parsedRoles);
+   }
+   return new ServiceContext(authInfo, userAndRoles);
+ }
+
+ /**
+  * Service callback that reports failures back to the websocket client that issued the request.
+  *
+  * <p>A {@link ForbiddenException} is answered with an {@code AUTH_INFO} message, any other
+  * exception with an {@code ERROR_INFO} message.
+  *
+  * @param <T> type of the result passed to the success callback
+  */
+ private class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> {
+
+   private final NotebookSocket conn;
+
+   WebSocketServiceCallback(NotebookSocket conn) {
+     this.conn = conn;
+   }
+
+   /**
+    * Forwards the failure to the base callback and then notifies the client.
+    *
+    * @param ex the failure raised by the service
+    * @param context the context the service call was made with
+    * @throws IOException if sending the notification over the socket fails
+    */
+   @Override
+   public void onFailure(Exception ex, ServiceContext context) throws IOException {
+     super.onFailure(ex, context);
+     if (ex instanceof ForbiddenException) {
+       conn.send(serializeMessage(new Message(OP.AUTH_INFO)
+           .put("info", extractForbiddenMessage((ForbiddenException) ex))));
+     } else {
+       conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", ex.getMessage())));
+     }
+   }
+
+   /**
+    * Reads the "message" entry from the JSON entity of the forbidden response. Falls back to the
+    * exception message when the response has no entity, the entity is empty, or it has no
+    * "message" entry, so that the error path itself cannot fail with a NullPointerException.
+    */
+   private String extractForbiddenMessage(ForbiddenException ex) {
+     Object entity = ex.getResponse().getEntity();
+     if (entity != null) {
+       Type type = new TypeToken<Map<String, String>>(){}.getType();
+       // Gson returns null for an empty JSON document.
+       Map<String, String> jsonObject = gson.fromJson(entity.toString(), type);
+       if (jsonObject != null && jsonObject.get("message") != null) {
+         return jsonObject.get("message");
+       }
+     }
+     return ex.getMessage();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
index 66e4038..65740ff 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
@@ -77,4 +77,9 @@ public class NotebookSocket extends WebSocketAdapter {
public void setUser(String user) {
this.user = user;
}
+
+ /** Renders this socket as {@code remoteHost:remotePort}, both taken from the request. */
+ @Override
+ public String toString() {
+   return new StringBuilder()
+       .append(request.getRemoteHost())
+       .append(':')
+       .append(request.getRemotePort())
+       .toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java
deleted file mode 100644
index ddfe7c4..0000000
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.socket;
-
-/** This will be used by some services to pass messages to frontend via WebSocket */
-public interface ServiceCallback {
- void onStart(String message);
-
- void onSuccess(String message);
-
- void onFailure(String message);
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index ce867ee..62a3782 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -221,8 +221,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
@Test
public void testDeleteNoteBadId() throws IOException {
LOG.info("testDeleteNoteBadId");
- testDeleteNote("2AZFXEX97");
- testDeleteNote("bad_ID");
+ testDeleteNotExistNote("bad_ID"); // replaces both removed calls; only the "bad_ID" case is still exercised
}
@Test
@@ -317,6 +316,13 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
}
}
+ /**
+  * Sends a DELETE request for the given note id and asserts that the response satisfies
+  * {@code isNotFound()}.
+  *
+  * @param noteId id of a note that is expected not to exist
+  * @throws IOException if issuing the request or reading the response body fails
+  */
+ private void testDeleteNotExistNote(String noteId) throws IOException {
+   DeleteMethod delete = httpDelete("/notebook/" + noteId);
+   try {
+     LOG.info("testDeleteNotExistNote delete response\n" + delete.getResponseBodyAsString());
+     assertThat("Test delete method:", delete, isNotFound());
+   } finally {
+     // Release the connection even when reading the body or the assertion throws.
+     delete.releaseConnection();
+   }
+ }
+
@Test
public void testCloneNote() throws IOException, IllegalArgumentException {
LOG.info("testCloneNote");
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
index ab74012..cc4b1df 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java
@@ -33,7 +33,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.rest.message.InterpreterInstallationRequest;
-import org.apache.zeppelin.socket.ServiceCallback;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -120,19 +119,19 @@ public class InterpreterServiceTest {
new InterpreterInstallationRequest(interpreterName, artifactName),
dependencyResolver,
specificInterpreterPath,
- new ServiceCallback() {
+ new SimpleServiceCallback<String>() {
@Override
- public void onStart(String message) {
+ public void onStart(String message, ServiceContext context) {
assertEquals("Starting to download " + interpreterName + " interpreter", message);
}
@Override
- public void onSuccess(String message) {
+ public void onSuccess(String message, ServiceContext context) {
assertEquals(interpreterName + " downloaded", message);
}
@Override
- public void onFailure(String message) {
+ public void onFailure(Exception ex, ServiceContext context) {
fail();
}
});