You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2020/08/13 21:12:17 UTC
[zeppelin] branch master updated: [ZEPPELIN-4985] Instant visual
feedback on high latency network environment
This is an automated email from the ASF dual-hosted git repository.
moon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 1d01972 [ZEPPELIN-4985] Instant visual feedback on high latency network environment
1d01972 is described below
commit 1d01972c0318fae43321ee74b2cb3f224f7d61fb
Author: Lee moon soo <le...@gmail.com>
AuthorDate: Thu Aug 6 14:07:22 2020 -0700
[ZEPPELIN-4985] Instant visual feedback on high latency network environment
### What is this PR for?
This PR provides an instant visual feedback to improve user experience on high latency network environment.
It focuses on improving 2 frequent user action
- Run paragraph(s)
- Swich visualization
In high latency network, the user may feel a short 'freeze' between action (click) and any visual change in notebook.
While the paragraph is updated after get a response from server.
After this change, the paragraph will get immediate update after a user action, without waiting for server response.
#### Before
![instant-visual-feedback-before](https://user-images.githubusercontent.com/1540981/89499219-7ce85300-d774-11ea-9c6e-b5e218bd48e2.gif)
- Some delay to become PENDING status after click run
- Some delay after switch visualization (screenrecord is not long enough to see actual change after delay)
#### After
![instant-visual-feedback-after](https://user-images.githubusercontent.com/1540981/89499227-81147080-d774-11ea-9aa5-0cf305f460bb.gif)
- Instant status update to PENDING after click run
- Instant visualization switch
### What type of PR is it?
Improvement
### Todos
* [x] - Instant visual feedback on paragraph run
* [x] - Instant visual feedback on visualization switch
* [x] - Instant visual feedback on paragraph run (new ui)
* [x] - Instant visual feedback on visualization switch (new ui)
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-4985
### How should this be tested?
Can simulate high latency network by adding sleep in NotebookSocket.send() method
```
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
-67,6 +67,9 public class NotebookSocket extends WebSocketAdapter {
}
public synchronized void send(String serializeMessage) throws IOException {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
connection.getRemote().sendString(serializeMessage);
}
```
Screen recordings attached are made in this way.
### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? no
Author: Lee moon soo <le...@gmail.com>
Closes #3873 from Leemoonsoo/ZEPPELIN-4985 and squashes the following commits:
b064cf414 [Lee moon soo] update test
c44c1002f [Lee moon soo] implement paragraph update short circuit in new ui
5952cf77c [Lee moon soo] instant update on switching visualization
c38002972 [Lee moon soo] try implement short circuit logic in new ui
97c832e0f [Lee moon soo] skip update paragraph to READY when submit
00135e149 [Lee moon soo] short circuit status update on paragraph run
---
.../zeppelin/integration/ParagraphActionsIT.java | 2 +-
.../zeppelin/cluster/event/ClusterMessage.java | 9 ++++
.../org/apache/zeppelin/rest/NotebookRestApi.java | 4 +-
.../apache/zeppelin/socket/ConnectionManager.java | 4 +-
.../org/apache/zeppelin/socket/NotebookServer.java | 53 ++++++++++----------
.../projects/zeppelin-sdk/package.json | 2 +-
.../interfaces/message-data-type-map.interface.ts | 2 +
.../src/interfaces/message-operator.interface.ts | 8 ++++
.../src/interfaces/message-paragraph.interface.ts | 5 ++
.../src/interfaces/websocket-message.interface.ts | 1 +
.../projects/zeppelin-sdk/src/message.ts | 56 ++++++++++++++++++----
.../src/app/core/paragraph-base/paragraph-base.ts | 8 ++++
.../app/notebook/paragraph/paragraph.controller.js | 7 +++
.../notebook/paragraph/result/result.controller.js | 17 +++----
.../websocket/websocket-event.factory.js | 18 ++++++-
.../websocket/websocket-message.service.js | 16 +++++++
.../org/apache/zeppelin/notebook/Paragraph.java | 1 -
.../apache/zeppelin/notebook/socket/Message.java | 12 +++++
18 files changed, 178 insertions(+), 47 deletions(-)
diff --git a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java
index 13b96ae..c6a11eb 100644
--- a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java
+++ b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java
@@ -226,7 +226,7 @@ public class ParagraphActionsIT extends AbstractZeppelinIT {
ZeppelinITUtils.sleep(2000, false);
collector.checkThat("Paragraph status is ",
- getParagraphStatus(1), CoreMatchers.equalTo("READY")
+ getParagraphStatus(1), CoreMatchers.equalTo("PENDING")
);
driver.navigate().refresh();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
index cd999c4..24e2f22 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
@@ -26,6 +26,7 @@ import java.util.Map;
public class ClusterMessage {
public ClusterEvent clusterEvent;
private Map<String, String> data = new HashMap<>();
+ private String msgId;
private static Gson gson = new GsonBuilder()
.setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
@@ -54,6 +55,14 @@ public class ClusterMessage {
return data;
}
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
public static ClusterMessage deserializeMessage(String msg) {
return gson.fromJson(msg, ClusterMessage.class);
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index e6dcfee..52f4acf 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -74,6 +74,8 @@ import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.zeppelin.notebook.socket.Message.MSG_ID_NOT_DEFINED;
+
/**
* Rest api endpoint for the notebook.
*/
@@ -574,7 +576,7 @@ public class NotebookRestApi extends AbstractRestApi {
AuthenticationInfo subject = new AuthenticationInfo(user);
notebook.saveNote(note, subject);
- notebookServer.broadcastParagraph(note, p);
+ notebookServer.broadcastParagraph(note, p, MSG_ID_NOT_DEFINED);
return new JsonResponse<>(Status.OK, "").build();
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
index d44c656..a36f895 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
@@ -343,7 +343,7 @@ public class ConnectionManager {
broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
}
- public void unicastParagraph(Note note, Paragraph p, String user) {
+ public void unicastParagraph(Note note, Paragraph p, String user, String msgId) {
if (!note.isPersonalizedMode() || p == null || user == null) {
return;
}
@@ -354,7 +354,7 @@ public class ConnectionManager {
}
for (NotebookSocket conn : userSocketMap.get(user)) {
- Message m = new Message(Message.OP.PARAGRAPH).put("paragraph", p);
+ Message m = new Message(Message.OP.PARAGRAPH).withMsgId(msgId).put("paragraph", p);
unicast(m, conn);
}
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 41f10f8..2db1d26 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -96,6 +96,8 @@ import org.glassfish.hk2.api.ServiceLocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.zeppelin.notebook.socket.Message.MSG_ID_NOT_DEFINED;
+
/**
* Zeppelin websocket service. This class used setter injection because all servlet should have
* no-parameter constructor
@@ -578,7 +580,7 @@ public class NotebookServer extends WebSocketServlet
public void broadcastNote(Note note) {
inlineBroadcastNote(note);
- broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE, note);
+ broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE, MSG_ID_NOT_DEFINED, note);
}
private void inlineBroadcastNote(Note note) {
@@ -586,36 +588,38 @@ public class NotebookServer extends WebSocketServlet
getConnectionManager().broadcast(note.getId(), message);
}
- private void inlineBroadcastParagraph(Note note, Paragraph p) {
+ private void inlineBroadcastParagraph(Note note, Paragraph p, String msgId) {
broadcastNoteForms(note);
if (note.isPersonalizedMode()) {
- broadcastParagraphs(p.getUserParagraphMap(), p);
+ broadcastParagraphs(p.getUserParagraphMap(), p, msgId);
} else {
- Message message = new Message(OP.PARAGRAPH).put("paragraph", p);
+ Message message = new Message(OP.PARAGRAPH).withMsgId(msgId).put("paragraph", p);
getConnectionManager().broadcast(note.getId(), message);
}
}
- public void broadcastParagraph(Note note, Paragraph p) {
- inlineBroadcastParagraph(note, p);
- broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPH, note, p);
+ public void broadcastParagraph(Note note, Paragraph p, String msgId) {
+ inlineBroadcastParagraph(note, p, msgId);
+ broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPH, msgId, note, p);
}
private void inlineBroadcastParagraphs(Map<String, Paragraph> userParagraphMap,
- Paragraph defaultParagraph) {
+ Paragraph defaultParagraph,
+ String msgId) {
if (null != userParagraphMap) {
for (String user : userParagraphMap.keySet()) {
- Message message = new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user));
+ Message message = new Message(OP.PARAGRAPH).withMsgId(msgId).put("paragraph", userParagraphMap.get(user));
getConnectionManager().multicastToUser(user, message);
}
}
}
private void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
- Paragraph defaultParagraph) {
- inlineBroadcastParagraphs(userParagraphMap, defaultParagraph);
- broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPHS, userParagraphMap, defaultParagraph);
+ Paragraph defaultParagraph,
+ String msgId) {
+ inlineBroadcastParagraphs(userParagraphMap, defaultParagraph, msgId);
+ broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPHS, msgId, userParagraphMap, defaultParagraph);
}
private void inlineBroadcastNewParagraph(Note note, Paragraph para) {
@@ -628,7 +632,7 @@ public class NotebookServer extends WebSocketServlet
private void broadcastNewParagraph(Note note, Paragraph para) {
inlineBroadcastNewParagraph(note, para);
- broadcastClusterEvent(ClusterEvent.BROADCAST_NEW_PARAGRAPH, note, para);
+ broadcastClusterEvent(ClusterEvent.BROADCAST_NEW_PARAGRAPH, MSG_ID_NOT_DEFINED, note, para);
}
public void inlineBroadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
@@ -647,17 +651,18 @@ public class NotebookServer extends WebSocketServlet
public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
inlineBroadcastNoteList(subject, userAndRoles);
- broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE_LIST, subject, userAndRoles);
+ broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE_LIST, MSG_ID_NOT_DEFINED, subject, userAndRoles);
}
// broadcast ClusterEvent
- private void broadcastClusterEvent(ClusterEvent event, Object... objects) {
+ private void broadcastClusterEvent(ClusterEvent event, String msgId, Object... objects) {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
if (!conf.isClusterMode()) {
return;
}
ClusterMessage clusterMessage = new ClusterMessage(event);
+ clusterMessage.setMsgId(msgId);
for(Object object : objects) {
String json = "";
@@ -739,10 +744,10 @@ public class NotebookServer extends WebSocketServlet
}
break;
case BROADCAST_PARAGRAPH:
- inlineBroadcastParagraph(note, paragraph);
+ inlineBroadcastParagraph(note, paragraph, message.getMsgId());
break;
case BROADCAST_PARAGRAPHS:
- inlineBroadcastParagraphs(userParagraphMap, paragraph);
+ inlineBroadcastParagraphs(userParagraphMap, paragraph, message.getMsgId());
break;
case BROADCAST_NEW_PARAGRAPH:
inlineBroadcastNewParagraph(note, paragraph);
@@ -1113,9 +1118,9 @@ public class NotebookServer extends WebSocketServlet
if (p.getNote().isPersonalizedMode()) {
Map<String, Paragraph> userParagraphMap =
p.getNote().getParagraph(paragraphId).getUserParagraphMap();
- broadcastParagraphs(userParagraphMap, p);
+ broadcastParagraphs(userParagraphMap, p, fromMessage.msgId);
} else {
- broadcastParagraph(p.getNote(), p);
+ broadcastParagraph(p.getNote(), p, fromMessage.msgId);
}
}
});
@@ -1253,9 +1258,9 @@ public class NotebookServer extends WebSocketServlet
public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
super.onSuccess(p, context);
if (p.getNote().isPersonalizedMode()) {
- getConnectionManager().unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
+ getConnectionManager().unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser(), fromMessage.msgId);
} else {
- broadcastParagraph(p.getNote(), p);
+ broadcastParagraph(p.getNote(), p, fromMessage.msgId);
}
}
});
@@ -1520,7 +1525,7 @@ public class NotebookServer extends WebSocketServlet
if (p.getNote().isPersonalizedMode()) {
Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId,
context.getAutheInfo().getUser());
- getConnectionManager().unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
+ getConnectionManager().unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser(), fromMessage.msgId);
}
// if it's the last paragraph and not empty, let's add a new one
@@ -1698,7 +1703,7 @@ public class NotebookServer extends WebSocketServlet
} else {
note.clearParagraphOutput(paragraphId);
Paragraph paragraph = note.getParagraph(paragraphId);
- broadcastParagraph(note, paragraph);
+ broadcastParagraph(note, paragraph, MSG_ID_NOT_DEFINED);
}
} catch (IOException e) {
LOG.warn("Fail to call onOutputClear", e);
@@ -1920,7 +1925,7 @@ public class NotebookServer extends WebSocketServlet
}
p.setStatusToUserParagraph(p.getStatus());
- broadcastParagraph(p.getNote(), p);
+ broadcastParagraph(p.getNote(), p, MSG_ID_NOT_DEFINED);
try {
broadcastUpdateNoteJobInfo(p.getNote(), System.currentTimeMillis() - 5000);
} catch (IOException e) {
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/package.json b/zeppelin-web-angular/projects/zeppelin-sdk/package.json
index 9be3b66..4bb41ea 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/package.json
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/package.json
@@ -5,4 +5,4 @@
"@angular/common": "^8.2.9",
"@angular/core": "^8.2.9"
}
-}
\ No newline at end of file
+}
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts
index ddf934e..fa8bddf 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts
@@ -64,6 +64,7 @@ import {
ParagraphClearOutput,
ParagraphRemove,
ParagraphRemoved,
+ ParagraphStatus,
ParasInfo,
PatchParagraphReceived,
PatchParagraphSend,
@@ -102,6 +103,7 @@ export interface MessageReceiveDataTypeMap {
[OP.PARAGRAPH_REMOVED]: ParagraphRemoved;
[OP.EDITOR_SETTING]: EditorSettingReceived;
[OP.PROGRESS]: Progress;
+ [OP.PARAGRAPH_STATUS]: ParagraphStatus;
[OP.PARAGRAPH_MOVED]: ParagraphMoved;
[OP.AUTH_INFO]: AuthInfo;
[OP.NOTE_UPDATED]: NoteUpdated;
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts
index d3ce82b..1019330 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts
@@ -51,6 +51,14 @@ export enum OP {
PROGRESS = 'PROGRESS',
/**
+ * [short circuit]
+ * paragraph status update
+ * @param id paragraph id
+ * @param progress percentage progress
+ */
+ PARAGRAPH_STATUS = 'PARAGRAPH_STATUS',
+
+ /**
* [c-s]
* create new notebook
*/
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts
index 2e04b22..1ec0cfb 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts
@@ -299,6 +299,11 @@ export interface Progress {
progress: number;
}
+export interface ParagraphStatus {
+ id: string;
+ status: string;
+}
+
interface GraphConfigSetting {
table?: VisualizationTable;
lineChart?: VisualizationLineChart;
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts
index bdc71e1..fcfd896 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts
@@ -18,4 +18,5 @@ export interface WebSocketMessage<K extends keyof MixMessageDataTypeMap> {
ticket?: string; // default 'anonymous'
principal?: string; // default 'anonymous'
roles?: string; // default '[]'
+ msgId?: string;
}
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts
index 4505e36..921876a 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts
@@ -10,20 +10,20 @@
* limitations under the License.
*/
-import { interval, Observable, Subject, Subscription } from 'rxjs';
-import { delay, filter, map, mergeMap, retryWhen, take } from 'rxjs/operators';
-import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
+import {interval, Observable, Subject, Subscription} from 'rxjs';
+import {delay, filter, map, mergeMap, retryWhen, take} from 'rxjs/operators';
+import {webSocket, WebSocketSubject} from 'rxjs/webSocket';
-import { Ticket } from './interfaces/message-common.interface';
+import {Ticket} from './interfaces/message-common.interface';
import {
MessageReceiveDataTypeMap,
MessageSendDataTypeMap,
MixMessageDataTypeMap
} from './interfaces/message-data-type-map.interface';
-import { NoteConfig, PersonalizedMode, SendNote } from './interfaces/message-notebook.interface';
-import { OP } from './interfaces/message-operator.interface';
-import { ParagraphConfig, ParagraphParams, SendParagraph } from './interfaces/message-paragraph.interface';
-import { WebSocketMessage } from './interfaces/websocket-message.interface';
+import {NoteConfig, PersonalizedMode, SendNote} from './interfaces/message-notebook.interface';
+import {OP} from './interfaces/message-operator.interface';
+import {ParagraphConfig, ParagraphParams, SendParagraph} from './interfaces/message-paragraph.interface';
+import {WebSocketMessage} from './interfaces/websocket-message.interface';
export type ArgumentsType<T> = T extends (...args: infer U) => void ? U : never;
@@ -46,6 +46,8 @@ export class Message {
private pingIntervalSubscription = new Subscription();
private wsUrl: string;
private ticket: Ticket;
+ private uniqueClientId = Math.random().toString(36).substring(2, 7);
+ private lastMsgIdSeqSent = 0;
constructor() {
this.open$.subscribe(() => {
@@ -140,6 +142,7 @@ export class Message {
const [op, data] = args;
const message: WebSocketMessage<K> = {
op,
+ msgId: `${this.uniqueClientId}-${++this.lastMsgIdSeqSent}`,
data: data as MixMessageDataTypeMap[K],
...this.ticket
};
@@ -152,10 +155,37 @@ export class Message {
receive<K extends keyof MessageReceiveDataTypeMap>(op: K): Observable<Record<K, MessageReceiveDataTypeMap[K]>[K]> {
return this.received$.pipe(
filter(message => message.op === op),
+ filter(message => {
+ if (!message.msgId) {
+ // when msgId is not specified, it is not response to client request.
+ // always process them
+ return true;
+ }
+ const uniqueClientId = message.msgId.split('-')[0];
+ const msgIdSeqReceived = parseInt(message.msgId.split('-')[1], 10);
+ const isResponseForRequestFromThisClient = uniqueClientId === this.uniqueClientId;
+
+ if (message.op === OP.PARAGRAPH) {
+ if (isResponseForRequestFromThisClient &&
+ this.lastMsgIdSeqSent > msgIdSeqReceived
+ ) {
+ console.log('PARAPGRAPH is already updated by shortcircuit');
+ return false;
+ } else {
+ return true;
+ }
+ } else {
+ return true;
+ }
+ }),
map(message => message.data)
) as Observable<Record<K, MessageReceiveDataTypeMap[K]>[K]>;
}
+ shortCircuit(message: WebSocketMessage<keyof MessageReceiveDataTypeMap>) {
+ this.received$.next(this.interceptReceived(message));
+ }
+
destroy(): void {
this.ws.complete();
this.ws = null;
@@ -352,6 +382,16 @@ export class Message {
paragraphConfig: ParagraphConfig,
paragraphParams: ParagraphParams
): void {
+ // short circuit update status without waiting for server response
+ this.shortCircuit({
+ op: OP.PARAGRAPH_STATUS,
+ data: {
+ id: paragraphId,
+ status: "PENDING"
+ }
+ })
+
+ // send message to server
this.send<OP.RUN_PARAGRAPH>(OP.RUN_PARAGRAPH, {
id: paragraphId,
title: paragraphTitle,
diff --git a/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts b/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts
index f8f9a1b..7fcb35c 100644
--- a/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts
+++ b/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts
@@ -69,6 +69,14 @@ export abstract class ParagraphBase extends MessageListenersManager {
}
}
+ @MessageListener(OP.PARAGRAPH_STATUS)
+ onParagraphStatus(data: MessageReceiveDataTypeMap[OP.PARAGRAPH_STATUS]) {
+ if (data.id === this.paragraph.id) {
+ this.paragraph.status = data.status;
+ this.cdr.markForCheck();
+ }
+ }
+
@MessageListener(OP.NOTE_RUNNING_STATUS)
noteRunningStatusChange(data: MessageReceiveDataTypeMap[OP.NOTE_RUNNING_STATUS]) {
this.isEntireNoteRunning = data.status;
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
index 6ccff79..21efb20 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
@@ -459,6 +459,7 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
if (!paragraphText || $scope.isRunning($scope.paragraph)) {
return;
}
+
const magic = SpellResult.extractMagic(paragraphText);
if (heliumService.getSpellByMagic(magic)) {
@@ -1632,6 +1633,12 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
}
});
+ $scope.$on('updateStatus', function(event, data) {
+ if (data.id === $scope.paragraph.id) {
+ $scope.paragraph.status = data.status;
+ }
+ });
+
$scope.$on('appendParagraphOutput', function(event, data) {
if (data.paragraphId === $scope.paragraph.id) {
if (!$scope.paragraph.results) {
diff --git a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
index 4aabc5c..ec74cb9 100644
--- a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
@@ -762,14 +762,15 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location
let newParagraphConfig = angular.copy(paragraph.config);
newParagraphConfig.results = newParagraphConfig.results || [];
newParagraphConfig.results[resultIndex] = config;
- if ($scope.revisionView === true) {
- // local update without commit
- updateData({
- type: $scope.type,
- data: data,
- }, newParagraphConfig.results[resultIndex], paragraph, resultIndex);
- renderResult($scope.type, true);
- } else {
+
+ // local update without commit
+ updateData({
+ type: $scope.type,
+ data: data,
+ }, newParagraphConfig.results[resultIndex], paragraph, resultIndex);
+ renderResult($scope.type, true);
+
+ if ($scope.revisionView !== true) {
if (! $scope.viewOnly) {
return websocketMsgSrv.commitParagraph(paragraph.id, title, text, newParagraphConfig, params);
}
diff --git a/zeppelin-web/src/components/websocket/websocket-event.factory.js b/zeppelin-web/src/components/websocket/websocket-event.factory.js
index f1b8cd6..8cbf34c 100644
--- a/zeppelin-web/src/components/websocket/websocket-event.factory.js
+++ b/zeppelin-web/src/components/websocket/websocket-event.factory.js
@@ -19,6 +19,8 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa
let websocketCalls = {};
let pingIntervalId;
+ const uniqueClientId = Math.random().toString(36).substring(2, 7);
+ let lastMsgIdSeqSent = 0;
websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketUrl());
websocketCalls.ws.reconnectIfNotNormalClose = true;
@@ -41,6 +43,8 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa
data.ticket = '';
data.roles = '';
}
+
+ data.msgId = uniqueClientId + '-' + ++lastMsgIdSeqSent;
console.log('Send >> %o, %o, %o, %o, %o', data.op, data.principal, data.ticket, data.roles, data);
return websocketCalls.ws.send(JSON.stringify(data));
};
@@ -59,6 +63,11 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa
let op = payload.op;
let data = payload.data;
+ let msgId = payload.msgId;
+ const uniqueClientId = msgId ? msgId.split('-')[0] : undefined;
+ const msgIdSeqReceived = msgId ? parseInt(msgId.split('-')[1]) : undefined;
+ const isResponseForRequestFromThisClient = uniqueClientId === uniqueClientId;
+
if (op === 'NOTE') {
$rootScope.$broadcast('setNoteContent', data.note);
} else if (op === 'NEW_NOTE') {
@@ -111,7 +120,14 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa
buttons: btn,
});
} else if (op === 'PARAGRAPH') {
- $rootScope.$broadcast('updateParagraph', data);
+ if (isResponseForRequestFromThisClient &&
+ lastMsgIdSeqSent > msgIdSeqReceived
+ ) {
+ // paragraph is already updated by short circuit.
+ console.log('PARAPGRAPH is already updated by shortcircuit');
+ } else {
+ $rootScope.$broadcast('updateParagraph', data);
+ }
} else if (op === 'PATCH_PARAGRAPH') {
$rootScope.$broadcast('patchReceived', data);
} else if (op === 'COLLABORATIVE_MODE_STATUS') {
diff --git a/zeppelin-web/src/components/websocket/websocket-message.service.js b/zeppelin-web/src/components/websocket/websocket-message.service.js
index a959070..ee2f773 100644
--- a/zeppelin-web/src/components/websocket/websocket-message.service.js
+++ b/zeppelin-web/src/components/websocket/websocket-message.service.js
@@ -188,6 +188,13 @@ function WebsocketMessageService($rootScope, websocketEvents) {
},
runParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams) {
+ // short circuit update paragraph status for immediate visual feedback without waiting for server response
+ $rootScope.$broadcast('updateStatus', {
+ id: paragraphId,
+ status: 'PENDING',
+ });
+
+ // send message to server
websocketEvents.sendNewEvent({
op: 'RUN_PARAGRAPH',
data: {
@@ -201,6 +208,15 @@ function WebsocketMessageService($rootScope, websocketEvents) {
},
runAllParagraphs: function(noteId, paragraphs) {
+ // short circuit update paragraph status for immediate visual feedback without waiting for server response
+ paragraphs.forEach((p) => {
+ $rootScope.$broadcast('updateStatus', {
+ id: p.id,
+ status: 'PENDING',
+ });
+ });
+
+ // send message to server
websocketEvents.sendNewEvent({
op: 'RUN_ALL_PARAGRAPHS',
data: {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index ce80523..336be44 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -338,7 +338,6 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
setStatus(Job.Status.FINISHED);
return true;
}
- setStatus(Status.READY);
if (getConfig().get("enabled") == null || (Boolean) getConfig().get("enabled")) {
setAuthenticationInfo(getAuthenticationInfo());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
index 2b9f339..9d6803e 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
@@ -225,10 +225,22 @@ public class Message implements JsonSerializable {
public String principal = "anonymous";
public String roles = "";
+ // Unique id generated from client side. to identify message.
+ // When message from server is response to the client request
+ // includes the msgId in response message, client can pair request and response message.
+ // When server send message that is not response to the client request, set null;
+ public String msgId = MSG_ID_NOT_DEFINED;
+ public static String MSG_ID_NOT_DEFINED = null;
+
public Message(OP op) {
this.op = op;
}
+ public Message withMsgId(String msgId) {
+ this.msgId = msgId;
+ return this;
+ }
+
public Message put(String k, Object v) {
data.put(k, v);
return this;