You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/12/17 03:09:46 UTC
[zeppelin] branch master updated: [ZEPPELIN-5154]. Add option to
disable broadcast Paragraph status/progress to frontend
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 2128166 [ZEPPELIN-5154]. Add option to disable broadcast Paragraph status/progress to frontend
2128166 is described below
commit 2128166dc1ece228b8f7174a38e65c6c5c2dcd9c
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Dec 9 14:25:09 2020 +0800
[ZEPPELIN-5154]. Add option to disable broadcast Paragraph status/progress to frontend
### What is this PR for?
Broadcasting paragraph status/progress is a very heavy operation when there are many jobs running, so this PR adds an option (`zeppelin.websocket.paragraph_status_progress.enable`, default `true`) to disable it. When Zeppelin is used as a job server instead of an interactive notebook, it is not necessary to broadcast paragraph status/progress in real time.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5154
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3991 from zjffdu/ZEPPELIN-5154 and squashes the following commits:
b5f66be2d [Jeff Zhang] [ZEPPELIN-5154]. Add option to disable broadcast Paragraph status/progress via websocket
---
.../org/apache/zeppelin/conf/ZeppelinConfiguration.java | 1 +
.../java/org/apache/zeppelin/socket/NotebookServer.java | 15 +++++++++++++++
2 files changed, 16 insertions(+)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 0492fac..71198a7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -1057,6 +1057,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_CREDENTIALS_PERSIST("zeppelin.credentials.persist", true),
ZEPPELIN_CREDENTIALS_ENCRYPT_KEY("zeppelin.credentials.encryptKey", null),
ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "10240000"),
+ ZEPPELIN_WEBSOCKET_PARAGRAPH_STATUS_PROGRESS("zeppelin.websocket.paragraph_status_progress.enable", true),
ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED("zeppelin.server.default.dir.allowed", false),
ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"),
ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", " "),
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 2c7ece1..c3f0021 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -142,6 +142,8 @@ public class NotebookServer extends WebSocketServlet
private static AtomicReference<NotebookServer> self = new AtomicReference<>();
private ExecutorService executorService = Executors.newFixedThreadPool(10);
+ private boolean sendParagraphStatusToFrontend = ZeppelinConfiguration.create().getBoolean(
+ ZeppelinConfiguration.ConfVars.ZEPPELIN_WEBSOCKET_PARAGRAPH_STATUS_PROGRESS);
private Provider<Notebook> notebookProvider;
private Provider<NotebookService> notebookServiceProvider;
@@ -1658,6 +1660,9 @@ public class NotebookServer extends WebSocketServlet
*/
@Override
public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
+ if (!sendParagraphStatusToFrontend) {
+ return;
+ }
Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", noteId)
.put("paragraphId", paragraphId).put("index", index).put("data", output);
getConnectionManager().broadcast(noteId, msg);
@@ -1671,6 +1676,9 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onOutputUpdated(String noteId, String paragraphId, int index,
InterpreterResult.Type type, String output) {
+ if (!sendParagraphStatusToFrontend) {
+ return;
+ }
Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId)
.put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output);
try {
@@ -1699,6 +1707,10 @@ public class NotebookServer extends WebSocketServlet
*/
@Override
public void onOutputClear(String noteId, String paragraphId) {
+ if (!sendParagraphStatusToFrontend) {
+ return;
+ }
+
try {
final Note note = getNotebook().getNote(noteId);
if (note == null) {
@@ -1894,6 +1906,9 @@ public class NotebookServer extends WebSocketServlet
@Override
public void onProgressUpdate(Paragraph p, int progress) {
+ if (!sendParagraphStatusToFrontend) {
+ return;
+ }
getConnectionManager().broadcast(p.getNote().getId(),
new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress));
}