You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/03/04 03:23:57 UTC
[zeppelin] branch master updated: [ZEPPELIN-4655]. authorizationService in ConnectionManager is null
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 211268d [ZEPPELIN-4655]. authorizationService in ConnectionManager is null
211268d is described below
commit 211268d61a2eaee6a896c2b83af3b3266308940a
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sun Mar 1 14:58:50 2020 +0800
[ZEPPELIN-4655]. authorizationService in ConnectionManager is null
### What is this PR for?
A few sentences describing the overall goals of the pull request's commits.
First time? Check out the contributing guide - https://zeppelin.apache.org/contribution/contributions.html
### What type of PR is it?
[Bug Fix | Improvement | Feature | Documentation | Hot Fix | Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* Open an issue on Jira https://issues.apache.org/jira/browse/ZEPPELIN/
* Put link here, and add [ZEPPELIN-*Jira number*] in PR title, eg. [ZEPPELIN-533]
### How should this be tested?
* First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
* Strongly recommended: add automated unit tests for any new or changed behavior
* Outline any manual steps to test the PR here.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update?
* Are there breaking changes for older versions?
* Does this need documentation?
Author: Jeff Zhang <zj...@apache.org>
Closes #3670 from zjffdu/ZEPPELIN-4655 and squashes the following commits:
51803d742 [Jeff Zhang] [ZEPPELIN-4655]. authorizationService in ConnectionManager is null
---
.../org/apache/zeppelin/server/ZeppelinServer.java | 4 +-
.../apache/zeppelin/socket/ConnectionManager.java | 5 +-
.../org/apache/zeppelin/socket/NotebookServer.java | 148 ++---
.../apache/zeppelin/cluster/ClusterEventTest.java | 2 +-
.../apache/zeppelin/socket/NotebookServerTest.java | 615 +++++++++++----------
5 files changed, 400 insertions(+), 374 deletions(-)
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 0e5be41..0c9b1cf 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -65,6 +65,7 @@ import org.apache.zeppelin.search.LuceneSearch;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.service.*;
import org.apache.zeppelin.service.AuthenticationService;
+import org.apache.zeppelin.socket.ConnectionManager;
import org.apache.zeppelin.socket.NotebookServer;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.Credentials;
@@ -161,7 +162,8 @@ public class ZeppelinServer extends ResourceConfig {
bindAsContract(GsonProvider.class).in(Singleton.class);
bindAsContract(WebApplicationExceptionMapper.class).in(Singleton.class);
bindAsContract(AdminService.class).in(Singleton.class);
- bindAsContract(AuthorizationService.class).to(Singleton.class);
+ bindAsContract(AuthorizationService.class).in(Singleton.class);
+ bindAsContract(ConnectionManager.class).in(Singleton.class);
// TODO(jl): Will make it more beautiful
if (!StringUtils.isBlank(conf.getShiroPath())) {
bind(ShiroAuthenticationService.class).to(AuthenticationService.class).in(Singleton.class);
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
index 0df4db4..489a59f 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
@@ -40,6 +40,7 @@ import org.eclipse.jetty.websocket.api.WebSocketException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -87,8 +88,8 @@ public class ConnectionManager {
private AuthorizationService authorizationService;
- public void setAuthorizationService(
- AuthorizationService authorizationService) {
+ @Inject
+ public ConnectionManager(AuthorizationService authorizationService) {
this.authorizationService = authorizationService;
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 01c463d..b177f64 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -139,8 +139,6 @@ public class NotebookServer extends WebSocketServlet
.registerTypeAdapterFactory(Input.TypeAdapterFactory).create();
private static AtomicReference<NotebookServer> self = new AtomicReference<>();
- private ConnectionManager connectionManager;
-
private ExecutorService executorService = Executors.newFixedThreadPool(10);
private Provider<Notebook> notebookProvider;
@@ -148,9 +146,9 @@ public class NotebookServer extends WebSocketServlet
private Provider<AuthorizationService> authorizationServiceProvider;
private Provider<ConfigurationService> configurationServiceProvider;
private Provider<JobManagerService> jobManagerServiceProvider;
+ private Provider<ConnectionManager> connectionManagerProvider;
public NotebookServer() {
- this.connectionManager = new ConnectionManager();
NotebookServer.self.set(this);
LOG.info("NotebookServer instantiated: {}", this);
}
@@ -181,6 +179,12 @@ public class NotebookServer extends WebSocketServlet
}
@Inject
+ public void setConnectionManagerProvider(Provider<ConnectionManager> connectionManagerProvider) {
+ this.connectionManagerProvider = connectionManagerProvider;
+ LOG.info("Injected ConnectionManagerProvider");
+ }
+
+ @Inject
public void setConfigurationService(
Provider<ConfigurationService> configurationServiceProvider) {
this.configurationServiceProvider = configurationServiceProvider;
@@ -200,6 +204,10 @@ public class NotebookServer extends WebSocketServlet
return notebookProvider.get();
}
+ public ConnectionManager getConnectionManager() {
+ return connectionManagerProvider.get();
+ }
+
public NotebookService getNotebookService() {
return notebookServiceProvider.get();
}
@@ -233,7 +241,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onOpen(NotebookSocket conn) {
LOG.info("New connection from {}", conn);
- connectionManager.addConnection(conn);
+ getConnectionManager().addConnection(conn);
}
@Override
@@ -284,7 +292,7 @@ public class NotebookServer extends WebSocketServlet
}
if (StringUtils.isEmpty(conn.getUser())) {
- connectionManager.addUserConnection(messagereceived.principal, conn);
+ getConnectionManager().addUserConnection(messagereceived.principal, conn);
}
// Lets be elegant here
@@ -433,7 +441,7 @@ public class NotebookServer extends WebSocketServlet
getInterpreterSettings(conn, messagereceived);
break;
case WATCHER:
- connectionManager.switchConnectionToWatcher(conn);
+ getConnectionManager().switchConnectionToWatcher(conn);
break;
case SAVE_NOTE_FORMS:
saveNoteForms(conn, messagereceived);
@@ -460,13 +468,9 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onClose(NotebookSocket conn, int code, String reason) {
LOG.info("Closed connection to {} ({}) {}", conn, code, reason);
- connectionManager.removeConnection(conn);
- connectionManager.removeConnectionFromAllNote(conn);
- connectionManager.removeUserConnection(conn.getUser(), conn);
- }
-
- public ConnectionManager getConnectionManager() {
- return connectionManager;
+ getConnectionManager().removeConnection(conn);
+ getConnectionManager().removeConnectionFromAllNote(conn);
+ getConnectionManager().removeUserConnection(conn.getUser(), conn);
}
protected Message deserializeMessage(String msg) {
@@ -478,12 +482,12 @@ public class NotebookServer extends WebSocketServlet
}
public void broadcast(Message m) {
- connectionManager.broadcast(m);
+ getConnectionManager().broadcast(m);
}
public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
- connectionManager.addNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
+ getConnectionManager().addNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
getJobManagerService().getNoteJobInfoByUnixTime(0, getServiceContext(fromMessage),
new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) {
@Override
@@ -513,7 +517,7 @@ public class NotebookServer extends WebSocketServlet
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notesJobInfo);
- connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+ getConnectionManager().broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
}
@@ -525,7 +529,7 @@ public class NotebookServer extends WebSocketServlet
}
public void unsubscribeNoteJobInfo(NotebookSocket conn) {
- connectionManager.removeNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
+ getConnectionManager().removeNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn);
}
public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) throws IOException {
@@ -578,7 +582,7 @@ public class NotebookServer extends WebSocketServlet
private void inlineBroadcastNote(Note note) {
Message message = new Message(OP.NOTE).put("note", note);
- connectionManager.broadcast(note.getId(), message);
+ getConnectionManager().broadcast(note.getId(), message);
}
private void inlineBroadcastParagraph(Note note, Paragraph p) {
@@ -588,7 +592,7 @@ public class NotebookServer extends WebSocketServlet
broadcastParagraphs(p.getUserParagraphMap(), p);
} else {
Message message = new Message(OP.PARAGRAPH).put("paragraph", p);
- connectionManager.broadcast(note.getId(), message);
+ getConnectionManager().broadcast(note.getId(), message);
}
}
@@ -602,7 +606,7 @@ public class NotebookServer extends WebSocketServlet
if (null != userParagraphMap) {
for (String user : userParagraphMap.keySet()) {
Message message = new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user));
- connectionManager.multicastToUser(user, message);
+ getConnectionManager().multicastToUser(user, message);
}
}
}
@@ -618,7 +622,7 @@ public class NotebookServer extends WebSocketServlet
int paraIndex = note.getParagraphs().indexOf(para);
Message message = new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex);
- connectionManager.broadcast(note.getId(), message);
+ getConnectionManager().broadcast(note.getId(), message);
}
private void broadcastNewParagraph(Note note, Paragraph para) {
@@ -634,9 +638,9 @@ public class NotebookServer extends WebSocketServlet
List<NoteInfo> notesInfo = getNotebook().getNotesInfo(
noteId -> getNotebookAuthorizationService().isReader(noteId, userAndRoles));
Message message = new Message(OP.NOTES_INFO).put("notes", notesInfo);
- connectionManager.multicastToUser(subject.getUser(), message);
+ getConnectionManager().multicastToUser(subject.getUser(), message);
//to others afterwards
- connectionManager.broadcastNoteListExcept(notesInfo, subject);
+ getConnectionManager().broadcastNoteListExcept(notesInfo, subject);
}
public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
@@ -754,7 +758,7 @@ public class NotebookServer extends WebSocketServlet
public void onSuccess(List<NoteInfo> notesInfo,
ServiceContext context) throws IOException {
super.onSuccess(notesInfo, context);
- connectionManager.unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
+ getConnectionManager().unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn);
}
});
}
@@ -767,10 +771,10 @@ public class NotebookServer extends WebSocketServlet
public void onSuccess(List<NoteInfo> notesInfo,
ServiceContext context) throws IOException {
super.onSuccess(notesInfo, context);
- connectionManager.multicastToUser(context.getAutheInfo().getUser(),
+ getConnectionManager().multicastToUser(context.getAutheInfo().getUser(),
new Message(OP.NOTES_INFO).put("notes", notesInfo));
//to others afterwards
- connectionManager.broadcastNoteListExcept(notesInfo, context.getAutheInfo());
+ getConnectionManager().broadcastNoteListExcept(notesInfo, context.getAutheInfo());
}
});
}
@@ -813,7 +817,7 @@ public class NotebookServer extends WebSocketServlet
new WebSocketServiceCallback<Note>(conn) {
@Override
public void onSuccess(Note note, ServiceContext context) throws IOException {
- connectionManager.addNoteConnection(note.getId(), conn);
+ getConnectionManager().addNoteConnection(note.getId(), conn);
conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
updateAngularObjectRegistry(conn, note);
sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
@@ -858,11 +862,11 @@ public class NotebookServer extends WebSocketServlet
public void onSuccess(Note note, ServiceContext context) throws IOException {
super.onSuccess(note, context);
if (note != null) {
- connectionManager.addNoteConnection(note.getId(), conn);
+ getConnectionManager().addNoteConnection(note.getId(), conn);
conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn);
} else {
- connectionManager.removeConnectionFromAllNote(conn);
+ getConnectionManager().removeConnectionFromAllNote(conn);
conn.send(serializeMessage(new Message(OP.NOTE).put("note", null)));
}
}
@@ -885,7 +889,7 @@ public class NotebookServer extends WebSocketServlet
new WebSocketServiceCallback<Note>(conn) {
@Override
public void onSuccess(Note note, ServiceContext context) throws IOException {
- connectionManager.broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name)
+ getConnectionManager().broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name)
.put("config", config)
.put("info", note.getInfo()));
broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
@@ -904,7 +908,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onSuccess(Note note, ServiceContext context) throws IOException {
super.onSuccess(note, context);
- connectionManager.broadcastNote(note);
+ getConnectionManager().broadcastNote(note);
}
});
}
@@ -956,7 +960,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onSuccess(Note note, ServiceContext context) throws IOException {
super.onSuccess(note, context);
- connectionManager.addNoteConnection(note.getId(), conn);
+ getConnectionManager().addNoteConnection(note.getId(), conn);
conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note)));
broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
}
@@ -978,7 +982,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onSuccess(String message, ServiceContext context) throws IOException {
super.onSuccess(message, context);
- connectionManager.removeNoteConnection(noteId);
+ getConnectionManager().removeNoteConnection(noteId);
broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
}
});
@@ -996,7 +1000,7 @@ public class NotebookServer extends WebSocketServlet
ServiceContext context) throws IOException {
super.onSuccess(notesInfo, context);
for (NoteInfo noteInfo : notesInfo) {
- connectionManager.removeNoteConnection(noteInfo.getId());
+ getConnectionManager().removeNoteConnection(noteInfo.getId());
}
broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
}
@@ -1089,7 +1093,7 @@ public class NotebookServer extends WebSocketServlet
private void updateParagraph(NotebookSocket conn,
Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("id");
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
if (noteId == null) {
noteId = (String) fromMessage.get("noteId");
}
@@ -1125,7 +1129,7 @@ public class NotebookServer extends WebSocketServlet
return;
}
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
if (noteId == null) {
noteId = fromMessage.getType("noteId", LOG);
if (noteId == null) {
@@ -1146,21 +1150,21 @@ public class NotebookServer extends WebSocketServlet
super.onSuccess(result, context);
Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", result)
.put("paragraphId", paragraphId);
- connectionManager.broadcastExcept(noteId2, message, conn);
+ getConnectionManager().broadcastExcept(noteId2, message, conn);
}
});
}
private void cloneNote(NotebookSocket conn,
Message fromMessage) throws IOException {
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
String name = (String) fromMessage.get("name");
getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage),
new WebSocketServiceCallback<Note>(conn) {
@Override
public void onSuccess(Note newNote, ServiceContext context) throws IOException {
super.onSuccess(newNote, context);
- connectionManager.addNoteConnection(newNote.getId(), conn);
+ getConnectionManager().addNoteConnection(newNote.getId(), conn);
conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote)));
broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
}
@@ -1225,13 +1229,13 @@ public class NotebookServer extends WebSocketServlet
private void removeParagraph(NotebookSocket conn,
Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
getNotebookService().removeParagraph(noteId, paragraphId,
getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn) {
@Override
public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
super.onSuccess(p, context);
- connectionManager.broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).
+ getConnectionManager().broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).
put("id", p.getId()));
}
});
@@ -1240,14 +1244,14 @@ public class NotebookServer extends WebSocketServlet
private void clearParagraphOutput(NotebookSocket conn,
Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage),
new WebSocketServiceCallback<Paragraph>(conn) {
@Override
public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
super.onSuccess(p, context);
if (p.getNote().isPersonalizedMode()) {
- connectionManager.unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
+ getConnectionManager().unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
} else {
broadcastParagraph(p.getNote(), p);
}
@@ -1257,7 +1261,7 @@ public class NotebookServer extends WebSocketServlet
private void completion(NotebookSocket conn,
Message fromMessage) throws IOException {
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
String paragraphId = (String) fromMessage.get("id");
String buffer = (String) fromMessage.get("buf");
int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString());
@@ -1305,7 +1309,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onSuccess(AngularObject ao, ServiceContext context) throws IOException {
super.onSuccess(ao, context);
- connectionManager.broadcastExcept(noteId,
+ getConnectionManager().broadcastExcept(noteId,
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
.put("paragraphId", ao.getParagraphId()), conn);
@@ -1387,7 +1391,7 @@ public class NotebookServer extends WebSocketServlet
final AngularObject ao =
remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId);
- connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE)
+ getConnectionManager().broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
.put("paragraphId", paragraphId), conn);
@@ -1401,7 +1405,7 @@ public class NotebookServer extends WebSocketServlet
NotebookSocket conn) {
final AngularObject ao =
remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId);
- connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE)
+ getConnectionManager().broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE)
.put("angularObject", ao)
.put("interpreterGroupId", interpreterGroupId).put("noteId", noteId)
.put("paragraphId", paragraphId), conn);
@@ -1413,14 +1417,14 @@ public class NotebookServer extends WebSocketServlet
Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString());
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
getNotebookService().moveParagraph(noteId, paragraphId, newIndex,
getServiceContext(fromMessage),
new WebSocketServiceCallback<Paragraph>(conn) {
@Override
public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
super.onSuccess(result, context);
- connectionManager.broadcast(result.getNote().getId(),
+ getConnectionManager().broadcast(result.getNote().getId(),
new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex));
}
});
@@ -1429,7 +1433,7 @@ public class NotebookServer extends WebSocketServlet
private String insertParagraph(NotebookSocket conn,
Message fromMessage) throws IOException {
final int index = (int) Double.parseDouble(fromMessage.get("index").toString());
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
Map<String, Object> config;
if (fromMessage.get("config") != null) {
config = (Map<String, Object>) fromMessage.get("config");
@@ -1464,7 +1468,7 @@ public class NotebookServer extends WebSocketServlet
private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage),
new WebSocketServiceCallback<>(conn));
}
@@ -1484,14 +1488,14 @@ public class NotebookServer extends WebSocketServlet
private void broadcastSpellExecution(NotebookSocket conn,
Message fromMessage) throws IOException {
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
getNotebookService().spell(noteId, fromMessage,
getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn) {
@Override
public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
super.onSuccess(p, context);
// broadcast to other clients only
- connectionManager.broadcastExcept(p.getNote().getId(),
+ getConnectionManager().broadcastExcept(p.getNote().getId(),
new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn);
}
});
@@ -1500,7 +1504,7 @@ public class NotebookServer extends WebSocketServlet
private void runParagraph(NotebookSocket conn,
Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("id");
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
String text = (String) fromMessage.get("paragraph");
String title = (String) fromMessage.get("title");
Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
@@ -1514,7 +1518,7 @@ public class NotebookServer extends WebSocketServlet
if (p.getNote().isPersonalizedMode()) {
Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId,
context.getAutheInfo().getUser());
- connectionManager.unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
+ getConnectionManager().unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
}
// if it's the last paragraph and not empty, let's add a new one
@@ -1645,7 +1649,7 @@ public class NotebookServer extends WebSocketServlet
public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", noteId)
.put("paragraphId", paragraphId).put("index", index).put("data", output);
- connectionManager.broadcast(noteId, msg);
+ getConnectionManager().broadcast(noteId, msg);
}
/**
@@ -1669,10 +1673,10 @@ public class NotebookServer extends WebSocketServlet
if (note.isPersonalizedMode()) {
String user = note.getParagraph(paragraphId).getUser();
if (null != user) {
- connectionManager.multicastToUser(user, msg);
+ getConnectionManager().multicastToUser(user, msg);
}
} else {
- connectionManager.broadcast(noteId, msg);
+ getConnectionManager().broadcast(noteId, msg);
}
} catch (IOException e) {
LOG.warn("Fail to call onOutputUpdated", e);
@@ -1708,7 +1712,7 @@ public class NotebookServer extends WebSocketServlet
Message msg =
new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
.put("index", index).put("appId", appId).put("data", output);
- connectionManager.broadcast(noteId, msg);
+ getConnectionManager().broadcast(noteId, msg);
}
/**
@@ -1720,14 +1724,14 @@ public class NotebookServer extends WebSocketServlet
Message msg =
new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId)
.put("index", index).put("type", type).put("appId", appId).put("data", output);
- connectionManager.broadcast(noteId, msg);
+ getConnectionManager().broadcast(noteId, msg);
}
@Override
public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) {
Message msg = new Message(OP.APP_LOAD).put("noteId", noteId).put("paragraphId", paragraphId)
.put("appId", appId).put("pkg", pkg);
- connectionManager.broadcast(noteId, msg);
+ getConnectionManager().broadcast(noteId, msg);
}
@Override
@@ -1735,7 +1739,7 @@ public class NotebookServer extends WebSocketServlet
Message msg =
new Message(OP.APP_STATUS_CHANGE).put("noteId", noteId).put("paragraphId", paragraphId)
.put("appId", appId).put("status", status);
- connectionManager.broadcast(noteId, msg);
+ getConnectionManager().broadcast(noteId, msg);
}
@@ -1869,7 +1873,7 @@ public class NotebookServer extends WebSocketServlet
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notesJobInfo);
- connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
+ getConnectionManager().broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response));
}
}
@@ -1877,7 +1881,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onProgressUpdate(Paragraph p, int progress) {
- connectionManager.broadcast(p.getNote().getId(),
+ getConnectionManager().broadcast(p.getNote().getId(),
new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress));
}
@@ -1930,7 +1934,7 @@ public class NotebookServer extends WebSocketServlet
@Override
public void noteRunningStatusChange(String noteId, boolean newStatus) {
- connectionManager.broadcast(
+ getConnectionManager().broadcast(
noteId,
new Message(OP.NOTE_RUNNING_STATUS
).put("status", newStatus));
@@ -1984,7 +1988,7 @@ public class NotebookServer extends WebSocketServlet
continue;
}
- connectionManager.broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE)
+ getConnectionManager().broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", object)
.put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId())
.put("paragraphId", object.getParagraphId()));
@@ -2003,7 +2007,7 @@ public class NotebookServer extends WebSocketServlet
getNotebook().getInterpreterSettingManager().getSettingIds();
for (String id : settingIds) {
if (interpreterGroupId.contains(id)) {
- connectionManager.broadcast(note.getId(),
+ getConnectionManager().broadcast(note.getId(),
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put("noteId", noteId)
.put("paragraphId", paragraphId));
break;
@@ -2015,7 +2019,7 @@ public class NotebookServer extends WebSocketServlet
private void getEditorSetting(NotebookSocket conn, Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("paragraphId");
String magic = (String) fromMessage.get("magic");
- String noteId = connectionManager.getAssociatedNoteId(conn);
+ String noteId = getConnectionManager().getAssociatedNoteId(conn);
getNotebookService().getEditorSetting(noteId, magic,
getServiceContext(fromMessage),
@@ -2070,7 +2074,7 @@ public class NotebookServer extends WebSocketServlet
paragraph
.updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId());
- connectionManager.broadcast(
+ getConnectionManager().broadcast(
note.getId(),
new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos",
paragraph.getRuntimeInfos()));
@@ -2121,7 +2125,7 @@ public class NotebookServer extends WebSocketServlet
GUI formsSettings = new GUI();
formsSettings.setForms(note.getNoteForms());
formsSettings.setParams(note.getNoteParams());
- connectionManager.broadcast(note.getId(),
+ getConnectionManager().broadcast(note.getId(),
new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings));
}
@@ -2155,14 +2159,14 @@ public class NotebookServer extends WebSocketServlet
@ManagedAttribute
public Set<String> getConnectedUsers() {
- return connectionManager.getConnectedUsers();
+ return getConnectionManager().getConnectedUsers();
}
@ManagedOperation
public void sendMessage(String message) {
Message m = new Message(OP.NOTICE);
m.data.put("notice", message);
- connectionManager.broadcast(m);
+ getConnectionManager().broadcast(m);
}
private ServiceContext getServiceContext(Message message) {
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
index 9514def..912ed39 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
@@ -100,7 +100,7 @@ public class ClusterEventTest extends ZeppelinServerMock {
ZeppelinServerMock.startUp("ClusterEventTest", zconf);
notebook = TestUtils.getInstance(Notebook.class);
- authorizationService = new AuthorizationService(notebook, zconf);
+ authorizationService = TestUtils.getInstance(AuthorizationService.class);
schedulerService = new QuartzSchedulerService(zconf, notebook);
notebookServer = spy(NotebookServer.getInstance());
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index f035c6c..185de1f 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -79,7 +79,6 @@ import org.junit.Test;
public class NotebookServerTest extends AbstractTestRestApi {
private static Notebook notebook;
private static NotebookServer notebookServer;
- private static SchedulerService schedulerService;
private static NotebookService notebookService;
private static AuthorizationService authorizationService;
private HttpServletRequest mockRequest;
@@ -89,17 +88,9 @@ public class NotebookServerTest extends AbstractTestRestApi {
public static void init() throws Exception {
AbstractTestRestApi.startUp(NotebookServerTest.class.getSimpleName());
notebook = TestUtils.getInstance(Notebook.class);
- authorizationService = new AuthorizationService(notebook, notebook.getConf());
- ZeppelinConfiguration conf = ZeppelinConfiguration.create();
- schedulerService = new QuartzSchedulerService(conf, notebook);
- notebookServer = spy(NotebookServer.getInstance());
- notebookService =
- new NotebookService(
- notebook, authorizationService, conf, schedulerService);
-
- ConfigurationService configurationService = new ConfigurationService(notebook.getConf());
- when(notebookServer.getNotebookService()).thenReturn(notebookService);
- when(notebookServer.getConfigurationService()).thenReturn(configurationService);
+ authorizationService = TestUtils.getInstance(AuthorizationService.class);
+ notebookServer = TestUtils.getInstance(NotebookServer.class);
+ notebookService = TestUtils.getInstance(NotebookService.class);
}
@AfterClass
@@ -110,26 +101,19 @@ public class NotebookServerTest extends AbstractTestRestApi {
@Before
public void setUp() {
mockRequest = mock(HttpServletRequest.class);
- anonymous = new AuthenticationInfo("anonymous");
+ anonymous = AuthenticationInfo.ANONYMOUS;
}
@Test
public void checkOrigin() throws UnknownHostException {
- NotebookServer server = new NotebookServer();
- server.setNotebook(() -> notebook);
- server.setNotebookService(() -> notebookService);
String origin = "http://" + InetAddress.getLocalHost().getHostName() + ":8080";
-
assertTrue("Origin " + origin + " is not allowed. Please check your hostname.",
- server.checkOrigin(mockRequest, origin));
+ notebookServer.checkOrigin(mockRequest, origin));
}
@Test
public void checkInvalidOrigin(){
- NotebookServer server = new NotebookServer();
- server.setNotebook(() -> notebook);
- server.setNotebookService(() -> notebookService);
- assertFalse(server.checkOrigin(mockRequest, "http://evillocalhost:8080"));
+ assertFalse(notebookServer.checkOrigin(mockRequest, "http://evillocalhost:8080"));
}
@Test
@@ -201,226 +185,241 @@ public class NotebookServerTest extends AbstractTestRestApi {
@Test
public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent()
throws IOException, InterruptedException {
- // create a notebook
- Note note1 = notebook.createNote("note1", anonymous);
-
- // get reference to interpreterGroup
- InterpreterGroup interpreterGroup = null;
- List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().get();
- for (InterpreterSetting setting : settings) {
- if (setting.getName().equals("md")) {
- interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess");
- break;
+ Note note1 = null;
+ try {
+ // create a notebook
+ note1 = notebook.createNote("note1", anonymous);
+
+ // get reference to interpreterGroup
+ InterpreterGroup interpreterGroup = null;
+ List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().get();
+ for (InterpreterSetting setting : settings) {
+ if (setting.getName().equals("md")) {
+ interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess");
+ break;
+ }
}
- }
- // start interpreter process
- Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
- p1.setText("%md start remote interpreter process");
- p1.setAuthenticationInfo(anonymous);
- note1.run(p1.getId());
-
- // wait for paragraph finished
- while (true) {
- if (p1.getStatus() == Job.Status.FINISHED) {
- break;
+ // start interpreter process
+ Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+ p1.setText("%md start remote interpreter process");
+ p1.setAuthenticationInfo(anonymous);
+ note1.run(p1.getId());
+
+ // wait for paragraph finished
+ while (true) {
+ if (p1.getStatus() == Job.Status.FINISHED) {
+ break;
+ }
+ Thread.sleep(100);
}
- Thread.sleep(100);
- }
- // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277
- Thread.sleep(1000);
+ // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277
+ Thread.sleep(1000);
- // add angularObject
- interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null);
+ // add angularObject
+ interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null);
- // create two sockets and open it
- NotebookSocket sock1 = createWebSocket();
- NotebookSocket sock2 = createWebSocket();
-
- assertEquals(sock1, sock1);
- assertNotEquals(sock1, sock2);
+ // create two sockets and open it
+ NotebookSocket sock1 = createWebSocket();
+ NotebookSocket sock2 = createWebSocket();
- notebookServer.onOpen(sock1);
- notebookServer.onOpen(sock2);
- verify(sock1, times(0)).send(anyString()); // getNote, getAngularObject
- // open the same notebook from sockets
- notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson());
- notebookServer.onMessage(sock2, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson());
+ assertEquals(sock1, sock1);
+ assertNotEquals(sock1, sock2);
- reset(sock1);
- reset(sock2);
+ notebookServer.onOpen(sock1);
+ notebookServer.onOpen(sock2);
+ verify(sock1, times(0)).send(anyString()); // getNote, getAngularObject
+ // open the same notebook from sockets
+ notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson());
+ notebookServer.onMessage(sock2, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson());
- // update object from sock1
- notebookServer.onMessage(sock1,
- new Message(OP.ANGULAR_OBJECT_UPDATED)
- .put("noteId", note1.getId())
- .put("name", "object1")
- .put("value", "value1")
- .put("interpreterGroupId", interpreterGroup.getId()).toJson());
+ reset(sock1);
+ reset(sock2);
+ // update object from sock1
+ notebookServer.onMessage(sock1,
+ new Message(OP.ANGULAR_OBJECT_UPDATED)
+ .put("noteId", note1.getId())
+ .put("name", "object1")
+ .put("value", "value1")
+ .put("interpreterGroupId", interpreterGroup.getId()).toJson());
- // expect object is broadcasted except for where the update is created
- verify(sock1, times(0)).send(anyString());
- verify(sock2, times(1)).send(anyString());
- notebook.removeNote(note1.getId(), anonymous);
+ // expect object is broadcasted except for where the update is created
+ verify(sock1, times(0)).send(anyString());
+ verify(sock2, times(1)).send(anyString());
+ } finally {
+ if (note1 != null) {
+ notebook.removeNote(note1.getId(), anonymous);
+ }
+ }
}
@Test
public void testAngularObjectSaveToNote()
throws IOException, InterruptedException {
// create a notebook
- Note note1 = notebook.createNote("note1", "angular", anonymous);
-
- // get reference to interpreterGroup
- InterpreterGroup interpreterGroup = null;
- List<InterpreterSetting> settings = note1.getBindedInterpreterSettings(new ArrayList<>());
- for (InterpreterSetting setting : settings) {
- if (setting.getName().equals("angular")) {
- interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess");
- break;
+ Note note1 = null;
+ try {
+ note1 = notebook.createNote("note1", "angular", anonymous);
+
+ // get reference to interpreterGroup
+ InterpreterGroup interpreterGroup = null;
+ List<InterpreterSetting> settings = note1.getBindedInterpreterSettings(new ArrayList<>());
+ for (InterpreterSetting setting : settings) {
+ if (setting.getName().equals("angular")) {
+ interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess");
+ break;
+ }
}
- }
- // start interpreter process
- Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
- p1.setText("%angular <h2>Bind here : {{COMMAND_TYPE}}</h2>");
- p1.setAuthenticationInfo(anonymous);
- note1.run(p1.getId());
-
- // wait for paragraph finished
- while (true) {
- if (p1.getStatus() == Job.Status.FINISHED) {
- break;
+ // start interpreter process
+ Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+ p1.setText("%angular <h2>Bind here : {{COMMAND_TYPE}}</h2>");
+ p1.setAuthenticationInfo(anonymous);
+ note1.run(p1.getId());
+
+ // wait for paragraph finished
+ while (true) {
+ if (p1.getStatus() == Job.Status.FINISHED) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277
+ Thread.sleep(1000);
+
+ // create two sockets and open it
+ NotebookSocket sock1 = createWebSocket();
+
+ notebookServer.onOpen(sock1);
+ verify(sock1, times(0)).send(anyString()); // getNote, getAngularObject
+ // open the same notebook from sockets
+ notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson());
+
+ reset(sock1);
+
+ // bind object from sock1
+ notebookServer.onMessage(sock1,
+ new Message(OP.ANGULAR_OBJECT_CLIENT_BIND)
+ .put("noteId", note1.getId())
+ .put("paragraphId", p1.getId())
+ .put("name", "COMMAND_TYPE")
+ .put("value", "COMMAND_TYPE_VALUE")
+ .put("interpreterGroupId", interpreterGroup.getId()).toJson());
+ List<AngularObject> list = note1.getAngularObjects("angular-shared_process");
+ assertEquals(list.size(), 1);
+ assertEquals(list.get(0).getNoteId(), note1.getId());
+ assertEquals(list.get(0).getParagraphId(), p1.getId());
+ assertEquals(list.get(0).getName(), "COMMAND_TYPE");
+ assertEquals(list.get(0).get(), "COMMAND_TYPE_VALUE");
+ // Check if the interpreterGroup AngularObjectRegistry is updated
+ Map<String, Map<String, AngularObject>> mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry();
+ AngularObject ao = mapRegistry.get(note1.getId() + "_" + p1.getId()).get("COMMAND_TYPE");
+ assertEquals(ao.getName(), "COMMAND_TYPE");
+ assertEquals(ao.get(), "COMMAND_TYPE_VALUE");
+
+ // update bind object from sock1
+ notebookServer.onMessage(sock1,
+ new Message(OP.ANGULAR_OBJECT_UPDATED)
+ .put("noteId", note1.getId())
+ .put("paragraphId", p1.getId())
+ .put("name", "COMMAND_TYPE")
+ .put("value", "COMMAND_TYPE_VALUE_UPDATE")
+ .put("interpreterGroupId", interpreterGroup.getId()).toJson());
+ list = note1.getAngularObjects("angular-shared_process");
+ assertEquals(list.size(), 1);
+ assertEquals(list.get(0).getNoteId(), note1.getId());
+ assertEquals(list.get(0).getParagraphId(), p1.getId());
+ assertEquals(list.get(0).getName(), "COMMAND_TYPE");
+ assertEquals(list.get(0).get(), "COMMAND_TYPE_VALUE_UPDATE");
+ // Check if the interpreterGroup AngularObjectRegistry is updated
+ mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry();
+ AngularObject ao1 = mapRegistry.get(note1.getId() + "_" + p1.getId()).get("COMMAND_TYPE");
+ assertEquals(ao1.getName(), "COMMAND_TYPE");
+ assertEquals(ao1.get(), "COMMAND_TYPE_VALUE_UPDATE");
+
+ // unbind object from sock1
+ notebookServer.onMessage(sock1,
+ new Message(OP.ANGULAR_OBJECT_CLIENT_UNBIND)
+ .put("noteId", note1.getId())
+ .put("paragraphId", p1.getId())
+ .put("name", "COMMAND_TYPE")
+ .put("value", "COMMAND_TYPE_VALUE")
+ .put("interpreterGroupId", interpreterGroup.getId()).toJson());
+ list = note1.getAngularObjects("angular-shared_process");
+ assertEquals(list.size(), 0);
+ // Check if the interpreterGroup AngularObjectRegistry is delete
+ mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry();
+ AngularObject ao2 = mapRegistry.get(note1.getId() + "_" + p1.getId()).get("COMMAND_TYPE");
+ assertNull(ao2);
+ } finally {
+ if (note1 != null) {
+ notebook.removeNote(note1.getId(), anonymous);
}
- Thread.sleep(100);
}
- // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277
- Thread.sleep(1000);
-
- // create two sockets and open it
- NotebookSocket sock1 = createWebSocket();
-
- notebookServer.onOpen(sock1);
- verify(sock1, times(0)).send(anyString()); // getNote, getAngularObject
- // open the same notebook from sockets
- notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson());
-
- reset(sock1);
-
- // bind object from sock1
- notebookServer.onMessage(sock1,
- new Message(OP.ANGULAR_OBJECT_CLIENT_BIND)
- .put("noteId", note1.getId())
- .put("paragraphId", p1.getId())
- .put("name", "COMMAND_TYPE")
- .put("value", "COMMAND_TYPE_VALUE")
- .put("interpreterGroupId", interpreterGroup.getId()).toJson());
- List<AngularObject> list = note1.getAngularObjects("angular-shared_process");
- assertEquals(list.size(), 1);
- assertEquals(list.get(0).getNoteId(), note1.getId());
- assertEquals(list.get(0).getParagraphId(), p1.getId());
- assertEquals(list.get(0).getName(), "COMMAND_TYPE");
- assertEquals(list.get(0).get(), "COMMAND_TYPE_VALUE");
- // Check if the interpreterGroup AngularObjectRegistry is updated
- Map<String, Map<String, AngularObject>> mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry();
- AngularObject ao = mapRegistry.get(note1.getId()+"_"+p1.getId()).get("COMMAND_TYPE");
- assertEquals(ao.getName(), "COMMAND_TYPE");
- assertEquals(ao.get(), "COMMAND_TYPE_VALUE");
-
- // update bind object from sock1
- notebookServer.onMessage(sock1,
- new Message(OP.ANGULAR_OBJECT_UPDATED)
- .put("noteId", note1.getId())
- .put("paragraphId", p1.getId())
- .put("name", "COMMAND_TYPE")
- .put("value", "COMMAND_TYPE_VALUE_UPDATE")
- .put("interpreterGroupId", interpreterGroup.getId()).toJson());
- list = note1.getAngularObjects("angular-shared_process");
- assertEquals(list.size(), 1);
- assertEquals(list.get(0).getNoteId(), note1.getId());
- assertEquals(list.get(0).getParagraphId(), p1.getId());
- assertEquals(list.get(0).getName(), "COMMAND_TYPE");
- assertEquals(list.get(0).get(), "COMMAND_TYPE_VALUE_UPDATE");
- // Check if the interpreterGroup AngularObjectRegistry is updated
- mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry();
- AngularObject ao1 = mapRegistry.get(note1.getId()+"_"+p1.getId()).get("COMMAND_TYPE");
- assertEquals(ao1.getName(), "COMMAND_TYPE");
- assertEquals(ao1.get(), "COMMAND_TYPE_VALUE_UPDATE");
-
- // unbind object from sock1
- notebookServer.onMessage(sock1,
- new Message(OP.ANGULAR_OBJECT_CLIENT_UNBIND)
- .put("noteId", note1.getId())
- .put("paragraphId", p1.getId())
- .put("name", "COMMAND_TYPE")
- .put("value", "COMMAND_TYPE_VALUE")
- .put("interpreterGroupId", interpreterGroup.getId()).toJson());
- list = note1.getAngularObjects("angular-shared_process");
- assertEquals(list.size(), 0);
- // Check if the interpreterGroup AngularObjectRegistry is delete
- mapRegistry = interpreterGroup.getAngularObjectRegistry().getRegistry();
- AngularObject ao2 = mapRegistry.get(note1.getId()+"_"+p1.getId()).get("COMMAND_TYPE");
- assertNull(ao2);
-
- notebook.removeNote(note1.getId(), anonymous);
}
@Test
public void testLoadAngularObjectFromNote() throws IOException, InterruptedException {
// create a notebook
- Note note1 = notebook.createNote("note1", anonymous);
-
- // get reference to interpreterGroup
- InterpreterGroup interpreterGroup = null;
- List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().get();
- for (InterpreterSetting setting : settings) {
- if (setting.getName().equals("angular")) {
- interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess");
- break;
+ Note note1 = null;
+ try {
+ note1 = notebook.createNote("note1", anonymous);
+
+ // get reference to interpreterGroup
+ InterpreterGroup interpreterGroup = null;
+ List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().get();
+ for (InterpreterSetting setting : settings) {
+ if (setting.getName().equals("angular")) {
+ interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess");
+ break;
+ }
}
- }
- // start interpreter process
- Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
- p1.setText("%angular <h2>Bind here : {{COMMAND_TYPE}}</h2>");
- p1.setAuthenticationInfo(anonymous);
- note1.run(p1.getId());
-
- // wait for paragraph finished
- while (true) {
- if (p1.getStatus() == Job.Status.FINISHED) {
- break;
+ // start interpreter process
+ Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+ p1.setText("%angular <h2>Bind here : {{COMMAND_TYPE}}</h2>");
+ p1.setAuthenticationInfo(anonymous);
+ note1.run(p1.getId());
+
+ // wait for paragraph finished
+ while (true) {
+ if (p1.getStatus() == Job.Status.FINISHED) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277
+ Thread.sleep(1000);
+
+ // set note AngularObject
+ AngularObject ao = new AngularObject("COMMAND_TYPE", "COMMAND_TYPE_VALUE", note1.getId(), p1.getId(), null);
+ note1.addOrUpdateAngularObject("angular-shared_process", ao);
+
+ // create sockets and open it
+ NotebookSocket sock1 = createWebSocket();
+ notebookServer.onOpen(sock1);
+
+ // Check the AngularObjectRegistry of the interpreterGroup before executing GET_NOTE
+ Map<String, Map<String, AngularObject>> mapRegistry1 = interpreterGroup.getAngularObjectRegistry().getRegistry();
+ assertEquals(mapRegistry1.size(), 0);
+
+ // open the notebook from sockets, AngularObjectRegistry that triggers the update of the interpreterGroup
+ notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson());
+ Thread.sleep(1000);
+
+ // After executing GET_NOTE, check the AngularObjectRegistry of the interpreterGroup
+ Map<String, Map<String, AngularObject>> mapRegistry2 = interpreterGroup.getAngularObjectRegistry().getRegistry();
+ assertEquals(mapRegistry1.size(), 2);
+ AngularObject ao1 = mapRegistry2.get(note1.getId() + "_" + p1.getId()).get("COMMAND_TYPE");
+ assertEquals(ao1.getName(), "COMMAND_TYPE");
+ assertEquals(ao1.get(), "COMMAND_TYPE_VALUE");
+ } finally {
+ if (note1 != null) {
+ notebook.removeNote(note1.getId(), anonymous);
}
- Thread.sleep(100);
}
- // sleep for 1 second to make sure job running thread finish to fire event. See ZEPPELIN-3277
- Thread.sleep(1000);
-
- // set note AngularObject
- AngularObject ao = new AngularObject("COMMAND_TYPE", "COMMAND_TYPE_VALUE", note1.getId(), p1.getId(), null);
- note1.addOrUpdateAngularObject("angular-shared_process", ao);
-
- // create sockets and open it
- NotebookSocket sock1 = createWebSocket();
- notebookServer.onOpen(sock1);
-
- // Check the AngularObjectRegistry of the interpreterGroup before executing GET_NOTE
- Map<String, Map<String, AngularObject>> mapRegistry1 = interpreterGroup.getAngularObjectRegistry().getRegistry();
- assertEquals(mapRegistry1.size(), 0);
-
- // open the notebook from sockets, AngularObjectRegistry that triggers the update of the interpreterGroup
- notebookServer.onMessage(sock1, new Message(OP.GET_NOTE).put("id", note1.getId()).toJson());
- Thread.sleep(1000);
-
- // After executing GET_NOTE, check the AngularObjectRegistry of the interpreterGroup
- Map<String, Map<String, AngularObject>> mapRegistry2 = interpreterGroup.getAngularObjectRegistry().getRegistry();
- assertEquals(mapRegistry1.size(), 2);
- AngularObject ao1 = mapRegistry2.get(note1.getId()+"_"+p1.getId()).get("COMMAND_TYPE");
- assertEquals(ao1.getName(), "COMMAND_TYPE");
- assertEquals(ao1.get(), "COMMAND_TYPE_VALUE");
-
- notebook.removeNote(note1.getId(), anonymous);
}
@Test
@@ -434,18 +433,23 @@ public class NotebookServerTest extends AbstractTestRestApi {
Message messageReceived = notebookServer.deserializeMessage(msg);
Note note = null;
try {
- note = notebookServer.importNote(null, messageReceived);
- } catch (NullPointerException e) {
- //broadcastNoteList(); failed nothing to worry.
- LOG.error("Exception in NotebookServerTest while testImportNotebook, failed nothing to " +
- "worry ", e);
- }
+ try {
+ note = notebookServer.importNote(null, messageReceived);
+ } catch (NullPointerException e) {
+ //broadcastNoteList(); failed nothing to worry.
+ LOG.error("Exception in NotebookServerTest while testImportNotebook, failed nothing to " +
+ "worry ", e);
+ }
- assertNotEquals(null, notebook.getNote(note.getId()));
- assertEquals("Test Zeppelin notebook import", notebook.getNote(note.getId()).getName());
- assertEquals("Test paragraphs import", notebook.getNote(note.getId()).getParagraphs().get(0)
- .getText());
- notebook.removeNote(note.getId(), anonymous);
+ assertNotEquals(null, notebook.getNote(note.getId()));
+ assertEquals("Test Zeppelin notebook import", notebook.getNote(note.getId()).getName());
+ assertEquals("Test paragraphs import", notebook.getNote(note.getId()).getParagraphs().get(0)
+ .getText());
+ } finally {
+ if (note != null) {
+ notebook.removeNote(note.getId(), anonymous);
+ }
+ }
}
@Test
@@ -456,20 +460,25 @@ public class NotebookServerTest extends AbstractTestRestApi {
Message messageReceived = notebookServer.deserializeMessage(msg);
Note note = null;
try {
- note = notebookServer.importNote(null, messageReceived);
- } catch (NullPointerException e) {
- //broadcastNoteList(); failed nothing to worry.
- LOG.error("Exception in NotebookServerTest while testImportJupyterNote, failed nothing to " +
- "worry ", e);
- }
+ try {
+ note = notebookServer.importNote(null, messageReceived);
+ } catch (NullPointerException e) {
+ //broadcastNoteList(); failed nothing to worry.
+ LOG.error("Exception in NotebookServerTest while testImportJupyterNote, failed nothing to " +
+ "worry ", e);
+ }
- assertNotEquals(null, notebook.getNote(note.getId()));
- assertTrue(notebook.getNote(note.getId()).getName(),
- notebook.getNote(note.getId()).getName().startsWith("Note converted from Jupyter_"));
- assertEquals("md", notebook.getNote(note.getId()).getParagraphs().get(0).getIntpText());
- assertEquals("# matplotlib - 2D and 3D plotting in Python",
- notebook.getNote(note.getId()).getParagraphs().get(0).getScriptText());
- notebook.removeNote(note.getId(), anonymous);
+ assertNotEquals(null, notebook.getNote(note.getId()));
+ assertTrue(notebook.getNote(note.getId()).getName(),
+ notebook.getNote(note.getId()).getName().startsWith("Note converted from Jupyter_"));
+ assertEquals("md", notebook.getNote(note.getId()).getParagraphs().get(0).getIntpText());
+ assertEquals("# matplotlib - 2D and 3D plotting in Python",
+ notebook.getNote(note.getId()).getParagraphs().get(0).getScriptText());
+ } finally {
+ if (note != null) {
+ notebook.removeNote(note.getId(), anonymous);
+ }
+ }
}
@Test
@@ -483,46 +492,51 @@ public class NotebookServerTest extends AbstractTestRestApi {
.put("value", value)
.put("paragraphId", "paragraphId");
- final Notebook notebook = mock(Notebook.class);
- final NotebookServer server = new NotebookServer();
- server.setNotebook(() -> notebook);
- server.setNotebookService(() -> notebookService);
- final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
+ try {
+ final Notebook notebook = mock(Notebook.class);
+ notebookServer.setNotebook(() -> notebook);
+ notebookServer.setNotebookService(() -> notebookService);
+ final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
- when(notebook.getNote("noteId")).thenReturn(note);
- final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
- when(note.getParagraph("paragraphId")).thenReturn(paragraph);
+ when(notebook.getNote("noteId")).thenReturn(note);
+ final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
+ when(note.getParagraph("paragraphId")).thenReturn(paragraph);
- final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class);
- final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
- mdGroup.setAngularObjectRegistry(mdRegistry);
+ final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class);
+ final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
+ mdGroup.setAngularObjectRegistry(mdRegistry);
- when(paragraph.getBindedInterpreter().getInterpreterGroup()).thenReturn(mdGroup);
+ when(paragraph.getBindedInterpreter().getInterpreterGroup()).thenReturn(mdGroup);
- final AngularObject<String> ao1 = AngularObjectBuilder.build(varName, value, "noteId",
- "paragraphId");
+ final AngularObject<String> ao1 = AngularObjectBuilder.build(varName, value, "noteId",
+ "paragraphId");
- when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraphId"))
- .thenReturn(ao1);
+ when(mdRegistry.addAndNotifyRemoteProcess(varName, value, "noteId", "paragraphId"))
+ .thenReturn(ao1);
- NotebookSocket conn = mock(NotebookSocket.class);
- NotebookSocket otherConn = mock(NotebookSocket.class);
+ NotebookSocket conn = mock(NotebookSocket.class);
+ NotebookSocket otherConn = mock(NotebookSocket.class);
- final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
- .put("angularObject", ao1)
- .put("interpreterGroupId", "mdGroup")
- .put("noteId", "noteId")
- .put("paragraphId", "paragraphId"));
+ final String mdMsg1 = notebookServer.serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
+ .put("angularObject", ao1)
+ .put("interpreterGroupId", "mdGroup")
+ .put("noteId", "noteId")
+ .put("paragraphId", "paragraphId"));
- server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
+ notebookServer.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
- // When
- server.angularObjectClientBind(conn, messageReceived);
+ // When
+ notebookServer.angularObjectClientBind(conn, messageReceived);
- // Then
- verify(mdRegistry, never()).addAndNotifyRemoteProcess(varName, value, "noteId", null);
+ // Then
+ verify(mdRegistry, never()).addAndNotifyRemoteProcess(varName, value, "noteId", null);
- verify(otherConn).send(mdMsg1);
+ verify(otherConn).send(mdMsg1);
+ } finally {
+ // reset these so that it won't affect other tests
+ notebookServer.setNotebook(() -> NotebookServerTest.notebook);
+ notebookServer.setNotebookService(() -> NotebookServerTest.notebookService);
+ }
}
@Test
@@ -535,42 +549,47 @@ public class NotebookServerTest extends AbstractTestRestApi {
.put("name", varName)
.put("paragraphId", "paragraphId");
- final Notebook notebook = mock(Notebook.class);
- final NotebookServer server = new NotebookServer();
- server.setNotebook(() -> notebook);
- server.setNotebookService(() -> notebookService);
- final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
- when(notebook.getNote("noteId")).thenReturn(note);
- final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
- when(note.getParagraph("paragraphId")).thenReturn(paragraph);
-
- final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class);
- final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
- mdGroup.setAngularObjectRegistry(mdRegistry);
-
- when(paragraph.getBindedInterpreter().getInterpreterGroup()).thenReturn(mdGroup);
-
- final AngularObject<String> ao1 = AngularObjectBuilder.build(varName, value, "noteId",
- "paragraphId");
- when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraphId")).thenReturn(ao1);
- NotebookSocket conn = mock(NotebookSocket.class);
- NotebookSocket otherConn = mock(NotebookSocket.class);
-
- final String mdMsg1 = server.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE)
- .put("angularObject", ao1)
- .put("interpreterGroupId", "mdGroup")
- .put("noteId", "noteId")
- .put("paragraphId", "paragraphId"));
+ try {
+ final Notebook notebook = mock(Notebook.class);
+ notebookServer.setNotebook(() -> notebook);
+ notebookServer.setNotebookService(() -> notebookService);
+ final Note note = mock(Note.class, RETURNS_DEEP_STUBS);
+ when(notebook.getNote("noteId")).thenReturn(note);
+ final Paragraph paragraph = mock(Paragraph.class, RETURNS_DEEP_STUBS);
+ when(note.getParagraph("paragraphId")).thenReturn(paragraph);
- server.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
+ final RemoteAngularObjectRegistry mdRegistry = mock(RemoteAngularObjectRegistry.class);
+ final InterpreterGroup mdGroup = new InterpreterGroup("mdGroup");
+ mdGroup.setAngularObjectRegistry(mdRegistry);
- // When
- server.angularObjectClientUnbind(conn, messageReceived);
+ when(paragraph.getBindedInterpreter().getInterpreterGroup()).thenReturn(mdGroup);
- // Then
- verify(mdRegistry, never()).removeAndNotifyRemoteProcess(varName, "noteId", null);
+ final AngularObject<String> ao1 = AngularObjectBuilder.build(varName, value, "noteId",
+ "paragraphId");
+ when(mdRegistry.removeAndNotifyRemoteProcess(varName, "noteId", "paragraphId")).thenReturn(ao1);
+ NotebookSocket conn = mock(NotebookSocket.class);
+ NotebookSocket otherConn = mock(NotebookSocket.class);
- verify(otherConn).send(mdMsg1);
+ final String mdMsg1 = notebookServer.serializeMessage(new Message(OP.ANGULAR_OBJECT_REMOVE)
+ .put("angularObject", ao1)
+ .put("interpreterGroupId", "mdGroup")
+ .put("noteId", "noteId")
+ .put("paragraphId", "paragraphId"));
+
+ notebookServer.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
+
+ // When
+ notebookServer.angularObjectClientUnbind(conn, messageReceived);
+
+ // Then
+ verify(mdRegistry, never()).removeAndNotifyRemoteProcess(varName, "noteId", null);
+
+ verify(otherConn).send(mdMsg1);
+ } finally {
+ // reset these so that it won't affect other tests
+ notebookServer.setNotebook(() -> NotebookServerTest.notebook);
+ notebookServer.setNotebookService(() -> NotebookServerTest.notebookService);
+ }
}
@Test
@@ -676,7 +695,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
Paragraph p1 = note.addNewParagraph(anonymous);
p1.setText("%md start remote interpreter process");
p1.setAuthenticationInfo(anonymous);
- notebookServer.getNotebook().saveNote(note, anonymous);
+ notebook.saveNote(note, anonymous);
String noteId = note.getId();
String user1Id = "user1", user2Id = "user2";