You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by li...@apache.org on 2019/07/19 09:51:16 UTC

[zeppelin] branch master updated: [ZEPPELIN-4236] Cluster mode support interpreter running in docker

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f97164  [ZEPPELIN-4236] Cluster mode support interpreter running in docker
8f97164 is described below

commit 8f971641769b3cafcb9b8d08b158a7f8ba18451a
Author: Xun Liu <li...@apache.org>
AuthorDate: Mon Jul 15 14:07:23 2019 +0800

    [ZEPPELIN-4236] Cluster mode support interpreter running in docker
    
    ### What is this PR for?
    Now that zeppelin is set up in cluster mode, the interpreter will not work in docker mode.
    We need to mix these two modes together.
    This will allow more users to use the interpreter container.
    
    ### What type of PR is it?
    [Improvement]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4236
    
    ### How should this be tested?
    * [CI pass](https://travis-ci.org/liuxunorg/zeppelin/builds/557797483)
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Xun Liu <li...@apache.org>
    
    Closes #3405 from liuxunorg/ZEPPELIN-4236 and squashes the following commits:
    
    0a9b34ba6 [Xun Liu] Add `Docker Mode` and `Local Mode` test case.
    b26195d9d [Xun Liu] [ZEPPELIN-4236] Cluster mode support interpreter running in docker
---
 .../zeppelin/conf/ZeppelinConfiguration.java       |  5 +++
 .../interpreter/launcher/InterpreterLauncher.java  |  2 +-
 zeppelin-plugins/launcher/cluster/pom.xml          |  5 +++
 .../launcher/ClusterInterpreterCheckThread.java    | 22 ++++++++----
 .../launcher/ClusterInterpreterLauncher.java       | 42 +++++++++++++++++-----
 .../launcher/ClusterInterpreterProcess.java        |  3 --
 .../launcher/ClusterInterpreterLauncherTest.java   | 38 ++++++++++++++++++++
 7 files changed, 99 insertions(+), 18 deletions(-)

diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 2818313..28fefcd 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -722,6 +722,11 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     }
   }
 
+  @VisibleForTesting
+  public void setRunMode(RUN_MODE runMode) {
+    properties.put(ConfVars.ZEPPELIN_RUN_MODE.getVarName(), runMode.name());
+  }
+
   public boolean getK8sPortForward() {
     return getBoolean(ConfVars.ZEPPELIN_K8S_PORTFORWARD);
   }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
index e505595..fe105c6 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
@@ -44,7 +44,7 @@ public abstract class InterpreterLauncher {
   protected int getConnectTimeout() {
     int connectTimeout =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
-    if (properties.containsKey(
+    if (properties != null && properties.containsKey(
         ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName())) {
       connectTimeout = Integer.parseInt(properties.getProperty(
           ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName()));
diff --git a/zeppelin-plugins/launcher/cluster/pom.xml b/zeppelin-plugins/launcher/cluster/pom.xml
index 80ca0b3..bf24546 100644
--- a/zeppelin-plugins/launcher/cluster/pom.xml
+++ b/zeppelin-plugins/launcher/cluster/pom.xml
@@ -46,6 +46,11 @@
       <artifactId>launcher-standard</artifactId>
       <version>0.9.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>launcher-docker</artifactId>
+      <version>0.9.0-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
index fb7e41f..a389135 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
@@ -33,10 +33,16 @@ public class ClusterInterpreterCheckThread extends Thread {
   private static final Logger LOGGER
       = LoggerFactory.getLogger(ClusterInterpreterCheckThread.class);
 
-  private ClusterInterpreterProcess intpProcess;
+  private InterpreterClient intpProcess;
+  private String intpGroupId;
+  private int connectTimeout;
 
-  ClusterInterpreterCheckThread(ClusterInterpreterProcess intpProcess) {
+  ClusterInterpreterCheckThread(InterpreterClient intpProcess,
+                                String intpGroupId,
+                                int connectTimeout) {
     this.intpProcess = intpProcess;
+    this.intpGroupId = intpGroupId;
+    this.connectTimeout = connectTimeout;
   }
 
   @Override
@@ -45,11 +51,8 @@ public class ClusterInterpreterCheckThread extends Thread {
 
     ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
 
-    String intpGroupId = intpProcess.getInterpreterGroupId();
-
     HashMap<String, Object> intpMeta = clusterServer
         .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-    int connectTimeout = intpProcess.getConnectTimeout();
 
     int MAX_RETRY_GET_META = connectTimeout / ClusterInterpreterLauncher.CHECK_META_INTERVAL;
     int retryGetMeta = 0;
@@ -71,7 +74,14 @@ public class ClusterInterpreterCheckThread extends Thread {
         int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT);
         LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort);
 
-        intpProcess.processStarted(intpPort, intpHost);
+        if (intpProcess instanceof DockerInterpreterProcess) {
+          ((DockerInterpreterProcess) intpProcess).processStarted(intpPort, intpHost);
+        } else if (intpProcess instanceof ClusterInterpreterProcess) {
+          ((ClusterInterpreterProcess) intpProcess).processStarted(intpPort, intpHost);
+        } else {
+          LOGGER.error("Unknown type !");
+        }
+
         break;
       }
     }
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index 2b2ac61..14bbce2 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -27,6 +27,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterRunner;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
@@ -52,7 +53,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
     implements ClusterEventListener {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterLauncher.class);
 
-  public static final int CHECK_META_INTERVAL = 500; // ms
+  public static final int CHECK_META_INTERVAL = 2000; // ms
   private InterpreterLaunchContext context;
   private ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
 
@@ -175,8 +176,8 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
           String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG);
           InterpreterLaunchContext context = gson.fromJson(
               eventMsg, new TypeToken<InterpreterLaunchContext>() {}.getType());
-          ClusterInterpreterProcess clusterInterpreterProcess = createInterpreterProcess(context);
-          clusterInterpreterProcess.start(context.getUserName());
+          InterpreterClient clusterOrDockerIntpProcess = createInterpreterProcess(context);
+          clusterOrDockerIntpProcess.start(context.getUserName());
           break;
         default:
           LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
@@ -187,10 +188,9 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
     }
   }
 
-  private ClusterInterpreterProcess createInterpreterProcess(InterpreterLaunchContext context) {
-    ClusterInterpreterProcess clusterInterpreterProcess = null;
+  private RemoteInterpreterProcess createClusterIntpProcess() {
+    ClusterInterpreterProcess clusterIntpProcess = null;
     try {
-      this.properties = context.getProperties();
       InterpreterOption option = context.getOption();
       InterpreterRunner runner = context.getRunner();
       String intpSetGroupName = context.getInterpreterSettingGroup();
@@ -199,7 +199,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
       String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
           + context.getInterpreterSettingId();
 
-      clusterInterpreterProcess = new ClusterInterpreterProcess(
+      clusterIntpProcess = new ClusterInterpreterProcess(
           runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
           context.getZeppelinServerRPCPort(),
           context.getZeppelinServerHost(),
@@ -215,6 +215,32 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
       LOGGER.error(e.getMessage(), e);
     }
 
-    return clusterInterpreterProcess;
+    return clusterIntpProcess;
+  }
+
+  private InterpreterClient createInterpreterProcess(InterpreterLaunchContext context)
+      throws IOException {
+    this.context = context;
+    this.properties = context.getProperties();
+    int connectTimeout = getConnectTimeout();
+
+    InterpreterClient remoteIntpProcess = null;
+    if (isRunningOnDocker(zConf)) {
+      DockerInterpreterLauncher dockerIntpLauncher = new DockerInterpreterLauncher(zConf, null);
+      dockerIntpLauncher.setProperties(context.getProperties());
+      remoteIntpProcess = dockerIntpLauncher.launch(context);
+    } else {
+      remoteIntpProcess = createClusterIntpProcess();
+    }
+
+    ClusterInterpreterCheckThread intpCheckThread = new ClusterInterpreterCheckThread(
+        remoteIntpProcess, context.getInterpreterGroupId(), connectTimeout);
+    intpCheckThread.start();
+
+    return remoteIntpProcess;
+  }
+
+  private boolean isRunningOnDocker(ZeppelinConfiguration zconf) {
+    return zconf.getRunMode() == ZeppelinConfiguration.RUN_MODE.DOCKER;
   }
 }
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
index 8f0fcc7..744e880 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -36,9 +36,6 @@ public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess {
 
   @Override
   public void start(String userName) throws IOException {
-    ClusterInterpreterCheckThread interpreterCheckThread = new ClusterInterpreterCheckThread(this);
-    interpreterCheckThread.start();
-
     super.start(userName);
   }
 
diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
index 2b2b172..7d83f07 100644
--- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
@@ -99,4 +99,42 @@ public class ClusterInterpreterLauncherTest extends ClusterMockTest {
     assertTrue(interpreterProcess.getEnv().size() >= 1);
     assertEquals(true, interpreterProcess.isUserImpersonated());
   }
+
+  @Test
+  public void testCreateIntpProcessDockerMode() throws IOException {
+    zconf.setRunMode(ZeppelinConfiguration.RUN_MODE.DOCKER);
+
+    ClusterInterpreterLauncher launcher
+        = new ClusterInterpreterLauncher(zconf, null);
+    Properties properties = new Properties();
+    properties.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "1000");
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null,
+        "user1", "intpGroupId3", "groupId3",
+        "groupName", "name", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+
+    assertTrue(client instanceof DockerInterpreterProcess);
+  }
+
+  @Test
+  public void testCreateIntpProcessLocalMode() throws IOException {
+    zconf.setRunMode(ZeppelinConfiguration.RUN_MODE.LOCAL);
+
+    ClusterInterpreterLauncher launcher
+        = new ClusterInterpreterLauncher(zconf, null);
+    Properties properties = new Properties();
+    properties.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "1000");
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null,
+        "user1", "intpGroupId4", "groupId4",
+        "groupName", "name", 0, "host");
+    InterpreterClient client = launcher.launch(context);
+
+    assertTrue(client instanceof ClusterInterpreterProcess);
+  }
 }