You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/02/18 02:08:40 UTC

[zeppelin] branch master updated: [ZEPPELIN-4617]. Clean ParagraphJobListener

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6878989  [ZEPPELIN-4617]. Clean ParagraphJobListener
6878989 is described below

commit 6878989c754f1fb982a1ec13d5677113f87e8520
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Feb 14 17:25:12 2020 +0800

    [ZEPPELIN-4617]. Clean ParagraphJobListener
    
    ### What is this PR for?
    The following methods in ParagraphJobListener is not used any more (they are legacy methods which is used when interpreter is running in the same process of zeppelin server, and this is not allowed any more, instead we always launch separated interpreter process)
    * onOutputAppend
    * onOutputUpdate
    * onOutputUpdateAll
    
    ### What type of PR is it?
    [ Refactoring]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4617
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3651 from zjffdu/ZEPPELIN-4617 and squashes the following commits:
    
    db08b07f8 [Jeff Zhang] [ZEPPELIN-4617]. Clean ParagraphJobListener
---
 .../org/apache/zeppelin/socket/NotebookServer.java | 28 ----------
 .../remote/RemoteInterpreterProcessListener.java   | 63 ++++++++++++++++++---
 .../org/apache/zeppelin/notebook/Paragraph.java    | 64 ++--------------------
 .../zeppelin/notebook/ParagraphJobListener.java    | 10 +---
 .../org/apache/zeppelin/notebook/NotebookTest.java | 17 ------
 .../apache/zeppelin/notebook/ParagraphTest.java    |  2 -
 6 files changed, 62 insertions(+), 122 deletions(-)

diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index c16b912..9b4bc53 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -1908,34 +1908,6 @@ public class NotebookServer extends WebSocketServlet
     }
   }
 
-
-  /**
-   * This callback is for paragraph that runs on RemoteInterpreterProcess.
-   */
-  @Override
-  public void onOutputAppend(Paragraph paragraph, int idx, String output) {
-    Message msg =
-        new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", paragraph.getNote().getId())
-            .put("paragraphId", paragraph.getId()).put("data", output);
-    connectionManager.broadcast(paragraph.getNote().getId(), msg);
-  }
-
-  /**
-   * This callback is for paragraph that runs on RemoteInterpreterProcess.
-   */
-  @Override
-  public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage result) {
-    Message msg =
-        new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", paragraph.getNote().getId())
-            .put("paragraphId", paragraph.getId()).put("data", result.getData());
-    connectionManager.broadcast(paragraph.getNote().getId(), msg);
-  }
-
-  @Override
-  public void onOutputUpdateAll(Paragraph paragraph, List<InterpreterResultMessage> msgs) {
-    // TODO
-  }
-
   @Override
   public void checkpointOutput(String noteId, String paragraphId) {
     try {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
index c9c7913..e6876c7 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
@@ -19,29 +19,78 @@ package org.apache.zeppelin.interpreter.remote;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.ParagraphInfo;
-import org.apache.zeppelin.interpreter.thrift.ServiceException;
-import org.apache.zeppelin.user.AuthenticationInfo;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Event from remoteInterpreterProcess
+ * Listener for events from RemoteInterpreterProcess.
  */
 public interface RemoteInterpreterProcessListener {
-  public void onOutputAppend(String noteId, String paragraphId, int index, String output);
-  public void onOutputUpdated(
+  /**
+   * Invoked when output is appended.
+   * @param noteId
+   * @param paragraphId
+   * @param index
+   * @param output
+   */
+  void onOutputAppend(String noteId, String paragraphId, int index, String output);
+
+  /**
+   * Invoked when the whole output is updated
+   * @param noteId
+   * @param paragraphId
+   * @param index
+   * @param type
+   * @param output
+   */
+  void onOutputUpdated(
       String noteId, String paragraphId, int index, InterpreterResult.Type type, String output);
-  public void onOutputClear(String noteId, String paragraphId);
+
+  /**
+   * Invoked when output is cleared.
+   * @param noteId
+   * @param paragraphId
+   */
+  void onOutputClear(String noteId, String paragraphId);
+
+  /**
+   * Run paragraphs, paragraphs can be specified via indices(paragraphIndices) or ids(paragraphIds)
+   * @param noteId
+   * @param paragraphIndices
+   * @param paragraphIds
+   * @param curParagraphId
+   * @throws IOException
+   */
   void runParagraphs(String noteId, List<Integer> paragraphIndices, List<String> paragraphIds,
                      String curParagraphId)
       throws IOException;
 
-  public void onParaInfosReceived(String noteId, String paragraphId,
+  /**
+   * Invoked when paragraph runtime info is received, such as spark job info.
+   * @param noteId
+   * @param paragraphId
+   * @param interpreterSettingId
+   * @param metaInfos
+   */
+  void onParaInfosReceived(String noteId, String paragraphId,
                                   String interpreterSettingId, Map<String, String> metaInfos);
 
+  /**
+   * Invoked for getting paragraph infos.
+   * @param user
+   * @param noteId
+   * @return
+   * @throws TException
+   * @throws IOException
+   */
   List<ParagraphInfo> getParagraphList(String user, String noteId) throws TException, IOException;
 
+  /**
+   * Invoked for checkpoint partial paragraph output.
+   * @param noteId
+   * @param paragraphId
+   */
   void checkpointOutput(String noteId, String paragraphId);
 }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 803e600..d483f6a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -292,7 +292,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
       return new ArrayList<>();
     }
     cursor = calculateCursorPosition(buffer, cursor);
-    InterpreterContext interpreterContext = getInterpreterContext(null);
+    InterpreterContext interpreterContext = getInterpreterContext();
 
     try {
       return this.interpreter.completion(this.scriptText, cursor, interpreterContext);
@@ -323,7 +323,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   public int progress() {
     try {
       if (this.interpreter != null) {
-        return this.interpreter.getProgress(getInterpreterContext(null));
+        return this.interpreter.getProgress(getInterpreterContext());
       } else {
         return 0;
       }
@@ -486,17 +486,13 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
           return getReturn();
         }
 
-        context.out.flush();
-        List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
-        resultMessages.addAll(ret.message());
-        InterpreterResult res = new InterpreterResult(ret.code(), resultMessages);
         Paragraph p = getUserParagraph(getUser());
         if (null != p) {
-          p.setResult(res);
+          p.setResult(ret);
           p.settings.setParams(settings.getParams());
         }
 
-        return res;
+        return ret;
       } finally {
         InterpreterContext.remove();
       }
@@ -511,7 +507,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
       return true;
     }
     try {
-      interpreter.cancel(getInterpreterContext(null));
+      interpreter.cancel(getInterpreterContext());
     } catch (InterpreterException e) {
       throw new RuntimeException(e);
     }
@@ -520,55 +516,6 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
   }
 
   private InterpreterContext getInterpreterContext() {
-    final Paragraph self = this;
-
-    return getInterpreterContext(
-        new InterpreterOutput(
-            new InterpreterOutputListener() {
-              ParagraphJobListener paragraphJobListener = (ParagraphJobListener) getListener();
-
-              @Override
-              public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
-                if (null != paragraphJobListener) {
-                  paragraphJobListener.onOutputAppend(self, index, new String(line));
-                }
-              }
-
-              @Override
-              public void onUpdate(int index, InterpreterResultMessageOutput out) {
-                try {
-                  if (null != paragraphJobListener) {
-                    paragraphJobListener.onOutputUpdate(
-                        self, index, out.toInterpreterResultMessage());
-                  }
-                } catch (IOException e) {
-                  LOGGER.error(e.getMessage(), e);
-                }
-              }
-
-              @Override
-              public void onUpdateAll(InterpreterOutput out) {
-                try {
-                  List<InterpreterResultMessage> messages = out.toInterpreterResultMessage();
-                  if (null != paragraphJobListener) {
-                    paragraphJobListener.onOutputUpdateAll(self, messages);
-                  }
-                  updateParagraphResult(messages);
-                  outputBuffer.clear();
-                } catch (IOException e) {
-                  LOGGER.error(e.getMessage(), e);
-                }
-              }
-
-      private void updateParagraphResult(List<InterpreterResultMessage> msgs) {
-        // update paragraph results
-        InterpreterResult result = new InterpreterResult(Code.SUCCESS, msgs);
-        setReturn(result, null);
-      }
-    }));
-  }
-
-  private InterpreterContext getInterpreterContext(InterpreterOutput output) {
     AngularObjectRegistry registry = null;
     ResourcePool resourcePool = null;
 
@@ -599,7 +546,6 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
             .setNoteGUI(getNoteGui())
             .setAngularObjectRegistry(registry)
             .setResourcePool(resourcePool)
-            .setInterpreterOut(output)
             .build();
     return interpreterContext;
   }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
index 1d49eff..d0c99f3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
@@ -17,21 +17,13 @@
 
 package org.apache.zeppelin.notebook;
 
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.scheduler.JobListener;
 
-import java.util.List;
 
 /**
- * Listen paragraph update
+ * Listener for Paragraph Job.
  */
 public interface ParagraphJobListener extends JobListener<Paragraph> {
-  void onOutputAppend(Paragraph paragraph, int idx, String output);
-  void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage msg);
-  void onOutputUpdateAll(Paragraph paragraph, List<InterpreterResultMessage> msgs);
-
   //TODO(savalek) Temporary solution. Need to refactor cron to be able to notify frontend directly.
   void noteRunningStatusChange(String noteId, boolean newStatus);
 }
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 2cc9ba8..b5f8866 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -1550,23 +1550,6 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     }
   }
 
-
-
-  @Override
-  public void onOutputAppend(Paragraph paragraph, int idx, String output) {
-
-  }
-
-  @Override
-  public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage msg) {
-
-  }
-
-  @Override
-  public void onOutputUpdateAll(Paragraph paragraph, List<InterpreterResultMessage> msgs) {
-
-  }
-
   @Override
   public void noteRunningStatusChange(String noteId, boolean newStatus) {
 
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
index f281b34..03f428e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
@@ -292,7 +292,6 @@ public class ParagraphTest extends AbstractInterpreterTest {
 
     ParagraphJobListener mockJobListener = mock(ParagraphJobListener.class);
     doReturn(mockJobListener).when(spyParagraph).getListener();
-    doNothing().when(mockJobListener).onOutputUpdateAll(Mockito.<Paragraph>any(), Mockito.anyList());
 
     InterpreterResult mockInterpreterResult = mock(InterpreterResult.class);
     when(mockInterpreter.interpret(anyString(), Mockito.<InterpreterContext>any())).thenReturn(mockInterpreterResult);
@@ -380,7 +379,6 @@ public class ParagraphTest extends AbstractInterpreterTest {
 
     ParagraphJobListener mockJobListener = mock(ParagraphJobListener.class);
     doReturn(mockJobListener).when(spyParagraph).getListener();
-    doNothing().when(mockJobListener).onOutputUpdateAll(Mockito.<Paragraph>any(), Mockito.anyList());
 
     InterpreterResult mockInterpreterResult = mock(InterpreterResult.class);
     when(mockInterpreter.interpret(anyString(), Mockito.<InterpreterContext>any())).thenReturn(mockInterpreterResult);