You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by al...@apache.org on 2020/04/09 06:44:10 UTC

[zeppelin] branch master updated: [ZEPPELIN-4725]. Merge process env into interpreter process env in InterpreterLauncher

This is an automated email from the ASF dual-hosted git repository.

alexott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0430161  [ZEPPELIN-4725]. Merge process env into interpreter process env in InterpreterLauncher
0430161 is described below

commit 0430161e94e45cd07f5f729a35ed07a9587bd71e
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Apr 9 00:09:38 2020 +0800

    [ZEPPELIN-4725]. Merge process env into interpreter process env in InterpreterLauncher
    
    ### What is this PR for?
    
    This PR is to just merge process env into interpreter process env in InterpreterLauncher so that env defined in zeppelin-env.sh also apply to interpreters. e.g. `FLINK_HOME` specified in `zeppelin-env.sh` will apply on flink interpreter. But flink interpreter can override it.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4725
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3719 from zjffdu/ZEPPELIN-4725 and squashes the following commits:
    
    33ecff471 [Jeff Zhang] address comment
    41f1f87c3 [Jeff Zhang] [ZEPPELIN-4725]. Merge process env into interpreter process env in InterpreterLauncher
---
 .../interpreter/launcher/FlinkInterpreterLauncher.java        | 11 ++++++-----
 .../org/apache/zeppelin/interpreter/InterpreterSetting.java   |  3 ++-
 .../interpreter/launcher/StandardInterpreterLauncher.java     |  3 ++-
 .../interpreter/remote/RemoteInterpreterManagedProcess.java   |  6 +-----
 4 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index fe3adf7..248c7e7 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 
@@ -34,16 +35,16 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
   public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context)
           throws IOException {
     Map<String, String> envs = super.buildEnvFromProperties(context);
-    String flinkHome = context.getProperties().getProperty("FLINK_HOME");
-    if (flinkHome == null) {
+    String flinkHome = context.getProperties().getProperty("FLINK_HOME", envs.get("FLINK_HOME"));
+    if (StringUtils.isBlank(flinkHome)) {
       throw new IOException("FLINK_HOME is not specified");
     }
     File flinkHomeFile = new File(flinkHome);
     if (!flinkHomeFile.exists()) {
-      throw new IOException(String.format("FLINK_HOME %s doesn't exist", flinkHome));
+      throw new IOException(String.format("FLINK_HOME '%s' doesn't exist", flinkHome));
     }
-    if (flinkHomeFile.isFile()) {
-      throw new IOException(String.format("FLINK_HOME %s is a file, but should be directory",
+    if (!flinkHomeFile.isDirectory()) {
+      throw new IOException(String.format("FLINK_HOME '%s' is a file, but should be directory",
               flinkHome));
     }
     envs.put("FLINK_CONF_DIR", flinkHome + "/conf");
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 9a179cd..0babe2a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -637,7 +637,8 @@ public class InterpreterSetting {
     Properties jProperties = new Properties();
     Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties;
     for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) {
-      if (entry.getValue().getValue() != null) {
+      if (entry.getValue().getValue() != null &&
+              !StringUtils.isBlank(entry.getValue().getValue().toString())) {
         jProperties.setProperty(entry.getKey().trim(),
             entry.getValue().getValue().toString().trim());
       }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index 677cbd0..d78cb2c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -18,6 +18,7 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
@@ -90,7 +91,7 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
   }
 
   public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) throws IOException {
-    Map<String, String> env = new HashMap<>();
+    Map<String, String> env = EnvironmentUtils.getProcEnvironment();
     for (Map.Entry entry : context.getProperties().entrySet()) {
       String key = (String) entry.getKey();
       String value = (String) entry.getValue();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index d6e2c52..69d82b6 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -20,8 +20,6 @@ package org.apache.zeppelin.interpreter.remote;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.ExecuteException;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zeppelin.interpreter.YarnAppMonitor;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
@@ -117,9 +115,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
     cmdLine.addArgument("-g", false);
     cmdLine.addArgument(interpreterSettingName, false);
 
-    Map procEnv = EnvironmentUtils.getProcEnvironment();
-    procEnv.putAll(env);
-    interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, procEnv);
+    interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, env);
     interpreterProcessLauncher.launch();
     interpreterProcessLauncher.waitForReady(getConnectTimeout());
     if (interpreterProcessLauncher.isLaunchTimeout()) {