You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2020/04/15 05:34:05 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4722] User Impersonation via --proxy-user for Spark Interpreter with K8s

This is an automated email from the ASF dual-hosted git repository.

moon pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 6225868  [ZEPPELIN-4722] User Impersonation via --proxy-user for Spark Interpreter with K8s
6225868 is described below

commit 62258682c103732a516c36e6ac4a2a22bf800278
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Wed Apr 8 09:39:30 2020 +0200

    [ZEPPELIN-4722] User Impersonation via --proxy-user for Spark Interpreter with K8s
    
    ### What is this PR for?
    This pull request fixes the impersonation issue when running the Zeppelin Spark interpreter on K8s. General user impersonation is neither necessary nor possible on K8s, since the interpreter process runs unprivileged - without the right to change the user via ssh.
    
    ### What type of PR is it?
    * Improvement
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4722
    
    ### How should this be tested?
    * **Travis-CI**: https://travis-ci.org/github/Reamer/zeppelin/builds/674723139
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Philipp Dallig <ph...@gmail.com>
    
    Closes #3733 from Reamer/proxy_user and squashes the following commits:
    
    8c2e58bb5 [Philipp Dallig] proxy user in spark on k8s
    3989c4e87 [Philipp Dallig] Some Cleanup
    
    (cherry picked from commit 85fc206b4f1504dc29da8264b3619ed31006a3b2)
    Signed-off-by: Lee moon soo <mo...@apache.org>
---
 .../zeppelin/conf/ZeppelinConfiguration.java       |   5 +
 .../launcher/K8sRemoteInterpreterProcess.java      |  46 ++++----
 .../launcher/K8sStandardInterpreterLauncher.java   |  23 ++--
 .../zeppelin/interpreter/launcher/Kubectl.java     |  11 +-
 .../launcher/K8sRemoteInterpreterProcessTest.java  | 117 +++++++++++++++++++--
 .../launcher/SparkInterpreterLauncher.java         |   5 +-
 6 files changed, 164 insertions(+), 43 deletions(-)

diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 1be243f..3deb8af 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -697,6 +697,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     return getString(ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS);
   }
 
+  public boolean getZeppelinImpersonateSparkProxyUser() {
+      return getBoolean(ConfVars.ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER);
+  }
+
   public String getZeppelinNotebookGitURL() {
     return  getString(ConfVars.ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL);
   }
@@ -998,6 +1002,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
 
     ZEPPELIN_DOCKER_CONTAINER_IMAGE("zeppelin.docker.container.image", "apache/zeppelin:" + Util.getVersion()),
 
+    ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER("zeppelin.impersonate.spark.proxy.user", true),
     ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),
     ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"),
     ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""),
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 959718d..5f872cc 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -13,13 +13,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.hubspot.jinjava.Jinjava;
 import org.apache.commons.exec.ExecuteWatchdog;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
-  private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(K8sRemoteInterpreterProcess.class);
   private static final int K8S_INTERPRETER_SERVICE_PORT = 12321;
   private final Kubectl kubectl;
   private final String interpreterGroupId;
@@ -39,6 +41,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   private ExecuteWatchdog portForwardWatchdog;
   private int podPort = K8S_INTERPRETER_SERVICE_PORT;
 
+  private final boolean isUserImpersonatedForSpark;
+  private String userName;
+
   private AtomicBoolean started = new AtomicBoolean(false);
 
   public K8sRemoteInterpreterProcess(
@@ -54,7 +59,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
           String zeppelinServiceRpcPort,
           boolean portForward,
           String sparkImage,
-          int connectTimeout
+          int connectTimeout,
+          boolean isUserImpersonatedForSpark
   ) {
     super(connectTimeout);
     this.kubectl = kubectl;
@@ -64,12 +70,13 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     this.interpreterGroupName = interpreterGroupName;
     this.interpreterSettingName = interpreterSettingName;
     this.properties = properties;
-    this.envs = new HashMap(envs);
+    this.envs = new HashMap<>(envs);
     this.zeppelinServiceHost = zeppelinServiceHost;
     this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
     this.portForward = portForward;
     this.sparkImage = sparkImage;
     this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6);
+    this.isUserImpersonatedForSpark = isUserImpersonatedForSpark;
   }
 
 
@@ -89,6 +96,14 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
 
   @Override
   public void start(String userName) throws IOException {
+    /**
+     * If a spark interpreter process is running, userName is set in preparation for --proxy-user
+     */
+    if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) {
+      this.userName = userName;
+    } else {
+      this.userName = null;
+    }
     // create new pod
     apply(specTempaltes, false);
     kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", getConnectTimeout()/1000);
@@ -116,9 +131,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     }
 
     if (!started.get()) {
-      LOGGER.info(
-          String.format("Interpreter pod creation is time out in %d seconds",
-              getConnectTimeout()/1000));
+      LOGGER.info("Interpreter pod creation is time out in {} seconds", getConnectTimeout()/1000);
     }
 
     // waits for interpreter thrift rpc server ready
@@ -210,7 +223,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
    */
   void apply(File path, boolean delete) throws IOException {
     if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) {
-      LOGGER.info("Skip " + path.getAbsolutePath());
+      LOGGER.info("Skip {}", path.getAbsolutePath());
     }
 
     if (path.isDirectory()) {
@@ -224,7 +237,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
         apply(f, delete);
       }
     } else if (path.isFile()) {
-      LOGGER.info("Apply " + path.getAbsolutePath());
+      LOGGER.info("Apply {}", path.getAbsolutePath());
       K8sSpecTemplate specTemplate = new K8sSpecTemplate();
       specTemplate.loadProperties(getTemplateBindings());
 
@@ -235,12 +248,12 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
         kubectl.apply(spec);
       }
     } else {
-      LOGGER.error("Can't apply " + path.getAbsolutePath());
+      LOGGER.error("Can't apply {}", path.getAbsolutePath());
     }
   }
 
   @VisibleForTesting
-  Properties getTemplateBindings() throws IOException {
+  Properties getTemplateBindings() {
     Properties k8sProperties = new Properties();
 
     // k8s template properties
@@ -318,11 +331,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
 
   boolean isSparkOnKubernetes(Properties interpreteProperties) {
     String propertySparkMaster = (String) interpreteProperties.getOrDefault("master", "");
-    if (propertySparkMaster.startsWith("k8s://")) {
-      return true;
-    } else {
-      return false;
-    }
+    return propertySparkMaster.startsWith("k8s://");
   }
 
   @VisibleForTesting
@@ -334,6 +343,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     if (properties.containsKey("spark.driver.memory")) {
       options.append(" --driver-memory " + properties.get("spark.driver.memory"));
     }
+    if (userName != null) {
+      options.append(" --proxy-user " + userName);
+    }
     options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
     options.append(" --conf spark.executor.instances=1");
     options.append(" --conf spark.kubernetes.driver.pod.name=" + getPodName());
@@ -392,9 +404,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
       char c = chars[random.nextInt(chars.length)];
       sb.append(c);
     }
-    String randomStr = sb.toString();
-
-    return randomStr;
+    return sb.toString();
   }
 
   @Override
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index 3f2e39d..085c3f9 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -70,11 +70,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
    * @return
    */
   boolean isRunningOnKubernetes() {
-    if (new File("/var/run/secrets/kubernetes.io").exists()) {
-      return true;
-    } else {
-      return false;
-    }
+    return new File("/var/run/secrets/kubernetes.io").exists();
   }
 
   /**
@@ -130,9 +126,21 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
     }
   }
 
+  /**
+   * The interpreter process will run in K8s. There is no point in changing the user after starting the container.
+   * Switching to another (non-privileged) user should be done during the image creation process.
+   *
+   * User impersonation should be possible only if a Spark interpreter process is running, for use with --proxy-user.
+   */
+  private boolean isUserImpersonateForSparkInterpreter(InterpreterLaunchContext context) {
+      return zConf.getZeppelinImpersonateSparkProxyUser() &&
+          context.getOption().isUserImpersonate() &&
+          "spark".equalsIgnoreCase(context.getInterpreterGroupId());
+  }
+
   @Override
   public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
-    LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
+    LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
     this.context = context;
     this.properties = context.getProperties();
     int connectTimeout = getConnectTimeout();
@@ -150,7 +158,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
             getZeppelinServiceRpcPort(),
             zConf.getK8sPortForward(),
             zConf.getK8sSparkContainerImage(),
-            connectTimeout);
+            connectTimeout,
+            isUserImpersonateForSparkInterpreter(context));
   }
 
   protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
index 2079d16..39e7e92 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
@@ -18,7 +18,6 @@
 package org.apache.zeppelin.interpreter.launcher;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
 import java.util.ArrayList;
 import java.util.Arrays;
 
@@ -31,9 +30,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Kubectl {
-  private final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class);
   private final String kubectlCmd;
-  private final Gson gson = new Gson();
   private String namespace;
 
   public Kubectl(String kubectlCmd) {
@@ -118,7 +116,7 @@ public class Kubectl {
       argsToOverride.add("--namespace=" + namespace);
     }
 
-    LOGGER.info("kubectl " + argsToOverride);
+    LOGGER.info("kubectl {}", argsToOverride);
     LOGGER.debug(stdin);
 
     try {
@@ -130,8 +128,7 @@ public class Kubectl {
       );
 
       if (exitCode == 0) {
-        String output = new String(stdout.toByteArray());
-        return output;
+        return new String(stdout.toByteArray());
       } else {
         String output = new String(stderr.toByteArray());
         throw new IOException(String.format("non zero return code (%d). %s", exitCode, output));
@@ -147,7 +144,7 @@ public class Kubectl {
     CommandLine cmd = new CommandLine(kubectlCmd);
     cmd.addArguments(args);
 
-    ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000);
+    ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000L);
     executor.setWatchdog(watchdog);
 
     PumpStreamHandler streamHandler = new PumpStreamHandler(stdout, stderr, stdin);
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 9d6b634..1459810 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -53,11 +54,8 @@ public class K8sRemoteInterpreterProcessTest {
         "12320",
         false,
         "spark-container:1.0",
-        10);
-
-    // when
-    String host = intp.getHost();
-    int port = intp.getPort();
+        10,
+        false);
 
     // then
     assertEquals(String.format("%s.%s.svc", intp.getPodName(), kubectl.getNamespace()), intp.getHost());
@@ -86,7 +84,8 @@ public class K8sRemoteInterpreterProcessTest {
         "12320",
         false,
         "spark-container:1.0",
-        10);
+        10,
+        false);
 
 
     // following values are hardcoded in k8s/interpreter/100-interpreter.yaml.
@@ -120,7 +119,8 @@ public class K8sRemoteInterpreterProcessTest {
         "12320",
         false,
         "spark-container:1.0",
-        10);
+        10,
+        false);
 
     // when
     Properties p = intp.getTemplateBindings();
@@ -172,9 +172,11 @@ public class K8sRemoteInterpreterProcessTest {
         "12320",
         false,
         "spark-container:1.0",
-        10);
+        10,
+        false);
 
     // when
+    intp.start("mytestUser");
     Properties p = intp.getTemplateBindings();
 
     // then
@@ -192,6 +194,105 @@ public class K8sRemoteInterpreterProcessTest {
     assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
     assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
     assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
+    assertFalse(sparkSubmitOptions.contains("--proxy-user"));
+    assertTrue(intp.isSpark());
+  }
+
+  @Test
+  public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException {
+    // given
+    Kubectl kubectl = mock(Kubectl.class);
+    when(kubectl.getNamespace()).thenReturn("default");
+
+    Properties properties = new Properties();
+    properties.put("my.key1", "v1");
+    properties.put("master", "k8s://http://api");
+    HashMap<String, String> envs = new HashMap<String, String>();
+    envs.put("MY_ENV1", "V1");
+    envs.put("SPARK_SUBMIT_OPTIONS", "my options");
+    envs.put("SERVICE_DOMAIN", "mydomain");
+
+    K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+        kubectl,
+        new File(".skip"),
+        "interpreter-container:1.0",
+        "shared_process",
+        "spark",
+        "myspark",
+        properties,
+        envs,
+        "zeppelin.server.hostname",
+        "12320",
+        false,
+        "spark-container:1.0",
+        10,
+        true);
+
+    // when
+    intp.start("mytestUser");
+    Properties p = intp.getTemplateBindings();
+    // then
+    assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
+    assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
+
+    envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
+    assertTrue( envs.containsKey("SPARK_HOME"));
+
+    String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
+    assertTrue(sparkSubmitOptions.startsWith("my options "));
+    assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace()));
+    assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
+    assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
+    assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
+    assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
+    assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
+    assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser"));
+    assertTrue(intp.isSpark());
+  }
+
+  @Test
+  public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOException {
+    // given
+    Kubectl kubectl = mock(Kubectl.class);
+    when(kubectl.getNamespace()).thenReturn("default");
+
+    Properties properties = new Properties();
+    properties.put("my.key1", "v1");
+    properties.put("master", "k8s://http://api");
+    HashMap<String, String> envs = new HashMap<String, String>();
+    envs.put("MY_ENV1", "V1");
+    envs.put("SPARK_SUBMIT_OPTIONS", "my options");
+    envs.put("SERVICE_DOMAIN", "mydomain");
+
+    K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+        kubectl,
+        new File(".skip"),
+        "interpreter-container:1.0",
+        "shared_process",
+        "spark",
+        "myspark",
+        properties,
+        envs,
+        "zeppelin.server.hostname",
+        "12320",
+        false,
+        "spark-container:1.0",
+        10,
+        true);
+
+    // when
+    intp.start("anonymous");
+    Properties p = intp.getTemplateBindings();
+    // then
+    assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
+    assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
+
+    envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
+    assertTrue( envs.containsKey("SPARK_HOME"));
+
+    String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
+    assertFalse(sparkSubmitOptions.contains("--proxy-user"));
+    assertTrue(intp.isSpark());
   }
 
   @Test
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 02e8f1b..7dc888f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -156,9 +156,8 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
     for (String name : sparkProperties.stringPropertyNames()) {
       sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
     }
-    String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER");
-    if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) ||
-            !useProxyUserEnv.equals("false"))) {
+
+    if (context.getOption().isUserImpersonate() && zConf.getZeppelinImpersonateSparkProxyUser()) {
       sparkConfBuilder.append(" --proxy-user " + context.getUserName());
     }