You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2020/08/13 21:12:17 UTC

[zeppelin] branch master updated: [ZEPPELIN-4985] Instant visual feedback on high latency network environment

This is an automated email from the ASF dual-hosted git repository.

moon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d01972  [ZEPPELIN-4985] Instant visual feedback on high latency network environment
1d01972 is described below

commit 1d01972c0318fae43321ee74b2cb3f224f7d61fb
Author: Lee moon soo <le...@gmail.com>
AuthorDate: Thu Aug 6 14:07:22 2020 -0700

    [ZEPPELIN-4985] Instant visual feedback on high latency network environment
    
    ### What is this PR for?
    
    This PR provides an instant visual feedback to improve user experience on high latency network environment.
    It focuses on improving 2 frequent user action
    
     - Run paragraph(s)
     - Swich visualization
    
    In high latency network, the user may feel a short 'freeze' between action (click) and any visual change in notebook.
    While the paragraph is updated after get a response from server.
    
    After this change, the paragraph will get immediate update after a user action, without waiting for server response.
    
    #### Before
    ![instant-visual-feedback-before](https://user-images.githubusercontent.com/1540981/89499219-7ce85300-d774-11ea-9c6e-b5e218bd48e2.gif)
    
     - Some delay to become PENDING status after click run
     - Some delay after switch visualization (screenrecord is not long enough to see actual change after delay)
    
    #### After
    ![instant-visual-feedback-after](https://user-images.githubusercontent.com/1540981/89499227-81147080-d774-11ea-9aa5-0cf305f460bb.gif)
    
      - Instant status update to PENDING after click run
      - Instant visualization switch
    
    ### What type of PR is it?
    Improvement
    
    ### Todos
    * [x] - Instant visual feedback on paragraph run
    * [x] - Instant visual feedback on visualization switch
    * [x] - Instant visual feedback on paragraph run (new ui)
    * [x] - Instant visual feedback on visualization switch (new ui)
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-4985
    
    ### How should this be tested?
    
    Can simulate high latency network by adding sleep in NotebookSocket.send() method
    
    ```
    --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
    +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
     -67,6 +67,9  public class NotebookSocket extends WebSocketAdapter {
       }
    
       public synchronized void send(String serializeMessage) throws IOException {
    +    try {
    +      Thread.sleep(1000);
    +    } catch (InterruptedException e) {}
         connection.getRemote().sendString(serializeMessage);
       }
    ```
    
    Screen recordings attached are made in this way.
    
    ### Questions:
    * Does the licenses files need update? no
    * Is there breaking changes for older versions? no
    * Does this needs documentation? no
    
    Author: Lee moon soo <le...@gmail.com>
    
    Closes #3873 from Leemoonsoo/ZEPPELIN-4985 and squashes the following commits:
    
    b064cf414 [Lee moon soo] update test
    c44c1002f [Lee moon soo] implement paragraph update short circuit in new ui
    5952cf77c [Lee moon soo] instant update on switching visualization
    c38002972 [Lee moon soo] try implement short circuit logic in new ui
    97c832e0f [Lee moon soo] skip update paragraph to READY when submit
    00135e149 [Lee moon soo] short circuit status update on paragraph run
---
 .../zeppelin/integration/ParagraphActionsIT.java   |  2 +-
 .../zeppelin/cluster/event/ClusterMessage.java     |  9 ++++
 .../org/apache/zeppelin/rest/NotebookRestApi.java  |  4 +-
 .../apache/zeppelin/socket/ConnectionManager.java  |  4 +-
 .../org/apache/zeppelin/socket/NotebookServer.java | 53 ++++++++++----------
 .../projects/zeppelin-sdk/package.json             |  2 +-
 .../interfaces/message-data-type-map.interface.ts  |  2 +
 .../src/interfaces/message-operator.interface.ts   |  8 ++++
 .../src/interfaces/message-paragraph.interface.ts  |  5 ++
 .../src/interfaces/websocket-message.interface.ts  |  1 +
 .../projects/zeppelin-sdk/src/message.ts           | 56 ++++++++++++++++++----
 .../src/app/core/paragraph-base/paragraph-base.ts  |  8 ++++
 .../app/notebook/paragraph/paragraph.controller.js |  7 +++
 .../notebook/paragraph/result/result.controller.js | 17 +++----
 .../websocket/websocket-event.factory.js           | 18 ++++++-
 .../websocket/websocket-message.service.js         | 16 +++++++
 .../org/apache/zeppelin/notebook/Paragraph.java    |  1 -
 .../apache/zeppelin/notebook/socket/Message.java   | 12 +++++
 18 files changed, 178 insertions(+), 47 deletions(-)

diff --git a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java
index 13b96ae..c6a11eb 100644
--- a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java
+++ b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java
@@ -226,7 +226,7 @@ public class ParagraphActionsIT extends AbstractZeppelinIT {
       ZeppelinITUtils.sleep(2000, false);
 
       collector.checkThat("Paragraph status is ",
-          getParagraphStatus(1), CoreMatchers.equalTo("READY")
+          getParagraphStatus(1), CoreMatchers.equalTo("PENDING")
       );
 
       driver.navigate().refresh();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
index cd999c4..24e2f22 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
@@ -26,6 +26,7 @@ import java.util.Map;
 public class ClusterMessage {
   public ClusterEvent clusterEvent;
   private Map<String, String> data = new HashMap<>();
+  private String msgId;
 
   private static Gson gson = new GsonBuilder()
       .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
@@ -54,6 +55,14 @@ public class ClusterMessage {
     return data;
   }
 
+  public String getMsgId() {
+    return msgId;
+  }
+
+  public void setMsgId(String msgId) {
+    this.msgId = msgId;
+  }
+
   public static ClusterMessage deserializeMessage(String msg) {
     return gson.fromJson(msg, ClusterMessage.class);
   }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index e6dcfee..52f4acf 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -74,6 +74,8 @@ import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.zeppelin.notebook.socket.Message.MSG_ID_NOT_DEFINED;
+
 /**
  * Rest api endpoint for the notebook.
  */
@@ -574,7 +576,7 @@ public class NotebookRestApi extends AbstractRestApi {
 
     AuthenticationInfo subject = new AuthenticationInfo(user);
     notebook.saveNote(note, subject);
-    notebookServer.broadcastParagraph(note, p);
+    notebookServer.broadcastParagraph(note, p, MSG_ID_NOT_DEFINED);
     return new JsonResponse<>(Status.OK, "").build();
   }
 
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
index d44c656..a36f895 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
@@ -343,7 +343,7 @@ public class ConnectionManager {
     broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
   }
 
-  public void unicastParagraph(Note note, Paragraph p, String user) {
+  public void unicastParagraph(Note note, Paragraph p, String user, String msgId) {
     if (!note.isPersonalizedMode() || p == null || user == null) {
       return;
     }
@@ -354,7 +354,7 @@ public class ConnectionManager {
     }
 
     for (NotebookSocket conn : userSocketMap.get(user)) {
-      Message m = new Message(Message.OP.PARAGRAPH).put("paragraph", p);
+      Message m = new Message(Message.OP.PARAGRAPH).withMsgId(msgId).put("paragraph", p);
       unicast(m, conn);
     }
   }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 41f10f8..2db1d26 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -96,6 +96,8 @@ import org.glassfish.hk2.api.ServiceLocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.zeppelin.notebook.socket.Message.MSG_ID_NOT_DEFINED;
+
 /**
  * Zeppelin websocket service. This class used setter injection because all servlet should have
  * no-parameter constructor
@@ -578,7 +580,7 @@ public class NotebookServer extends WebSocketServlet
 
   public void broadcastNote(Note note) {
     inlineBroadcastNote(note);
-    broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE, note);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE, MSG_ID_NOT_DEFINED, note);
   }
 
   private void inlineBroadcastNote(Note note) {
@@ -586,36 +588,38 @@ public class NotebookServer extends WebSocketServlet
     getConnectionManager().broadcast(note.getId(), message);
   }
 
-  private void inlineBroadcastParagraph(Note note, Paragraph p) {
+  private void inlineBroadcastParagraph(Note note, Paragraph p, String msgId) {
     broadcastNoteForms(note);
 
     if (note.isPersonalizedMode()) {
-      broadcastParagraphs(p.getUserParagraphMap(), p);
+      broadcastParagraphs(p.getUserParagraphMap(), p, msgId);
     } else {
-      Message message = new Message(OP.PARAGRAPH).put("paragraph", p);
+      Message message = new Message(OP.PARAGRAPH).withMsgId(msgId).put("paragraph", p);
       getConnectionManager().broadcast(note.getId(), message);
     }
   }
 
-  public void broadcastParagraph(Note note, Paragraph p) {
-    inlineBroadcastParagraph(note, p);
-    broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPH, note, p);
+  public void broadcastParagraph(Note note, Paragraph p, String msgId) {
+    inlineBroadcastParagraph(note, p, msgId);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPH, msgId, note, p);
   }
 
   private void inlineBroadcastParagraphs(Map<String, Paragraph> userParagraphMap,
-                                         Paragraph defaultParagraph) {
+                                         Paragraph defaultParagraph,
+                                         String msgId) {
     if (null != userParagraphMap) {
       for (String user : userParagraphMap.keySet()) {
-        Message message = new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user));
+        Message message = new Message(OP.PARAGRAPH).withMsgId(msgId).put("paragraph", userParagraphMap.get(user));
         getConnectionManager().multicastToUser(user, message);
       }
     }
   }
 
   private void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
-                                  Paragraph defaultParagraph) {
-    inlineBroadcastParagraphs(userParagraphMap, defaultParagraph);
-    broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPHS, userParagraphMap, defaultParagraph);
+                                   Paragraph defaultParagraph,
+                                   String msgId) {
+    inlineBroadcastParagraphs(userParagraphMap, defaultParagraph, msgId);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPHS, msgId, userParagraphMap, defaultParagraph);
   }
 
   private void inlineBroadcastNewParagraph(Note note, Paragraph para) {
@@ -628,7 +632,7 @@ public class NotebookServer extends WebSocketServlet
 
   private void broadcastNewParagraph(Note note, Paragraph para) {
     inlineBroadcastNewParagraph(note, para);
-    broadcastClusterEvent(ClusterEvent.BROADCAST_NEW_PARAGRAPH, note, para);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_NEW_PARAGRAPH, MSG_ID_NOT_DEFINED, note, para);
   }
 
   public void inlineBroadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
@@ -647,17 +651,18 @@ public class NotebookServer extends WebSocketServlet
 
   public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
     inlineBroadcastNoteList(subject, userAndRoles);
-    broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE_LIST, subject, userAndRoles);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE_LIST, MSG_ID_NOT_DEFINED, subject, userAndRoles);
   }
 
   // broadcast ClusterEvent
-  private void broadcastClusterEvent(ClusterEvent event, Object... objects) {
+  private void broadcastClusterEvent(ClusterEvent event, String msgId, Object... objects) {
     ZeppelinConfiguration conf = ZeppelinConfiguration.create();
     if (!conf.isClusterMode()) {
       return;
     }
 
     ClusterMessage clusterMessage = new ClusterMessage(event);
+    clusterMessage.setMsgId(msgId);
 
     for(Object object : objects) {
       String json = "";
@@ -739,10 +744,10 @@ public class NotebookServer extends WebSocketServlet
         }
         break;
       case BROADCAST_PARAGRAPH:
-        inlineBroadcastParagraph(note, paragraph);
+        inlineBroadcastParagraph(note, paragraph, message.getMsgId());
         break;
       case BROADCAST_PARAGRAPHS:
-        inlineBroadcastParagraphs(userParagraphMap, paragraph);
+        inlineBroadcastParagraphs(userParagraphMap, paragraph, message.getMsgId());
         break;
       case BROADCAST_NEW_PARAGRAPH:
         inlineBroadcastNewParagraph(note, paragraph);
@@ -1113,9 +1118,9 @@ public class NotebookServer extends WebSocketServlet
             if (p.getNote().isPersonalizedMode()) {
               Map<String, Paragraph> userParagraphMap =
                   p.getNote().getParagraph(paragraphId).getUserParagraphMap();
-              broadcastParagraphs(userParagraphMap, p);
+              broadcastParagraphs(userParagraphMap, p, fromMessage.msgId);
             } else {
-              broadcastParagraph(p.getNote(), p);
+              broadcastParagraph(p.getNote(), p, fromMessage.msgId);
             }
           }
         });
@@ -1253,9 +1258,9 @@ public class NotebookServer extends WebSocketServlet
           public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
             super.onSuccess(p, context);
             if (p.getNote().isPersonalizedMode()) {
-              getConnectionManager().unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser());
+              getConnectionManager().unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser(), fromMessage.msgId);
             } else {
-              broadcastParagraph(p.getNote(), p);
+              broadcastParagraph(p.getNote(), p, fromMessage.msgId);
             }
           }
         });
@@ -1520,7 +1525,7 @@ public class NotebookServer extends WebSocketServlet
             if (p.getNote().isPersonalizedMode()) {
               Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId,
                   context.getAutheInfo().getUser());
-              getConnectionManager().unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser());
+              getConnectionManager().unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser(), fromMessage.msgId);
             }
 
             // if it's the last paragraph and not empty, let's add a new one
@@ -1698,7 +1703,7 @@ public class NotebookServer extends WebSocketServlet
       } else {
         note.clearParagraphOutput(paragraphId);
         Paragraph paragraph = note.getParagraph(paragraphId);
-        broadcastParagraph(note, paragraph);
+        broadcastParagraph(note, paragraph, MSG_ID_NOT_DEFINED);
       }
     } catch (IOException e) {
       LOG.warn("Fail to call onOutputClear", e);
@@ -1920,7 +1925,7 @@ public class NotebookServer extends WebSocketServlet
     }
 
     p.setStatusToUserParagraph(p.getStatus());
-    broadcastParagraph(p.getNote(), p);
+    broadcastParagraph(p.getNote(), p, MSG_ID_NOT_DEFINED);
     try {
       broadcastUpdateNoteJobInfo(p.getNote(), System.currentTimeMillis() - 5000);
     } catch (IOException e) {
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/package.json b/zeppelin-web-angular/projects/zeppelin-sdk/package.json
index 9be3b66..4bb41ea 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/package.json
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/package.json
@@ -5,4 +5,4 @@
     "@angular/common": "^8.2.9",
     "@angular/core": "^8.2.9"
   }
-}
\ No newline at end of file
+}
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts
index ddf934e..fa8bddf 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts
@@ -64,6 +64,7 @@ import {
   ParagraphClearOutput,
   ParagraphRemove,
   ParagraphRemoved,
+  ParagraphStatus,
   ParasInfo,
   PatchParagraphReceived,
   PatchParagraphSend,
@@ -102,6 +103,7 @@ export interface MessageReceiveDataTypeMap {
   [OP.PARAGRAPH_REMOVED]: ParagraphRemoved;
   [OP.EDITOR_SETTING]: EditorSettingReceived;
   [OP.PROGRESS]: Progress;
+  [OP.PARAGRAPH_STATUS]: ParagraphStatus;
   [OP.PARAGRAPH_MOVED]: ParagraphMoved;
   [OP.AUTH_INFO]: AuthInfo;
   [OP.NOTE_UPDATED]: NoteUpdated;
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts
index d3ce82b..1019330 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts
@@ -51,6 +51,14 @@ export enum OP {
   PROGRESS = 'PROGRESS',
 
   /**
+   * [short circuit]
+   * paragraph status update
+   *  @param id paragraph id
+   *  @param progress percentage progress
+   */
+  PARAGRAPH_STATUS = 'PARAGRAPH_STATUS',
+
+  /**
    * [c-s]
    * create new notebook
    */
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts
index 2e04b22..1ec0cfb 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts
@@ -299,6 +299,11 @@ export interface Progress {
   progress: number;
 }
 
+export interface ParagraphStatus {
+  id: string;
+  status: string;
+}
+
 interface GraphConfigSetting {
   table?: VisualizationTable;
   lineChart?: VisualizationLineChart;
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts
index bdc71e1..fcfd896 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts
@@ -18,4 +18,5 @@ export interface WebSocketMessage<K extends keyof MixMessageDataTypeMap> {
   ticket?: string; // default 'anonymous'
   principal?: string; // default 'anonymous'
   roles?: string; // default '[]'
+  msgId?: string;
 }
diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts
index 4505e36..921876a 100644
--- a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts
+++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts
@@ -10,20 +10,20 @@
  * limitations under the License.
  */
 
-import { interval, Observable, Subject, Subscription } from 'rxjs';
-import { delay, filter, map, mergeMap, retryWhen, take } from 'rxjs/operators';
-import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
+import {interval, Observable, Subject, Subscription} from 'rxjs';
+import {delay, filter, map, mergeMap, retryWhen, take} from 'rxjs/operators';
+import {webSocket, WebSocketSubject} from 'rxjs/webSocket';
 
-import { Ticket } from './interfaces/message-common.interface';
+import {Ticket} from './interfaces/message-common.interface';
 import {
   MessageReceiveDataTypeMap,
   MessageSendDataTypeMap,
   MixMessageDataTypeMap
 } from './interfaces/message-data-type-map.interface';
-import { NoteConfig, PersonalizedMode, SendNote } from './interfaces/message-notebook.interface';
-import { OP } from './interfaces/message-operator.interface';
-import { ParagraphConfig, ParagraphParams, SendParagraph } from './interfaces/message-paragraph.interface';
-import { WebSocketMessage } from './interfaces/websocket-message.interface';
+import {NoteConfig, PersonalizedMode, SendNote} from './interfaces/message-notebook.interface';
+import {OP} from './interfaces/message-operator.interface';
+import {ParagraphConfig, ParagraphParams, SendParagraph} from './interfaces/message-paragraph.interface';
+import {WebSocketMessage} from './interfaces/websocket-message.interface';
 
 export type ArgumentsType<T> = T extends (...args: infer U) => void ? U : never;
 
@@ -46,6 +46,8 @@ export class Message {
   private pingIntervalSubscription = new Subscription();
   private wsUrl: string;
   private ticket: Ticket;
+  private uniqueClientId = Math.random().toString(36).substring(2, 7);
+  private lastMsgIdSeqSent = 0;
 
   constructor() {
     this.open$.subscribe(() => {
@@ -140,6 +142,7 @@ export class Message {
     const [op, data] = args;
     const message: WebSocketMessage<K> = {
       op,
+      msgId: `${this.uniqueClientId}-${++this.lastMsgIdSeqSent}`,
       data: data as MixMessageDataTypeMap[K],
       ...this.ticket
     };
@@ -152,10 +155,37 @@ export class Message {
   receive<K extends keyof MessageReceiveDataTypeMap>(op: K): Observable<Record<K, MessageReceiveDataTypeMap[K]>[K]> {
     return this.received$.pipe(
       filter(message => message.op === op),
+      filter(message => {
+        if (!message.msgId) {
+          // when msgId is not specified, it is not response to client request.
+          // always process them
+          return true;
+        }
+        const uniqueClientId = message.msgId.split('-')[0];
+        const msgIdSeqReceived = parseInt(message.msgId.split('-')[1], 10);
+        const isResponseForRequestFromThisClient = uniqueClientId === this.uniqueClientId;
+
+        if (message.op === OP.PARAGRAPH) {
+          if (isResponseForRequestFromThisClient &&
+               this.lastMsgIdSeqSent > msgIdSeqReceived
+          ) {
+            console.log('PARAPGRAPH is already updated by shortcircuit');
+            return false;
+          } else {
+            return true;
+          }
+        } else {
+          return true;
+        }
+      }),
       map(message => message.data)
     ) as Observable<Record<K, MessageReceiveDataTypeMap[K]>[K]>;
   }
 
+  shortCircuit(message: WebSocketMessage<keyof MessageReceiveDataTypeMap>) {
+    this.received$.next(this.interceptReceived(message));
+  }
+
   destroy(): void {
     this.ws.complete();
     this.ws = null;
@@ -352,6 +382,16 @@ export class Message {
     paragraphConfig: ParagraphConfig,
     paragraphParams: ParagraphParams
   ): void {
+    // short circuit update status without waiting for server response
+    this.shortCircuit({
+      op: OP.PARAGRAPH_STATUS,
+      data: {
+        id: paragraphId,
+        status: "PENDING"
+      }
+    })
+
+    // send message to server
     this.send<OP.RUN_PARAGRAPH>(OP.RUN_PARAGRAPH, {
       id: paragraphId,
       title: paragraphTitle,
diff --git a/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts b/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts
index f8f9a1b..7fcb35c 100644
--- a/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts
+++ b/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts
@@ -69,6 +69,14 @@ export abstract class ParagraphBase extends MessageListenersManager {
     }
   }
 
+  @MessageListener(OP.PARAGRAPH_STATUS)
+  onParagraphStatus(data: MessageReceiveDataTypeMap[OP.PARAGRAPH_STATUS]) {
+    if (data.id === this.paragraph.id) {
+      this.paragraph.status = data.status;
+      this.cdr.markForCheck();
+    }
+  }
+
   @MessageListener(OP.NOTE_RUNNING_STATUS)
   noteRunningStatusChange(data: MessageReceiveDataTypeMap[OP.NOTE_RUNNING_STATUS]) {
     this.isEntireNoteRunning = data.status;
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
index 6ccff79..21efb20 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
@@ -459,6 +459,7 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
     if (!paragraphText || $scope.isRunning($scope.paragraph)) {
       return;
     }
+
     const magic = SpellResult.extractMagic(paragraphText);
 
     if (heliumService.getSpellByMagic(magic)) {
@@ -1632,6 +1633,12 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat
     }
   });
 
+  $scope.$on('updateStatus', function(event, data) {
+    if (data.id === $scope.paragraph.id) {
+      $scope.paragraph.status = data.status;
+    }
+  });
+
   $scope.$on('appendParagraphOutput', function(event, data) {
     if (data.paragraphId === $scope.paragraph.id) {
       if (!$scope.paragraph.results) {
diff --git a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
index 4aabc5c..ec74cb9 100644
--- a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js
@@ -762,14 +762,15 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location
     let newParagraphConfig = angular.copy(paragraph.config);
     newParagraphConfig.results = newParagraphConfig.results || [];
     newParagraphConfig.results[resultIndex] = config;
-    if ($scope.revisionView === true) {
-      // local update without commit
-      updateData({
-        type: $scope.type,
-        data: data,
-      }, newParagraphConfig.results[resultIndex], paragraph, resultIndex);
-      renderResult($scope.type, true);
-    } else {
+
+    // local update without commit
+    updateData({
+      type: $scope.type,
+      data: data,
+    }, newParagraphConfig.results[resultIndex], paragraph, resultIndex);
+    renderResult($scope.type, true);
+
+    if ($scope.revisionView !== true) {
       if (! $scope.viewOnly) {
         return websocketMsgSrv.commitParagraph(paragraph.id, title, text, newParagraphConfig, params);
       }
diff --git a/zeppelin-web/src/components/websocket/websocket-event.factory.js b/zeppelin-web/src/components/websocket/websocket-event.factory.js
index f1b8cd6..8cbf34c 100644
--- a/zeppelin-web/src/components/websocket/websocket-event.factory.js
+++ b/zeppelin-web/src/components/websocket/websocket-event.factory.js
@@ -19,6 +19,8 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa
 
   let websocketCalls = {};
   let pingIntervalId;
+  const uniqueClientId = Math.random().toString(36).substring(2, 7);
+  let lastMsgIdSeqSent = 0;
 
   websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketUrl());
   websocketCalls.ws.reconnectIfNotNormalClose = true;
@@ -41,6 +43,8 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa
       data.ticket = '';
       data.roles = '';
     }
+
+    data.msgId = uniqueClientId + '-' + ++lastMsgIdSeqSent;
     console.log('Send >> %o, %o, %o, %o, %o', data.op, data.principal, data.ticket, data.roles, data);
     return websocketCalls.ws.send(JSON.stringify(data));
   };
@@ -59,6 +63,11 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa
 
     let op = payload.op;
     let data = payload.data;
+    let msgId = payload.msgId;
+    const uniqueClientId = msgId ? msgId.split('-')[0] : undefined;
+    const msgIdSeqReceived = msgId ? parseInt(msgId.split('-')[1]) : undefined;
+    const isResponseForRequestFromThisClient = uniqueClientId === uniqueClientId;
+
     if (op === 'NOTE') {
       $rootScope.$broadcast('setNoteContent', data.note);
     } else if (op === 'NEW_NOTE') {
@@ -111,7 +120,14 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa
         buttons: btn,
       });
     } else if (op === 'PARAGRAPH') {
-      $rootScope.$broadcast('updateParagraph', data);
+      if (isResponseForRequestFromThisClient &&
+          lastMsgIdSeqSent > msgIdSeqReceived
+      ) {
+        // paragraph is already updated by short circuit.
+        console.log('PARAPGRAPH is already updated by shortcircuit');
+      } else {
+        $rootScope.$broadcast('updateParagraph', data);
+      }
     } else if (op === 'PATCH_PARAGRAPH') {
       $rootScope.$broadcast('patchReceived', data);
     } else if (op === 'COLLABORATIVE_MODE_STATUS') {
diff --git a/zeppelin-web/src/components/websocket/websocket-message.service.js b/zeppelin-web/src/components/websocket/websocket-message.service.js
index a959070..ee2f773 100644
--- a/zeppelin-web/src/components/websocket/websocket-message.service.js
+++ b/zeppelin-web/src/components/websocket/websocket-message.service.js
@@ -188,6 +188,13 @@ function WebsocketMessageService($rootScope, websocketEvents) {
     },
 
     runParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams) {
+      // short circuit update paragraph status for immediate visual feedback without waiting for server response
+      $rootScope.$broadcast('updateStatus', {
+        id: paragraphId,
+        status: 'PENDING',
+      });
+
+      // send message to server
       websocketEvents.sendNewEvent({
         op: 'RUN_PARAGRAPH',
         data: {
@@ -201,6 +208,15 @@ function WebsocketMessageService($rootScope, websocketEvents) {
     },
 
     runAllParagraphs: function(noteId, paragraphs) {
+      // short circuit update paragraph status for immediate visual feedback without waiting for server response
+      paragraphs.forEach((p) => {
+        $rootScope.$broadcast('updateStatus', {
+          id: p.id,
+          status: 'PENDING',
+        });
+      });
+
+      // send message to server
       websocketEvents.sendNewEvent({
         op: 'RUN_ALL_PARAGRAPHS',
         data: {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index ce80523..336be44 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -338,7 +338,6 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
         setStatus(Job.Status.FINISHED);
         return true;
       }
-      setStatus(Status.READY);
 
       if (getConfig().get("enabled") == null || (Boolean) getConfig().get("enabled")) {
         setAuthenticationInfo(getAuthenticationInfo());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
index 2b9f339..9d6803e 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
@@ -225,10 +225,22 @@ public class Message implements JsonSerializable {
   public String principal = "anonymous";
   public String roles = "";
 
+  // Unique id generated from client side. to identify message.
+  // When message from server is response to the client request
+  // includes the msgId in response message, client can pair request and response message.
+  // When server send message that is not response to the client request, set null;
+  public String msgId = MSG_ID_NOT_DEFINED;
+  public static String MSG_ID_NOT_DEFINED = null;
+
   public Message(OP op) {
     this.op = op;
   }
 
+  public Message withMsgId(String msgId) {
+    this.msgId = msgId;
+    return this;
+  }
+
   public Message put(String k, Object v) {
     data.put(k, v);
     return this;