You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mi...@apache.org on 2017/03/17 03:22:38 UTC

[08/23] zeppelin git commit: [HOTFIX][ZEPPELIN-2178] Prevent from cleaning output in "Personalized Mode"

[HOTFIX][ZEPPELIN-2178] Prevent from cleaning output in "Personalized Mode"


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/42385220
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/42385220
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/42385220

Branch: refs/heads/branch-0.7
Commit: 42385220e6d43e7757429f4a10df2a2e6fcadbe4
Parents: 11e897d
Author: Jongyoul Lee <jo...@gmail.com>
Authored: Tue Mar 7 14:46:33 2017 +0900
Committer: Jongyoul Lee <jo...@gmail.com>
Committed: Tue Mar 7 14:47:59 2017 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/socket/NotebookServer.java  |  34 +++++-
 .../java/org/apache/zeppelin/notebook/Note.java |   9 ++
 .../org/apache/zeppelin/notebook/Paragraph.java |  32 ++++--
 .../apache/zeppelin/notebook/ParagraphTest.java | 107 +++++++++++++++++++
 4 files changed, 172 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42385220/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index e2ffa0a..6791b63 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -1127,6 +1127,17 @@ public class NotebookServer extends WebSocketServlet
     p.setConfig(config);
     p.setTitle((String) fromMessage.get("title"));
     p.setText((String) fromMessage.get("paragraph"));
+
+    subject = new AuthenticationInfo(fromMessage.principal);
+    if (note.isPersonalizedMode()) {
+      p = p.getUserParagraph(subject.getUser());
+      p.settings.setParams(params);
+      p.setConfig(config);
+      p.setTitle((String) fromMessage.get("title"));
+      p.setText((String) fromMessage.get("paragraph"));
+    }
+
+
     note.persist(subject);
 
     if (note.isPersonalizedMode()) {
@@ -1647,6 +1658,15 @@ public class NotebookServer extends WebSocketServlet
     p.settings.setParams(params);
     p.setConfig(config);
 
+    if (note.isPersonalizedMode()) {
+      p = note.getParagraph(paragraphId);
+      p.setText(text);
+      p.setTitle(title);
+      p.setAuthenticationInfo(subject);
+      p.settings.setParams(params);
+      p.setConfig(config);
+    }
+
     return p;
   }
 
@@ -1767,7 +1787,15 @@ public class NotebookServer extends WebSocketServlet
       InterpreterResult.Type type, String output) {
     Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId)
         .put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output);
-    broadcast(noteId, msg);
+    Note note = notebook().getNote(noteId);
+    if (note.isPersonalizedMode()) {
+      String user = note.getParagraph(paragraphId).getUser();
+      if (null != user) {
+        multicastToUser(user, msg);
+      }
+    } else {
+      broadcast(noteId, msg);
+    }
   }
 
 
@@ -2036,7 +2064,9 @@ public class NotebookServer extends WebSocketServlet
         }
       }
       if (job instanceof Paragraph) {
-        notebookServer.broadcastParagraph(note, (Paragraph) job);
+        Paragraph p = (Paragraph) job;
+        p.setStatusToUserParagraph(job.getStatus());
+        notebookServer.broadcastParagraph(note, p);
       }
       try {
         notebookServer.broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42385220/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 35f32f3..f341e16 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -137,6 +137,15 @@ public class Note implements Serializable, ParagraphJobListener {
       valueString = "false";
     }
     getConfig().put("personalizedMode", valueString);
+    clearUserParagraphs(value);
+  }
+
+  private void clearUserParagraphs(boolean isPersonalized) {
+    if (!isPersonalized) {
+      for (Paragraph p : paragraphs) {
+        p.clearUserParagraphs();
+      }
+    }
   }
 
   public String getId() {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42385220/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index f609ecb..cb6e0c7 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -49,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting;
  * Paragraph is a representation of an execution unit.
  */
 public class Paragraph extends Job implements Serializable, Cloneable {
+
   private static final long serialVersionUID = -6328572073497992016L;
 
   private static Logger logger = LoggerFactory.getLogger(Paragraph.class);
@@ -123,6 +124,9 @@ public class Paragraph extends Job implements Serializable, Cloneable {
   }
 
   public Paragraph getUserParagraph(String user) {
+    if (!userParagraphMap.containsKey(user)) {
+      cloneParagraphForUser(user);
+    }
     return userParagraphMap.get(user);
   }
 
@@ -139,12 +143,16 @@ public class Paragraph extends Job implements Serializable, Cloneable {
     p.setTitle(getTitle());
     p.setText(getText());
     p.setResult(getReturn());
-    p.setStatus(getStatus());
+    p.setStatus(Status.READY);
     p.setId(getId());
     addUser(p, user);
     return p;
   }
 
+  public void clearUserParagraphs() {
+    userParagraphMap.clear();
+  }
+
   public void addUser(Paragraph p, String user) {
     userParagraphMap.put(user, p);
   }
@@ -370,6 +378,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
       }
     }
 
+    for (Paragraph p : userParagraphMap.values()) {
+      p.setText(getText());
+    }
+
     String script = getScriptBody();
     // inject form
     if (repl.getFormType() == FormType.NATIVE) {
@@ -401,13 +413,9 @@ public class Paragraph extends Job implements Serializable, Cloneable {
       List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
       resultMessages.addAll(ret.message());
 
-      for (Paragraph p : userParagraphMap.values()) {
-        p.setText(getText());
-      }
-
       InterpreterResult res = new InterpreterResult(ret.code(), resultMessages);
 
-      Paragraph p = userParagraphMap.get(getUser());
+      Paragraph p = getUserParagraph(getUser());
       if (null != p) {
         p.setResult(res);
         p.settings.setParams(settings.getParams());
@@ -526,12 +534,12 @@ public class Paragraph extends Job implements Serializable, Cloneable {
     Credentials credentials = note.getCredentials();
     if (authenticationInfo != null) {
       UserCredentials userCredentials =
-              credentials.getUserCredentials(authenticationInfo.getUser());
+          credentials.getUserCredentials(authenticationInfo.getUser());
       authenticationInfo.setUserCredentials(userCredentials);
     }
 
     InterpreterContext interpreterContext =
-            new InterpreterContext(note.getId(), getId(), getRequiredReplName(), this.getTitle(),
+        new InterpreterContext(note.getId(), getId(), getRequiredReplName(), this.getTitle(),
             this.getText(), this.getAuthenticationInfo(), this.getConfig(), this.settings, registry,
             resourcePool, runners, output);
     return interpreterContext;
@@ -574,7 +582,15 @@ public class Paragraph extends Job implements Serializable, Cloneable {
     return new ParagraphRunner(note, note.getId(), getId());
   }
 
+  public void setStatusToUserParagraph(Status status) {
+    String user = getUser();
+    if (null != user) {
+      getUserParagraph(getUser()).setStatus(status);
+    }
+  }
+
   static class ParagraphRunner extends InterpreterContextRunner {
+
     private transient Note note;
 
     public ParagraphRunner(Note note, String noteId, String paragraphId) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/42385220/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
index 69577e9..0e77846 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
@@ -19,22 +19,48 @@ package org.apache.zeppelin.notebook;
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Lists;
+import java.util.List;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectBuilder;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.Interpreter.FormType;
+import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.InterpreterSetting.Status;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.scheduler.JobListener;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.user.Credentials;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 public class ParagraphTest {
   @Test
@@ -125,4 +151,85 @@ public class ParagraphTest {
     verify(registry).get("age", noteId, null);
     assertEquals(actual, expected);
   }
+
+  @Test
+  public void returnDefaultParagraphWithNewUser() {
+    Paragraph p = new Paragraph("para_1", null, null, null, null);
+    Object defaultValue = "Default Value";
+    p.setResult(defaultValue);
+    Paragraph newUserParagraph = p.getUserParagraph("new_user");
+    assertNotNull(newUserParagraph);
+    assertEquals(defaultValue, newUserParagraph.getReturn());
+  }
+
+  @Test
+  public void returnUnchangedResultsWithDifferentUser() throws Throwable {
+    InterpreterSettingManager mockInterpreterSettingManager = mock(InterpreterSettingManager.class);
+    Note mockNote = mock(Note.class);
+    when(mockNote.getCredentials()).thenReturn(mock(Credentials.class));
+    Paragraph spyParagraph = spy(new Paragraph("para_1", mockNote,  null, null, mockInterpreterSettingManager));
+
+    doReturn("spy").when(spyParagraph).getRequiredReplName();
+
+
+    Interpreter mockInterpreter = mock(Interpreter.class);
+    doReturn(mockInterpreter).when(spyParagraph).getRepl(anyString());
+
+    InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class);
+    when(mockInterpreter.getInterpreterGroup()).thenReturn(mockInterpreterGroup);
+    when(mockInterpreterGroup.getId()).thenReturn("mock_id_1");
+    when(mockInterpreterGroup.getAngularObjectRegistry()).thenReturn(mock(AngularObjectRegistry.class));
+    when(mockInterpreterGroup.getResourcePool()).thenReturn(mock(ResourcePool.class));
+
+    List<InterpreterSetting> spyInterpreterSettingList = spy(Lists.<InterpreterSetting>newArrayList());
+    InterpreterSetting mockInterpreterSetting = mock(InterpreterSetting.class);
+    InterpreterOption mockInterpreterOption = mock(InterpreterOption.class);
+    when(mockInterpreterSetting.getOption()).thenReturn(mockInterpreterOption);
+    when(mockInterpreterOption.permissionIsSet()).thenReturn(false);
+    when(mockInterpreterSetting.getStatus()).thenReturn(Status.READY);
+    when(mockInterpreterSetting.getId()).thenReturn("mock_id_1");
+    when(mockInterpreterSetting.getInterpreterGroup(anyString(), anyString())).thenReturn(mockInterpreterGroup);
+    spyInterpreterSettingList.add(mockInterpreterSetting);
+    when(mockNote.getId()).thenReturn("any_id");
+    when(mockInterpreterSettingManager.getInterpreterSettings(anyString())).thenReturn(spyInterpreterSettingList);
+
+    doReturn("spy script body").when(spyParagraph).getScriptBody();
+
+    when(mockInterpreter.getFormType()).thenReturn(FormType.NONE);
+
+    ParagraphJobListener mockJobListener = mock(ParagraphJobListener.class);
+    doReturn(mockJobListener).when(spyParagraph).getListener();
+    doNothing().when(mockJobListener).onOutputUpdateAll(Mockito.<Paragraph>any(), Mockito.anyList());
+
+    InterpreterResult mockInterpreterResult = mock(InterpreterResult.class);
+    when(mockInterpreter.interpret(anyString(), Mockito.<InterpreterContext>any())).thenReturn(mockInterpreterResult);
+    when(mockInterpreterResult.code()).thenReturn(Code.SUCCESS);
+
+
+    // Actual test
+    List<InterpreterResultMessage> result1 = Lists.newArrayList();
+    result1.add(new InterpreterResultMessage(Type.TEXT, "result1"));
+    when(mockInterpreterResult.message()).thenReturn(result1);
+
+    AuthenticationInfo user1 = new AuthenticationInfo("user1");
+    spyParagraph.setAuthenticationInfo(user1);
+    spyParagraph.jobRun();
+    Paragraph p1 = spyParagraph.getUserParagraph(user1.getUser());
+
+    List<InterpreterResultMessage> result2 = Lists.newArrayList();
+    result2.add(new InterpreterResultMessage(Type.TEXT, "result2"));
+    when(mockInterpreterResult.message()).thenReturn(result2);
+
+    AuthenticationInfo user2 = new AuthenticationInfo("user2");
+    spyParagraph.setAuthenticationInfo(user2);
+    spyParagraph.jobRun();
+    Paragraph p2 = spyParagraph.getUserParagraph(user2.getUser());
+
+    assertNotEquals(p1.getReturn().toString(), p2.getReturn().toString());
+
+    assertEquals(p1, spyParagraph.getUserParagraph(user1.getUser()));
+
+
+
+  }
 }