You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/09/03 08:10:33 UTC
zeppelin git commit: Buffer append output results + fix extra
incorrect results
Repository: zeppelin
Updated Branches:
refs/heads/master cee58aa03 -> 11becdec7
Buffer append output results + fix extra incorrect results
### What is this PR for?
There are 2 issues and their proposed fixes:
1. On a paragraph run, for every line of output, there is a broadcast of the new line from zeppelin. In case of thousands of lines of output, the browser/s would hang because of the volume of these append-output events.
2. In the above case, besides the browser hang, another bug observed is that the result data is repeated twice (coming from append-output calls + finish-event calls).
The proposed solution for #1 is:
- Buffer the append-output event into a queue instead of sending the event immediately.
- In a separate thread, read from the queue periodically and send the append-output event.
Solution for #2 is:
- Do not append output to the result if the paragraph is not running.
### What type of PR is it?
Improvement + Bug Fix
### Todos
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1292
### How should this be tested?
One way to test this is to run a simple paragraph that produces a large result, e.g.:
```
%sh
for i in {1..10000}
do
echo $i
done
```
PS: One will need to clear browser cache between running with and without this code patch since there are javascript changes as well.
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update?
No
* Is there breaking changes for older versions?
No
* Does this needs documentation?
The design may need documentation. Otherwise, I have added code comments explaining the behaviour.
Author: Beria <be...@qubole.com>
Closes #1283 from beriaanirudh/ZEPPELIN-1292 and squashes the following commits:
17f0524 [Beria] Use diamond operator
7852368 [Beria] nit
4b68c86 [Beria] fix checkstyle
d168614 [Beria] Remove un-necessary class CheckAppendOutputRunner
2eae38e [Beria] Make AppendOutputRunner non-static
72c316d [Beria] Scheduler service to replace while loop in AppendOutputRunner
599281f [Beria] fix unit tests that run after
dd24816 [Beria] Add license in test file
3984ef8 [Beria] fix tests when ran with other tests
1c893c0 [Beria] Add licensing
1bdd669 [Beria] fix javadoc comment
27790e4 [Beria] Avoid infinite loop in tests
5057bb3 [Beria] Incorporate feedback 1. Synchronize on AppendOutputRunner creation 2. Use ScheduledExecutorService instead of while loop 3. Remove Thread.sleep() from tests
82e9c4a [Beria] Fix comment
7020f0c [Beria] Buffer append output results + fix extra incorrect results
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/11becdec
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/11becdec
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/11becdec
Branch: refs/heads/master
Commit: 11becdec707ee22c879a40320d26149ac73cc90c
Parents: cee58aa
Author: Beria <be...@qubole.com>
Authored: Tue Aug 23 13:19:44 2016 +0530
Committer: Lee moon soo <mo...@apache.org>
Committed: Sat Sep 3 18:10:20 2016 +1000
----------------------------------------------------------------------
.../interpreter/remote/AppendOutputBuffer.java | 48 ++++
.../interpreter/remote/AppendOutputRunner.java | 118 ++++++++++
.../remote/RemoteInterpreterEventPoller.java | 14 +-
.../remote/AppendOutputRunnerTest.java | 235 +++++++++++++++++++
.../notebook/paragraph/paragraph.controller.js | 11 +-
5 files changed, 424 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
new file mode 100644
index 0000000..e1484da
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+/**
+ * Value holder for one buffered chunk of append-output: the note id,
+ * the paragraph id and the data to append. Only getters are exposed.
+ */
+public class AppendOutputBuffer {
+
+ private String noteId;
+ private String paragraphId;
+ private String data;
+
+ public AppendOutputBuffer(String noteId, String paragraphId, String data) {
+ this.noteId = noteId;
+ this.paragraphId = paragraphId;
+ this.data = data;
+ }
+
+ public String getNoteId() {
+ return noteId;
+ }
+
+ public String getParagraphId() {
+ return paragraphId;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
new file mode 100644
index 0000000..86ea11a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable that batches queued append-output: each run() waits for one
+ * queued item, drains the rest, merges the data per note and paragraph
+ * and calls listener.onOutputAppend once per pair. Scheduling (e.g. every
+ * BUFFER_TIME_MS) is left to the caller; this class starts no thread.
+ */
+public class AppendOutputRunner implements Runnable {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(AppendOutputRunner.class);
+ public static final Long BUFFER_TIME_MS = new Long(100);
+ private static final Long SAFE_PROCESSING_TIME = new Long(10);
+ private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000);
+
+ private final BlockingQueue<AppendOutputBuffer> queue = new LinkedBlockingQueue<>();
+ private final RemoteInterpreterProcessListener listener;
+
+ public AppendOutputRunner(RemoteInterpreterProcessListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void run() {
+
+ Map<String, Map<String, StringBuilder> > noteMap = new HashMap<>();
+ List<AppendOutputBuffer> list = new LinkedList<>();
+
+ /* "drainTo" does not wait for any element to be present in the
+ * queue, so on its own it would return at once on every run. "take()"
+ * waits for the queue to become non-empty and removes one element;
+ * the remaining elements (if any) are removed using "drainTo". Thus
+ * we save on some unnecessary cpu-cycles.
+ * NOTE(review): on interrupt the catch below only logs and falls
+ * through to "drainTo"; the interrupt flag is not restored -- confirm.
+ */
+ try {
+ list.add(queue.take());
+ } catch (InterruptedException e) {
+ logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage());
+ }
+ Long processingStartTime = System.currentTimeMillis();
+ queue.drainTo(list);
+
+ for (AppendOutputBuffer buffer: list) {
+ String noteId = buffer.getNoteId();
+ String paragraphId = buffer.getParagraphId();
+
+ Map<String, StringBuilder> paragraphMap = (noteMap.containsKey(noteId)) ?
+ noteMap.get(noteId) : new HashMap<String, StringBuilder>();
+ StringBuilder builder = paragraphMap.containsKey(paragraphId) ?
+ paragraphMap.get(paragraphId) : new StringBuilder();
+
+ builder.append(buffer.getData());
+ paragraphMap.put(paragraphId, builder);
+ noteMap.put(noteId, paragraphMap);
+ }
+ Long processingTime = System.currentTimeMillis() - processingStartTime;
+
+ if (processingTime > SAFE_PROCESSING_TIME) {
+ logger.warn("Processing time for buffered append-output is high: " +
+ processingTime + " milliseconds.");
+ } else {
+ logger.debug("Processing time for append-output took "
+ + processingTime + " milliseconds");
+ }
+
+ Long sizeProcessed = new Long(0);
+ for (String noteId: noteMap.keySet()) {
+ for (String paragraphId: noteMap.get(noteId).keySet()) {
+ String data = noteMap.get(noteId).get(paragraphId).toString();
+ sizeProcessed += data.length();
+ listener.onOutputAppend(noteId, paragraphId, data);
+ }
+ }
+
+ if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
+ logger.warn("Processing size for buffered append-output is high: " +
+ sizeProcessed + " characters.");
+ } else {
+ logger.debug("Processing size for append-output is " +
+ sizeProcessed + " characters");
+ }
+ }
+
+ public void appendBuffer(String noteId, String paragraphId, String outputToAppend) {
+ queue.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index 48c14d5..090aeea 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -39,12 +39,18 @@ import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
/**
* Processes message from RemoteInterpreter process
*/
public class RemoteInterpreterEventPoller extends Thread {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+ private static final ScheduledExecutorService appendService =
+ Executors.newSingleThreadScheduledExecutor();
private final RemoteInterpreterProcessListener listener;
private final ApplicationEventListener appListener;
@@ -72,6 +78,9 @@ public class RemoteInterpreterEventPoller extends Thread {
@Override
public void run() {
Client client = null;
+ AppendOutputRunner runner = new AppendOutputRunner(listener);
+ ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
+ runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
while (!shutdown) {
// wait and retry
@@ -157,7 +166,7 @@ public class RemoteInterpreterEventPoller extends Thread {
String appId = outputAppend.get("appId");
if (appId == null) {
- listener.onOutputAppend(noteId, paragraphId, outputToAppend);
+ runner.appendBuffer(noteId, paragraphId, outputToAppend);
} else {
appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend);
}
@@ -192,6 +201,9 @@ public class RemoteInterpreterEventPoller extends Thread {
logger.error("Can't handle event " + event, e);
}
}
+ if (appendFuture != null) {
+ appendFuture.cancel(true);
+ }
}
private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
new file mode 100644
index 0000000..8e9f5b3
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class AppendOutputRunnerTest {
+
+ private static final int NUM_EVENTS = 10000;
+ private static final int NUM_CLUBBED_EVENTS = 100;
+ private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+ private static ScheduledFuture<?> future = null;
+ /* Incremented from the scheduler thread through the mocked listener
+ * and polled by the test thread in 'loopForCompletingEvents'; it is
+ * volatile so that the polling loop observes the updates.
+ */
+ private volatile static int numInvocations = 0;
+
+ @After
+ public void afterEach() {
+ if (future != null) {
+ future.cancel(true);
+ }
+ }
+
+ @Test
+ public void testSingleEvent() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ String[][] buffer = {{"note", "para", "data\n"}};
+
+ loopForCompletingEvents(listener, 1, buffer);
+ verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
+ verify(listener, times(1)).onOutputAppend("note", "para", "data\n");
+ }
+
+ @Test
+ public void testMultipleEventsOfSameParagraph() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ String note1 = "note1";
+ String para1 = "para1";
+ String[][] buffer = {
+ {note1, para1, "data1\n"},
+ {note1, para1, "data2\n"},
+ {note1, para1, "data3\n"}
+ };
+
+ loopForCompletingEvents(listener, 1, buffer);
+ verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), any(String.class));
+ verify(listener, times(1)).onOutputAppend(note1, para1, "data1\ndata2\ndata3\n");
+ }
+
+ @Test
+ public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ String note1 = "note1";
+ String note2 = "note2";
+ String para1 = "para1";
+ String para2 = "para2";
+ String[][] buffer = {
+ {note1, para1, "data1\n"},
+ {note1, para2, "data2\n"},
+ {note2, para1, "data3\n"},
+ {note2, para2, "data4\n"}
+ };
+ loopForCompletingEvents(listener, 4, buffer);
+
+ verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), any(String.class));
+ verify(listener, times(1)).onOutputAppend(note1, para1, "data1\n");
+ verify(listener, times(1)).onOutputAppend(note1, para2, "data2\n");
+ verify(listener, times(1)).onOutputAppend(note2, para1, "data3\n");
+ verify(listener, times(1)).onOutputAppend(note2, para2, "data4\n");
+ }
+
+ @Test
+ public void testClubbedData() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ AppendOutputRunner runner = new AppendOutputRunner(listener);
+ future = service.scheduleWithFixedDelay(runner, 0,
+ AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+ Thread thread = new Thread(new BombardEvents(runner));
+ thread.start();
+ thread.join();
+ Thread.sleep(1000);
+
+ /* NUM_CLUBBED_EVENTS is a heuristic number.
+ * It has been observed that for 10,000 continuous event
+ * calls, 30-40 Web-socket calls are made. Keeping
+ * the unit-test to a pessimistic 100 web-socket calls.
+ */
+ verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), any(String.class));
+ }
+
+ @Test
+ public void testWarnLoggerForLargeData() throws InterruptedException {
+ RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+ AppendOutputRunner runner = new AppendOutputRunner(listener);
+ String data = "data\n";
+ int numEvents = 100000;
+
+ for (int i=0; i<numEvents; i++) {
+ runner.appendBuffer("noteId", "paraId", data);
+ }
+
+ TestAppender appender = new TestAppender();
+ Logger logger = Logger.getRootLogger();
+ logger.addAppender(appender);
+ Logger.getLogger(RemoteInterpreterEventPoller.class);
+
+ runner.run();
+ List<LoggingEvent> log;
+
+ int warnLogCounter;
+ LoggingEvent sizeWarnLogEntry = null;
+ do {
+ warnLogCounter = 0;
+ log = appender.getLog();
+ for (LoggingEvent logEntry: log) {
+ if (Level.WARN.equals(logEntry.getLevel())) {
+ sizeWarnLogEntry = logEntry;
+ warnLogCounter += 1;
+ }
+ }
+ } while(warnLogCounter != 2);
+
+ String loggerString = "Processing size for buffered append-output is high: " +
+ (data.length() * numEvents) + " characters.";
+ assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
+ }
+
+ private class BombardEvents implements Runnable {
+
+ private final AppendOutputRunner runner;
+
+ private BombardEvents(AppendOutputRunner runner) {
+ this.runner = runner;
+ }
+
+ @Override
+ public void run() {
+ String noteId = "noteId";
+ String paraId = "paraId";
+ for (int i=0; i<NUM_EVENTS; i++) {
+ runner.appendBuffer(noteId, paraId, "data\n");
+ }
+ }
+ }
+
+ private class TestAppender extends AppenderSkeleton {
+ private final List<LoggingEvent> log = new ArrayList<>();
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ protected void append(final LoggingEvent loggingEvent) {
+ log.add(loggingEvent);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public List<LoggingEvent> getLog() {
+ return new ArrayList<>(log);
+ }
+ }
+
+ private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ numInvocations += 1;
+ return null;
+ }
+ }).when(listener).onOutputAppend(any(String.class), any(String.class), any(String.class));
+ }
+
+ private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
+ int numTimes, String[][] buffer) {
+ numInvocations = 0;
+ prepareInvocationCounts(listener);
+ AppendOutputRunner runner = new AppendOutputRunner(listener);
+ for (String[] bufferElement: buffer) {
+ runner.appendBuffer(bufferElement[0], bufferElement[1], bufferElement[2]);
+ }
+ future = service.scheduleWithFixedDelay(runner, 0,
+ AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+ long startTimeMs = System.currentTimeMillis();
+ while(numInvocations != numTimes) {
+ if (System.currentTimeMillis() - startTimeMs > 2000) {
+ fail("Buffered events were not sent for 2 seconds");
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/11becdec/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
index 7889270..302d107 100644
--- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
+++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js
@@ -2575,7 +2575,16 @@ angular.module('zeppelinWebApp').controller('ParagraphCtrl', function($scope, $r
});
$scope.$on('appendParagraphOutput', function(event, data) {
- if ($scope.paragraph.id === data.paragraphId) {
+ /* It has been observed that append events
+ * can be erroneously called even if paragraph
+ * execution has ended, and in that case, no append
+ * should be made. Also, it was observed that between PENDING
+ * and RUNNING states, append-events can be called and we can't
+ * miss those, else during the length of paragraph run, few
+ * initial output line/s will be missing.
+ */
+ if ($scope.paragraph.id === data.paragraphId &&
+ ($scope.paragraph.status === 'RUNNING' || $scope.paragraph.status === 'PENDING')) {
if ($scope.flushStreamingOutput) {
$scope.clearTextOutput();
$scope.flushStreamingOutput = false;