You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/03/15 05:27:43 UTC

zeppelin git commit: ZEPPELIN-3330. Add more test for RemoteInterpreterServer

Repository: zeppelin
Updated Branches:
  refs/heads/master 6001b98de -> 5499fc4e9


ZEPPELIN-3330. Add more test for RemoteInterpreterServer

### What is this PR for?
This PR adds more tests for RemoteInterpreterServer in preparation for further changes to RemoteInterpreterServer. It also includes some code refactoring.

### What type of PR is it?
[ Improvement ]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3330

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #2869 from zjffdu/ZEPPELIN-3330 and squashes the following commits:

8e8c219 [Jeff Zhang] ZEPPELIN-3330. Add more test for RemoteInterpreterServer


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5499fc4e
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5499fc4e
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5499fc4e

Branch: refs/heads/master
Commit: 5499fc4e90b44269bd5e4c4a94f3dd95b6c6fedc
Parents: 6001b98
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Mar 14 16:05:50 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu Mar 15 13:23:23 2018 +0800

----------------------------------------------------------------------
 .../interpreter/BaseZeppelinContext.java        |   6 +-
 .../interpreter/InterpreterContext.java         |  10 +-
 .../remote/RemoteInterpreterServer.java         |  29 +-
 .../remote/RemoteInterpreterServerTest.java     | 284 ++++++++++++++-----
 4 files changed, 234 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5499fc4e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index e38a29f..2e9a9de 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -772,7 +772,7 @@ public abstract class BaseZeppelinContext {
    */
   @Experimental
   public void registerHook(String event, String cmd) {
-    String className = interpreterContext.getClassName();
+    String className = interpreterContext.getInterpreterClassName();
     registerHook(event, cmd, className);
   }
 
@@ -794,7 +794,7 @@ public abstract class BaseZeppelinContext {
    */
   @Experimental
   public String getHook(String event) {
-    String className = interpreterContext.getClassName();
+    String className = interpreterContext.getInterpreterClassName();
     return getHook(event, className);
   }
 
@@ -816,7 +816,7 @@ public abstract class BaseZeppelinContext {
    */
   @Experimental
   public void unregisterHook(String event) {
-    String className = interpreterContext.getClassName();
+    String className = interpreterContext.getInterpreterClassName();
     unregisterHook(event, className);
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5499fc4e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 8fa0904..6157d69 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -62,7 +62,7 @@ public class InterpreterContext {
   private AngularObjectRegistry angularObjectRegistry;
   private ResourcePool resourcePool;
   private List<InterpreterContextRunner> runners = new ArrayList<>();
-  private String className;
+  private String interpreterClassName;
   private RemoteEventClientWrapper client;
   private RemoteWorksController remoteWorksController;
   private Map<String, Integer> progressMap;
@@ -214,12 +214,12 @@ public class InterpreterContext {
     return runners;
   }
 
-  public String getClassName() {
-    return className;
+  public String getInterpreterClassName() {
+    return interpreterClassName;
   }
   
-  public void setClassName(String className) {
-    this.className = className;
+  public void setInterpreterClassName(String className) {
+    this.interpreterClassName = className;
   }
 
   public RemoteEventClientWrapper getClient() {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5499fc4e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 766ed16..88ac59e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -199,7 +199,6 @@ public class RemoteInterpreterServer extends Thread
         }
       }).start();
     }
-    logger.info("Starting remote interpreter server on port {}", port);
     server.serve();
   }
 
@@ -420,21 +419,21 @@ public class RemoteInterpreterServer extends Thread
 
 
   @Override
-  public RemoteInterpreterResult interpret(String noteId, String className, String st,
+  public RemoteInterpreterResult interpret(String sessionId, String className, String st,
                                            RemoteInterpreterContext interpreterContext)
       throws TException {
     if (logger.isDebugEnabled()) {
       logger.debug("st:\n{}", st);
     }
-    Interpreter intp = getInterpreter(noteId, className);
+    Interpreter intp = getInterpreter(sessionId, className);
     InterpreterContext context = convert(interpreterContext);
-    context.setClassName(intp.getClassName());
+    context.setInterpreterClassName(intp.getClassName());
 
     Scheduler scheduler = intp.getScheduler();
     InterpretJobListener jobListener = new InterpretJobListener();
     InterpretJob job = new InterpretJob(
         interpreterContext.getParagraphId(),
-        "remoteInterpretJob_" + System.currentTimeMillis(),
+        "RemoteInterpretJob_" + System.currentTimeMillis(),
         jobListener,
         JobProgressPoller.DEFAULT_INTERVAL_MSEC,
         intp,
@@ -638,8 +637,7 @@ public class RemoteInterpreterServer extends Thread
         // put result into resource pool
         if (resultMessages.size() > 0) {
           int lastMessageIndex = resultMessages.size() - 1;
-          if (resultMessages.get(lastMessageIndex).getType() ==
-              InterpreterResult.Type.TABLE) {
+          if (resultMessages.get(lastMessageIndex).getType() == InterpreterResult.Type.TABLE) {
             context.getResourcePool().put(
                 context.getNoteId(),
                 context.getParagraphId(),
@@ -667,10 +665,11 @@ public class RemoteInterpreterServer extends Thread
 
 
   @Override
-  public void cancel(String noteId, String className, RemoteInterpreterContext interpreterContext)
-      throws TException {
+  public void cancel(String sessionId,
+                     String className,
+                     RemoteInterpreterContext interpreterContext) throws TException {
     logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
-    Interpreter intp = getInterpreter(noteId, className);
+    Interpreter intp = getInterpreter(sessionId, className);
     String jobId = interpreterContext.getParagraphId();
     Job job = intp.getScheduler().removeFromWaitingQueue(jobId);
 
@@ -742,8 +741,10 @@ public class RemoteInterpreterServer extends Thread
         new TypeToken<List<RemoteInterpreterContextRunner>>() {
         }.getType());
 
-    for (InterpreterContextRunner r : runners) {
-      contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
+    if (runners != null) {
+      for (InterpreterContextRunner r : runners) {
+        contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
+      }
     }
 
     return new InterpreterContext(
@@ -790,7 +791,7 @@ public class RemoteInterpreterServer extends Thread
         String output;
         try {
           output = new String(out.toByteArray());
-          logger.debug("Output Update: {}", output);
+          logger.debug("Output Update for index {}: {}", index, output);
           eventClient.onInterpreterOutputUpdate(
               noteId, paragraphId, index, out.getType(), output);
         } catch (IOException e) {
@@ -919,7 +920,7 @@ public class RemoteInterpreterServer extends Thread
       if (interpreters == null) {
         return Status.UNKNOWN.name();
       }
-
+      //TODO(zjffdu) inefficient to loop over every interpreter and its jobs
       for (Interpreter intp : interpreters) {
         for (Job job : intp.getScheduler().getJobsRunning()) {
           if (jobId.equals(job.getId())) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5499fc4e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index 79a2331..2ae1362 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -18,128 +18,266 @@
 package org.apache.zeppelin.interpreter.remote;
 
 import org.apache.thrift.TException;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class RemoteInterpreterServerTest {
-  @Before
-  public void setUp() throws Exception {
-  }
 
-  @After
-  public void tearDown() throws Exception {
+  @Test
+  public void testStartStop() throws InterruptedException, IOException, TException {
+    RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
+
+    startRemoteInterpreterServer(server, 10 * 1000);
+    stopRemoteInterpreterServer(server, 10 * 10000);
   }
 
   @Test
-  public void testStartStop() throws InterruptedException, IOException, TException {
+  public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
     RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
         RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
-    assertEquals(false, server.isRunning());
 
+    startRemoteInterpreterServer(server, 10 * 1000);
+    //just send an event on the client queue
+    server.eventClient.onAppStatusUpdate("", "", "", "");
+    stopRemoteInterpreterServer(server, 10 * 10000);
+  }
+
+  private void startRemoteInterpreterServer(RemoteInterpreterServer server, int timeout)
+      throws InterruptedException {
+    assertEquals(false, server.isRunning());
     server.start();
     long startTime = System.currentTimeMillis();
-    boolean running = false;
-
-    while (System.currentTimeMillis() - startTime < 10 * 1000) {
+    while (System.currentTimeMillis() - startTime < timeout) {
       if (server.isRunning()) {
-        running = true;
         break;
-      } else {
-        Thread.sleep(200);
       }
+      Thread.sleep(200);
     }
-
-    assertEquals(true, running);
+    assertEquals(true, server.isRunning());
     assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost",
         server.getPort()));
+  }
 
+  private void stopRemoteInterpreterServer(RemoteInterpreterServer server, int timeout)
+      throws TException, InterruptedException {
+    assertEquals(true, server.isRunning());
     server.shutdown();
-
-    while (System.currentTimeMillis() - startTime < 10 * 1000) {
-      if (server.isRunning()) {
-        Thread.sleep(200);
-      } else {
-        running = false;
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < timeout) {
+      if (!server.isRunning()) {
         break;
       }
+      Thread.sleep(200);
     }
-    assertEquals(false, running);
+    assertEquals(false, server.isRunning());
+    assertEquals(false, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost",
+        server.getPort()));
   }
 
-  class ShutdownRun implements Runnable {
-    private RemoteInterpreterServer serv = null;
+  @Test
+  public void testInterpreter() throws IOException, TException, InterruptedException {
+    final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
+
+    Map<String, String> intpProperties = new HashMap<>();
+    intpProperties.put("property_1", "value_1");
+    intpProperties.put("zeppelin.interpreter.localRepo", "/tmp");
+
+    // create Test1Interpreter in session_1
+    server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
+        intpProperties, "user_1");
+    Test1Interpreter interpreter1 = (Test1Interpreter)
+        ((LazyOpenInterpreter) server.interpreterGroup.get("session_1").get(0))
+            .getInnerInterpreter();
+    assertEquals(1, server.interpreterGroup.getSessionNum());
+    assertEquals(1, server.interpreterGroup.get("session_1").size());
+    assertEquals(2, interpreter1.getProperties().size());
+    assertEquals("value_1", interpreter1.getProperty("property_1"));
+
+    // create Test2Interpreter in session_1
+    server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
+        intpProperties, "user_1");
+    assertEquals(2, server.interpreterGroup.get("session_1").size());
+
+    // create Test1Interpreter in session_2
+    server.createInterpreter("group_1", "session_2", Test1Interpreter.class.getName(),
+        intpProperties, "user_1");
+    assertEquals(2, server.interpreterGroup.getSessionNum());
+    assertEquals(2, server.interpreterGroup.get("session_1").size());
+    assertEquals(1, server.interpreterGroup.get("session_2").size());
+
+    final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();
+    intpContext.setNoteId("note_1");
+    intpContext.setParagraphId("paragraph_1");
+    intpContext.setGui("{}");
+    intpContext.setNoteGui("{}");
+
+    // single output of SUCCESS
+    RemoteInterpreterResult result = server.interpret("session_1", Test1Interpreter.class.getName(),
+        "SINGLE_OUTPUT_SUCCESS", intpContext);
+    assertEquals("SUCCESS", result.code);
+    assertEquals(1, result.getMsg().size());
+    assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(0).getData());
+
+    // combo output of SUCCESS
+    result = server.interpret("session_1", Test1Interpreter.class.getName(), "COMBO_OUTPUT_SUCCESS",
+        intpContext);
+    assertEquals("SUCCESS", result.code);
+    assertEquals(2, result.getMsg().size());
+    assertEquals("INTERPRETER_OUT", result.getMsg().get(0).getData());
+    assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(1).getData());
+
+    // single output of ERROR
+    result = server.interpret("session_1", Test1Interpreter.class.getName(), "SINGLE_OUTPUT_ERROR",
+        intpContext);
+    assertEquals("ERROR", result.code);
+    assertEquals(1, result.getMsg().size());
+    assertEquals("SINGLE_OUTPUT_ERROR", result.getMsg().get(0).getData());
+
+    // getFormType
+    String formType = server.getFormType("session_1", Test1Interpreter.class.getName());
+    assertEquals("NATIVE", formType);
+
+    // cancel
+    Thread sleepThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          server.interpret("session_1", Test1Interpreter.class.getName(), "SLEEP", intpContext);
+        } catch (TException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    sleepThread.start();
+
+    Thread.sleep(1000);
+    assertFalse(interpreter1.cancelled.get());
+    server.cancel("session_1", Test1Interpreter.class.getName(), intpContext);
+    assertTrue(interpreter1.cancelled.get());
+
+    // getProgress
+    assertEquals(10, server.getProgress("session_1", Test1Interpreter.class.getName(),
+        intpContext));
+
+    // close
+    server.close("session_1", Test1Interpreter.class.getName());
+    assertTrue(interpreter1.closed.get());
+  }
+
+  public static class Test1Interpreter extends Interpreter {
+
+    AtomicBoolean cancelled = new AtomicBoolean();
+    AtomicBoolean closed = new AtomicBoolean();
+
+    public Test1Interpreter(Properties properties) {
+      super(properties);
+    }
+
+    @Override
+    public void open() {
 
-    ShutdownRun(RemoteInterpreterServer serv) {
-      this.serv = serv;
     }
 
     @Override
-    public void run() {
-      try {
-        serv.shutdown();
-      } catch (Exception ex) {
-        // ignore exception
+    public InterpreterResult interpret(String st, InterpreterContext context) {
+      if (st.equals("SINGLE_OUTPUT_SUCCESS")) {
+        return new InterpreterResult(InterpreterResult.Code.SUCCESS, "SINGLE_OUTPUT_SUCCESS");
+      } else if (st.equals("SINGLE_OUTPUT_ERROR")) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "SINGLE_OUTPUT_ERROR");
+      } else if (st.equals("COMBO_OUTPUT_SUCCESS")) {
+        try {
+          context.out.write("INTERPRETER_OUT");
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+        return new InterpreterResult(InterpreterResult.Code.SUCCESS, "SINGLE_OUTPUT_SUCCESS");
+      } else if (st.equals("SLEEP")) {
+        try {
+          Thread.sleep(3 * 1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        return new InterpreterResult(InterpreterResult.Code.SUCCESS, "SLEEP_SUCCESS");
       }
+      return null;
     }
-  }
 
-  ;
+    @Override
+    public void cancel(InterpreterContext context) throws InterpreterException {
+      cancelled.set(true);
+    }
 
-  @Test
-  public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
-    RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
-        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
-    assertEquals(false, server.isRunning());
+    @Override
+    public FormType getFormType() throws InterpreterException {
+      return FormType.NATIVE;
+    }
 
-    server.start();
-    long startTime = System.currentTimeMillis();
-    boolean running = false;
+    @Override
+    public int getProgress(InterpreterContext context) throws InterpreterException {
+      return 10;
+    }
 
-    while (System.currentTimeMillis() - startTime < 10 * 1000) {
-      if (server.isRunning()) {
-        running = true;
-        break;
-      } else {
-        Thread.sleep(200);
-      }
+    @Override
+    public void close() {
+      closed.set(true);
     }
 
-    assertEquals(true, running);
-    assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost",
-        server.getPort()));
+  }
 
-    //just send an event on the client queue
-    server.eventClient.onAppStatusUpdate("", "", "", "");
+  public static class Test2Interpreter extends Interpreter {
 
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
 
-    Runnable task = new ShutdownRun(server);
+    public Test2Interpreter(Properties properties) {
+      super(properties);
+    }
 
-    executor.schedule(task, 0, TimeUnit.MILLISECONDS);
+    @Override
+    public void open() {
 
-    while (System.currentTimeMillis() - startTime < 10 * 1000) {
-      if (server.isRunning()) {
-        Thread.sleep(200);
-      } else {
-        running = false;
-        break;
-      }
     }
 
-    executor.shutdown();
+    @Override
+    public InterpreterResult interpret(String st, InterpreterContext context) {
+      return null;
+    }
 
-    //cleanup environment for next tests
-    server.shutdown();
+    @Override
+    public void cancel(InterpreterContext context) throws InterpreterException {
 
-    assertEquals(false, running);
-  }
+    }
 
+    @Override
+    public FormType getFormType() throws InterpreterException {
+      return FormType.NATIVE;
+    }
+
+    @Override
+    public int getProgress(InterpreterContext context) throws InterpreterException {
+      return 0;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+  }
 }