Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/09/03 08:10:33 UTC

zeppelin git commit: Buffer append output results + fix extra incorrect results

Repository: zeppelin
Updated Branches:
  refs/heads/master cee58aa03 -> 11becdec7


Buffer append output results + fix extra incorrect results

### What is this PR for?
This PR addresses two issues and proposes a fix for each:
1. On a paragraph run, Zeppelin broadcasts every new line of output as a separate event. With thousands of lines of output, the browser(s) hang because of the volume of these append-output events.
2. In the same scenario, besides the browser hang, the result data is repeated twice (once from the append-output calls and once from the finish-event calls).

The proposed solution for #1 is:
- Buffer the append-output event into a queue instead of sending the event immediately.
- In a separate thread, read from the queue periodically and send the append-output event.
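
The two steps above can be sketched as follows. This is a minimal illustration rather than the code added by this patch: the class `BufferedAppendSketch`, its `append` and `flush` methods, and the `"noteId/paragraphId"` key format are assumptions made for the example. Every output line is only enqueued, and a periodic flush drains the queue and merges the queued data per paragraph, so one event per paragraph is sent instead of one per line.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch; not the classes introduced by this patch. */
final class BufferedAppendSketch {

  /** One queued append event, stored as {noteId, paragraphId, data}. */
  private final BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();

  /** Called for every output line; it only enqueues and never broadcasts. */
  void append(String noteId, String paragraphId, String data) {
    queue.offer(new String[] {noteId, paragraphId, data});
  }

  /**
   * Drains everything queued so far and merges it per paragraph.
   * Returns a map from "noteId/paragraphId" (assumed key format)
   * to the concatenated output, in arrival order.
   */
  Map<String, String> flush() {
    List<String[]> drained = new ArrayList<>();
    queue.drainTo(drained); // non-blocking: takes whatever is present

    Map<String, StringBuilder> merged = new LinkedHashMap<>();
    for (String[] event : drained) {
      String key = event[0] + "/" + event[1];
      merged.computeIfAbsent(key, k -> new StringBuilder()).append(event[2]);
    }

    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, StringBuilder> entry : merged.entrySet()) {
      result.put(entry.getKey(), entry.getValue().toString());
    }
    return result;
  }
}
```

In a running server, `flush()` would be invoked periodically (for example from a `ScheduledExecutorService`), and one append-output event would be sent per map entry.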

Solution for #2 is:
- Do not append output to the result if the paragraph is not running.
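
The guard can be expressed as a small predicate. In this patch the check is made in the web client's JavaScript, which accepts both the `RUNNING` and `PENDING` states; the Java version below is only an illustrative sketch, and the `AppendGuardSketch` class, its `Status` enum, and the `shouldAppend` method are hypothetical names.

```java
/** Hypothetical sketch of the append guard; names are illustrative only. */
final class AppendGuardSketch {

  /** Assumed set of paragraph states for this example. */
  enum Status { READY, PENDING, RUNNING, FINISHED, ERROR, ABORT }

  /**
   * An append event is applied only when it targets this paragraph and
   * the paragraph is still PENDING or RUNNING. Events that arrive after
   * execution has ended are dropped, because the finish event already
   * carries the complete result.
   */
  static boolean shouldAppend(String paragraphId, String eventParagraphId, Status status) {
    return paragraphId.equals(eventParagraphId)
        && (status == Status.RUNNING || status == Status.PENDING);
  }
}
```

`PENDING` is accepted as well because append events can arrive between the `PENDING` and `RUNNING` states, and dropping them would lose the first lines of output.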

### What type of PR is it?
Improvement + Bug Fix

### Todos

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1292

### How should this be tested?
One way to test this is to run a simple paragraph that produces a large result, e.g.:
```
%sh
for i in {1..10000}
do
echo $i
done
```
PS: Clear the browser cache between runs with and without this patch, since it includes JavaScript changes as well.

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update?
No
* Are there breaking changes for older versions?
No
* Does this need documentation?
The design may need documentation. Otherwise, I have added code comments explaining the behaviour.

Author: Beria <be...@qubole.com>

Closes #1283 from beriaanirudh/ZEPPELIN-1292 and squashes the following commits:

17f0524 [Beria] Use diamond operator
7852368 [Beria] nit
4b68c86 [Beria] fix checkstyle
d168614 [Beria] Remove un-necessary class CheckAppendOutputRunner
2eae38e [Beria] Make AppendOutputRunner non-static
72c316d [Beria] Scheduler service to replace while loop in AppendOutputRunner
599281f [Beria] fix unit tests that run after
dd24816 [Beria] Add license in test file
3984ef8 [Beria] fix tests when ran with other tests
1c893c0 [Beria] Add licensing
1bdd669 [Beria] fix javadoc comment
27790e4 [Beria] Avoid infinite loop in tests
5057bb3 [Beria] Incorporate feedback 1. Synchronize on AppendOutputRunner creation 2. Use ScheduledExecutorService instead of while loop 3. Remove Thread.sleep() from tests
82e9c4a [Beria] Fix comment
7020f0c [Beria] Buffer append output results + fix extra incorrect results


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/11becdec
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/11becdec
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/11becdec

Branch: refs/heads/master
Commit: 11becdec707ee22c879a40320d26149ac73cc90c
Parents: cee58aa
Author: Beria <be...@qubole.com>
Authored: Tue Aug 23 13:19:44 2016 +0530
Committer: Lee moon soo <mo...@apache.org>
Committed: Sat Sep 3 18:10:20 2016 +1000

----------------------------------------------------------------------
 .../interpreter/remote/AppendOutputBuffer.java  |  48 ++++
 .../interpreter/remote/AppendOutputRunner.java  | 118 ++++++++++
 .../remote/RemoteInterpreterEventPoller.java    |  14 +-
 .../remote/AppendOutputRunnerTest.java          | 235 +++++++++++++++++++
 .../notebook/paragraph/paragraph.controller.js  |  11 +-
 5 files changed, 424 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
new file mode 100644
index 0000000..e1484da
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+/**
+ * This element stores the buffered
+ * append-data of paragraph's output.
+ */
+public class AppendOutputBuffer {
+
+  private String noteId;
+  private String paragraphId;
+  private String data;
+
+  public AppendOutputBuffer(String noteId, String paragraphId, String data) {
+    this.noteId = noteId;
+    this.paragraphId = paragraphId;
+    this.data = data;
+  }
+
+  public String getNoteId() {
+    return noteId;
+  }
+
+  public String getParagraphId() {
+    return paragraphId;
+  }
+
+  public String getData() {
+    return data;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
new file mode 100644
index 0000000..86ea11a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This thread sends paragraph's append-data
+ * periodically, rather than continuously, with
+ * a period of BUFFER_TIME_MS. It handles append-data
+ * for all paragraphs across all notebooks.
+ */
+public class AppendOutputRunner implements Runnable {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(AppendOutputRunner.class);
+  public static final Long BUFFER_TIME_MS = new Long(100);
+  private static final Long SAFE_PROCESSING_TIME = new Long(10);
+  private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000);
+
+  private final BlockingQueue<AppendOutputBuffer> queue = new LinkedBlockingQueue<>();
+  private final RemoteInterpreterProcessListener listener;
+
+  public AppendOutputRunner(RemoteInterpreterProcessListener listener) {
+    this.listener = listener;
+  }
+
+  @Override
+  public void run() {
+
+    Map<String, Map<String, StringBuilder> > noteMap = new HashMap<>();
+    List<AppendOutputBuffer> list = new LinkedList<>();
+
+    /* "drainTo" method does not wait for any element
+     * to be present in the queue, and thus this loop would
+     * continuously run (with period of BUFFER_TIME_MS). "take()" method
+     * waits for the queue to become non-empty and then removes
+     * one element from it. The remaining elements (if present) are
+     * removed using the "drainTo" method. Thus we save some unnecessary
+     * CPU cycles.
+     */
+    try {
+      list.add(queue.take());
+    } catch (InterruptedException e) {
+      logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage());
+    }
+    Long processingStartTime = System.currentTimeMillis();
+    queue.drainTo(list);
+
+    for (AppendOutputBuffer buffer: list) {
+      String noteId = buffer.getNoteId();
+      String paragraphId = buffer.getParagraphId();
+
+      Map<String, StringBuilder> paragraphMap = (noteMap.containsKey(noteId)) ?
+          noteMap.get(noteId) : new HashMap<String, StringBuilder>();
+      StringBuilder builder = paragraphMap.containsKey(paragraphId) ?
+          paragraphMap.get(paragraphId) : new StringBuilder();
+
+      builder.append(buffer.getData());
+      paragraphMap.put(paragraphId, builder);
+      noteMap.put(noteId, paragraphMap);
+    }
+    Long processingTime = System.currentTimeMillis() - processingStartTime;
+
+    if (processingTime > SAFE_PROCESSING_TIME) {
+      logger.warn("Processing time for buffered append-output is high: " +
+          processingTime + " milliseconds.");
+    } else {
+      logger.debug("Processing time for append-output took "
+          + processingTime + " milliseconds");
+    }
+
+    Long sizeProcessed = new Long(0);
+    for (String noteId: noteMap.keySet()) {
+      for (String paragraphId: noteMap.get(noteId).keySet()) {
+        String data = noteMap.get(noteId).get(paragraphId).toString();
+        sizeProcessed += data.length();
+        listener.onOutputAppend(noteId, paragraphId, data);
+      }
+    }
+
+    if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
+      logger.warn("Processing size for buffered append-output is high: " +
+          sizeProcessed + " characters.");
+    } else {
+      logger.debug("Processing size for append-output is " +
+          sizeProcessed + " characters");
+    }
+  }
+
+  public void appendBuffer(String noteId, String paragraphId, String outputToAppend) {
+    queue.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index 48c14d5..090aeea 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -39,12 +39,18 @@ import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Processes message from RemoteInterpreter process
  */
 public class RemoteInterpreterEventPoller extends Thread {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+  private static final ScheduledExecutorService appendService =
+      Executors.newSingleThreadScheduledExecutor();
   private final RemoteInterpreterProcessListener listener;
   private final ApplicationEventListener appListener;
 
@@ -72,6 +78,9 @@ public class RemoteInterpreterEventPoller extends Thread {
   @Override
   public void run() {
     Client client = null;
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
+        runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
 
     while (!shutdown) {
       // wait and retry
@@ -157,7 +166,7 @@ public class RemoteInterpreterEventPoller extends Thread {
           String appId = outputAppend.get("appId");
 
           if (appId == null) {
-            listener.onOutputAppend(noteId, paragraphId, outputToAppend);
+            runner.appendBuffer(noteId, paragraphId, outputToAppend);
           } else {
             appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend);
           }
@@ -192,6 +201,9 @@ public class RemoteInterpreterEventPoller extends Thread {
         logger.error("Can't handle event " + event, e);
       }
     }
+    if (appendFuture != null) {
+      appendFuture.cancel(true);
+    }
   }
 
   private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
new file mode 100644
index 0000000..8e9f5b3
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class AppendOutputRunnerTest {
+
+  private static final int NUM_EVENTS = 10000;
+  private static final int NUM_CLUBBED_EVENTS = 100;
+  private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+  private static ScheduledFuture<?> future = null;
+  /* It is accessed by multiple threads.
+   * Without 'volatile', the while loop in
+   * 'loopForCompletingEvents' could run forever.
+   */
+  private volatile static int numInvocations = 0;
+
+  @After
+  public void afterEach() {
+    if (future != null) {
+      future.cancel(true);
+    }
+  }
+
+  @Test
+  public void testSingleEvent() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String[][] buffer = {{"note", "para", "data\n"}};
+
+    loopForCompletingEvents(listener, 1, buffer);
+    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
+    verify(listener, times(1)).onOutputAppend("note", "para", "data\n");
+  }
+
+  @Test
+  public void testMultipleEventsOfSameParagraph() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String note1 = "note1";
+    String para1 = "para1";
+    String[][] buffer = {
+        {note1, para1, "data1\n"},
+        {note1, para1, "data2\n"},
+        {note1, para1, "data3\n"}
+    };
+
+    loopForCompletingEvents(listener, 1, buffer);
+    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
+    verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n");
+  }
+
+  @Test
+  public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String note1 = "note1";
+    String note2 = "note2";
+    String para1 = "para1";
+    String para2 = "para2";
+    String[][] buffer = {
+        {note1, para1, "data1\n"},
+        {note1, para2, "data2\n"},
+        {note2, para1, "data3\n"},
+        {note2, para2, "data4\n"}
+    };
+    loopForCompletingEvents(listener, 4, buffer);
+
+    verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), any(String.class));
+    verify(listener, times(1)).onOutputAppend(note1, para1, "data1\n");
+    verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n");
+    verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n");
+    verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n");
+  }
+
+  @Test
+  public void testClubbedData() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    future = service.scheduleWithFixedDelay(runner, 0,
+        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+    Thread thread = new Thread(new BombardEvents(runner));
+    thread.start();
+    thread.join();
+    Thread.sleep(1000);
+
+    /* NUM_CLUBBED_EVENTS is a heuristic number.
+     * It has been observed that for 10,000 continuous event
+     * calls, 30-40 web-socket calls are made. Keeping
+     * the unit test to a pessimistic 100 web-socket calls.
+     */
+    verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class));
+  }
+
+  @Test
+  public void testWarnLoggerForLargeData() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    String data = "data\n";
+    int numEvents = 100000;
+
+    for (int i=0; i<numEvents; i++) {
+      runner.appendBuffer("noteId", "paraId", data);
+    }
+
+    TestAppender appender = new TestAppender();
+    Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    Logger.getLogger(RemoteInterpreterEventPoller.class);
+
+    runner.run();
+    List<LoggingEvent> log;
+
+    int warnLogCounter;
+    LoggingEvent sizeWarnLogEntry = null;
+    do {
+      warnLogCounter = 0;
+      log = appender.getLog();
+      for (LoggingEvent logEntry: log) {
+        if (Level.WARN.equals(logEntry.getLevel())) {
+          sizeWarnLogEntry = logEntry;
+          warnLogCounter += 1;
+        }
+      }
+    } while(warnLogCounter != 2);
+
+    String loggerString = "Processing size for buffered append-output is high: " +
+        (data.length() * numEvents) + " characters.";
+    assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
+  }
+
+  private class BombardEvents implements Runnable {
+
+    private final AppendOutputRunner runner;
+
+    private BombardEvents(AppendOutputRunner runner) {
+      this.runner = runner;
+    }
+
+    @Override
+    public void run() {
+      String noteId = "noteId";
+      String paraId = "paraId";
+      for (int i=0; i<NUM_EVENTS; i++) {
+        runner.appendBuffer(noteId, paraId, "data\n");
+      }
+    }
+  }
+
+  private class TestAppender extends AppenderSkeleton {
+    private final List<LoggingEvent> log = new ArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+        return false;
+    }
+
+    @Override
+    protected void append(final LoggingEvent loggingEvent) {
+        log.add(loggingEvent);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public List<LoggingEvent> getLog() {
+        return new ArrayList<>(log);
+    }
+  }
+
+  private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        numInvocations += 1;
+        return null;
+      }
+    }).when(listener).onOutputAppend(any(String.class), any(String.class), any(String.class));
+  }
+
+  private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
+      int numTimes, String[][] buffer) {
+    numInvocations = 0;
+    prepareInvocationCounts(listener);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    for (String[] bufferElement: buffer) {
+      runner.appendBuffer(bufferElement[0], bufferElement[1], bufferElement[2]);
+    }
+    future = service.scheduleWithFixedDelay(runner, 0,
+        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+    long startTimeMs = System.currentTimeMillis();
+    while(numInvocations != numTimes) {
+      if (System.currentTimeMillis() - startTimeMs > 2000) {
+        fail("Buffered events were not sent for 2 seconds");
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
index 7889270..302d107 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
@@ -2575,7 +2575,16 @@ angular.module('zeppelinWebApp').controller('ParagraphCtrl', function($scope, $r
   });
 
   $scope.$on('appendParagraphOutput', function(event, data) {
-    if ($scope.paragraph.id === data.paragraphId) {
+    /* It has been observed that append events
+     * can be erroneously called even if paragraph
+     * execution has ended, and in that case, no append
+     * should be made. Also, it was observed that between PENDING
+     * and RUNNING states, append events can be called and we can't
+     * miss those, else during the length of the paragraph run, the
+     * first few output lines will be missing.
+     */
+    if ($scope.paragraph.id === data.paragraphId &&
+       ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING')) {
       if ($scope.flushStreamingOutput) {
         $scope.clearTextOutput();
         $scope.flushStreamingOutput = false;