You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2020/04/15 05:34:05 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4722] User
Impersonation via --proxy-user for Spark Interpreter with K8s
This is an automated email from the ASF dual-hosted git repository.
moon pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 6225868 [ZEPPELIN-4722] User Impersonation via --proxy-user for Spark Interpreter with K8s
6225868 is described below
commit 62258682c103732a516c36e6ac4a2a22bf800278
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Wed Apr 8 09:39:30 2020 +0200
[ZEPPELIN-4722] User Impersonation via --proxy-user for Spark Interpreter with K8s
### What is this PR for?
This pull request fixes the impersonation issue when running the Zeppelin Spark interpreter on K8s. General user impersonation should be neither necessary nor possible on K8s, since the interpreter process runs unprivileged - without the right to change the user via ssh.
### What type of PR is it?
* Improvement
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4722
### How should this be tested?
* **Travis-CI**: https://travis-ci.org/github/Reamer/zeppelin/builds/674723139
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Philipp Dallig <ph...@gmail.com>
Closes #3733 from Reamer/proxy_user and squashes the following commits:
8c2e58bb5 [Philipp Dallig] proxy user in spark on k8s
3989c4e87 [Philipp Dallig] Some Cleanup
(cherry picked from commit 85fc206b4f1504dc29da8264b3619ed31006a3b2)
Signed-off-by: Lee moon soo <mo...@apache.org>
---
.../zeppelin/conf/ZeppelinConfiguration.java | 5 +
.../launcher/K8sRemoteInterpreterProcess.java | 46 ++++----
.../launcher/K8sStandardInterpreterLauncher.java | 23 ++--
.../zeppelin/interpreter/launcher/Kubectl.java | 11 +-
.../launcher/K8sRemoteInterpreterProcessTest.java | 117 +++++++++++++++++++--
.../launcher/SparkInterpreterLauncher.java | 5 +-
6 files changed, 164 insertions(+), 43 deletions(-)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 1be243f..3deb8af 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -697,6 +697,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getString(ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS);
}
+ public boolean getZeppelinImpersonateSparkProxyUser() {
+ return getBoolean(ConfVars.ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER);
+ }
+
public String getZeppelinNotebookGitURL() {
return getString(ConfVars.ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL);
}
@@ -998,6 +1002,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_DOCKER_CONTAINER_IMAGE("zeppelin.docker.container.image", "apache/zeppelin:" + Util.getVersion()),
+ ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER("zeppelin.impersonate.spark.proxy.user", true),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""),
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 959718d..5f872cc 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -13,13 +13,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.hubspot.jinjava.Jinjava;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
- private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(K8sRemoteInterpreterProcess.class);
private static final int K8S_INTERPRETER_SERVICE_PORT = 12321;
private final Kubectl kubectl;
private final String interpreterGroupId;
@@ -39,6 +41,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private ExecuteWatchdog portForwardWatchdog;
private int podPort = K8S_INTERPRETER_SERVICE_PORT;
+ private final boolean isUserImpersonatedForSpark;
+ private String userName;
+
private AtomicBoolean started = new AtomicBoolean(false);
public K8sRemoteInterpreterProcess(
@@ -54,7 +59,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
String zeppelinServiceRpcPort,
boolean portForward,
String sparkImage,
- int connectTimeout
+ int connectTimeout,
+ boolean isUserImpersonatedForSpark
) {
super(connectTimeout);
this.kubectl = kubectl;
@@ -64,12 +70,13 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
this.interpreterGroupName = interpreterGroupName;
this.interpreterSettingName = interpreterSettingName;
this.properties = properties;
- this.envs = new HashMap(envs);
+ this.envs = new HashMap<>(envs);
this.zeppelinServiceHost = zeppelinServiceHost;
this.zeppelinServiceRpcPort = zeppelinServiceRpcPort;
this.portForward = portForward;
this.sparkImage = sparkImage;
this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6);
+ this.isUserImpersonatedForSpark = isUserImpersonatedForSpark;
}
@@ -89,6 +96,14 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
@Override
public void start(String userName) throws IOException {
+ /**
+ * If a spark interpreter process is running, userName is set in preparation for --proxy-user
+ */
+ if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) {
+ this.userName = userName;
+ } else {
+ this.userName = null;
+ }
// create new pod
apply(specTempaltes, false);
kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", getConnectTimeout()/1000);
@@ -116,9 +131,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
}
if (!started.get()) {
- LOGGER.info(
- String.format("Interpreter pod creation is time out in %d seconds",
- getConnectTimeout()/1000));
+ LOGGER.info("Interpreter pod creation is time out in {} seconds", getConnectTimeout()/1000);
}
// waits for interpreter thrift rpc server ready
@@ -210,7 +223,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
*/
void apply(File path, boolean delete) throws IOException {
if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) {
- LOGGER.info("Skip " + path.getAbsolutePath());
+ LOGGER.info("Skip {}", path.getAbsolutePath());
}
if (path.isDirectory()) {
@@ -224,7 +237,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
apply(f, delete);
}
} else if (path.isFile()) {
- LOGGER.info("Apply " + path.getAbsolutePath());
+ LOGGER.info("Apply {}", path.getAbsolutePath());
K8sSpecTemplate specTemplate = new K8sSpecTemplate();
specTemplate.loadProperties(getTemplateBindings());
@@ -235,12 +248,12 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
kubectl.apply(spec);
}
} else {
- LOGGER.error("Can't apply " + path.getAbsolutePath());
+ LOGGER.error("Can't apply {}", path.getAbsolutePath());
}
}
@VisibleForTesting
- Properties getTemplateBindings() throws IOException {
+ Properties getTemplateBindings() {
Properties k8sProperties = new Properties();
// k8s template properties
@@ -318,11 +331,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
boolean isSparkOnKubernetes(Properties interpreteProperties) {
String propertySparkMaster = (String) interpreteProperties.getOrDefault("master", "");
- if (propertySparkMaster.startsWith("k8s://")) {
- return true;
- } else {
- return false;
- }
+ return propertySparkMaster.startsWith("k8s://");
}
@VisibleForTesting
@@ -334,6 +343,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
if (properties.containsKey("spark.driver.memory")) {
options.append(" --driver-memory " + properties.get("spark.driver.memory"));
}
+ if (userName != null) {
+ options.append(" --proxy-user " + userName);
+ }
options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
options.append(" --conf spark.executor.instances=1");
options.append(" --conf spark.kubernetes.driver.pod.name=" + getPodName());
@@ -392,9 +404,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
char c = chars[random.nextInt(chars.length)];
sb.append(c);
}
- String randomStr = sb.toString();
-
- return randomStr;
+ return sb.toString();
}
@Override
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index 3f2e39d..085c3f9 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -70,11 +70,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
* @return
*/
boolean isRunningOnKubernetes() {
- if (new File("/var/run/secrets/kubernetes.io").exists()) {
- return true;
- } else {
- return false;
- }
+ return new File("/var/run/secrets/kubernetes.io").exists();
}
/**
@@ -130,9 +126,21 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
}
}
+ /**
+ * The interpreter process will run in K8s. There is no point in changing the user after starting the container.
+ * Switching to another (non-privileged) user should be done during the image creation process.
+ *
+ * Only if a Spark interpreter process is running should user impersonation be possible, via --proxy-user.
+ */
+ private boolean isUserImpersonateForSparkInterpreter(InterpreterLaunchContext context) {
+ return zConf.getZeppelinImpersonateSparkProxyUser() &&
+ context.getOption().isUserImpersonate() &&
+ "spark".equalsIgnoreCase(context.getInterpreterGroupId());
+ }
+
@Override
public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
- LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
+ LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
this.context = context;
this.properties = context.getProperties();
int connectTimeout = getConnectTimeout();
@@ -150,7 +158,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
getZeppelinServiceRpcPort(),
zConf.getK8sPortForward(),
zConf.getK8sSparkContainerImage(),
- connectTimeout);
+ connectTimeout,
+ isUserImpersonateForSparkInterpreter(context));
}
protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
index 2079d16..39e7e92 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
@@ -18,7 +18,6 @@
package org.apache.zeppelin.interpreter.launcher;
import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Arrays;
@@ -31,9 +30,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Kubectl {
- private final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class);
private final String kubectlCmd;
- private final Gson gson = new Gson();
private String namespace;
public Kubectl(String kubectlCmd) {
@@ -118,7 +116,7 @@ public class Kubectl {
argsToOverride.add("--namespace=" + namespace);
}
- LOGGER.info("kubectl " + argsToOverride);
+ LOGGER.info("kubectl {}", argsToOverride);
LOGGER.debug(stdin);
try {
@@ -130,8 +128,7 @@ public class Kubectl {
);
if (exitCode == 0) {
- String output = new String(stdout.toByteArray());
- return output;
+ return new String(stdout.toByteArray());
} else {
String output = new String(stderr.toByteArray());
throw new IOException(String.format("non zero return code (%d). %s", exitCode, output));
@@ -147,7 +144,7 @@ public class Kubectl {
CommandLine cmd = new CommandLine(kubectlCmd);
cmd.addArguments(args);
- ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000);
+ ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000L);
executor.setWatchdog(watchdog);
PumpStreamHandler streamHandler = new PumpStreamHandler(stdout, stderr, stdin);
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 9d6b634..1459810 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -53,11 +54,8 @@ public class K8sRemoteInterpreterProcessTest {
"12320",
false,
"spark-container:1.0",
- 10);
-
- // when
- String host = intp.getHost();
- int port = intp.getPort();
+ 10,
+ false);
// then
assertEquals(String.format("%s.%s.svc", intp.getPodName(), kubectl.getNamespace()), intp.getHost());
@@ -86,7 +84,8 @@ public class K8sRemoteInterpreterProcessTest {
"12320",
false,
"spark-container:1.0",
- 10);
+ 10,
+ false);
// following values are hardcoded in k8s/interpreter/100-interpreter.yaml.
@@ -120,7 +119,8 @@ public class K8sRemoteInterpreterProcessTest {
"12320",
false,
"spark-container:1.0",
- 10);
+ 10,
+ false);
// when
Properties p = intp.getTemplateBindings();
@@ -172,9 +172,11 @@ public class K8sRemoteInterpreterProcessTest {
"12320",
false,
"spark-container:1.0",
- 10);
+ 10,
+ false);
// when
+ intp.start("mytestUser");
Properties p = intp.getTemplateBindings();
// then
@@ -192,6 +194,105 @@ public class K8sRemoteInterpreterProcessTest {
assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
+ assertFalse(sparkSubmitOptions.contains("--proxy-user"));
+ assertTrue(intp.isSpark());
+ }
+
+ @Test
+ public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException {
+ // given
+ Kubectl kubectl = mock(Kubectl.class);
+ when(kubectl.getNamespace()).thenReturn("default");
+
+ Properties properties = new Properties();
+ properties.put("my.key1", "v1");
+ properties.put("master", "k8s://http://api");
+ HashMap<String, String> envs = new HashMap<String, String>();
+ envs.put("MY_ENV1", "V1");
+ envs.put("SPARK_SUBMIT_OPTIONS", "my options");
+ envs.put("SERVICE_DOMAIN", "mydomain");
+
+ K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+ kubectl,
+ new File(".skip"),
+ "interpreter-container:1.0",
+ "shared_process",
+ "spark",
+ "myspark",
+ properties,
+ envs,
+ "zeppelin.server.hostname",
+ "12320",
+ false,
+ "spark-container:1.0",
+ 10,
+ true);
+
+ // when
+ intp.start("mytestUser");
+ Properties p = intp.getTemplateBindings();
+ // then
+ assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
+ assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
+
+ envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
+ assertTrue( envs.containsKey("SPARK_HOME"));
+
+ String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
+ assertTrue(sparkSubmitOptions.startsWith("my options "));
+ assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace()));
+ assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
+ assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
+ assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
+ assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
+ assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
+ assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser"));
+ assertTrue(intp.isSpark());
+ }
+
+ @Test
+ public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOException {
+ // given
+ Kubectl kubectl = mock(Kubectl.class);
+ when(kubectl.getNamespace()).thenReturn("default");
+
+ Properties properties = new Properties();
+ properties.put("my.key1", "v1");
+ properties.put("master", "k8s://http://api");
+ HashMap<String, String> envs = new HashMap<String, String>();
+ envs.put("MY_ENV1", "V1");
+ envs.put("SPARK_SUBMIT_OPTIONS", "my options");
+ envs.put("SERVICE_DOMAIN", "mydomain");
+
+ K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+ kubectl,
+ new File(".skip"),
+ "interpreter-container:1.0",
+ "shared_process",
+ "spark",
+ "myspark",
+ properties,
+ envs,
+ "zeppelin.server.hostname",
+ "12320",
+ false,
+ "spark-container:1.0",
+ 10,
+ true);
+
+ // when
+ intp.start("anonymous");
+ Properties p = intp.getTemplateBindings();
+ // then
+ assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
+ assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
+
+ envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
+ assertTrue( envs.containsKey("SPARK_HOME"));
+
+ String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
+ assertFalse(sparkSubmitOptions.contains("--proxy-user"));
+ assertTrue(intp.isSpark());
}
@Test
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 02e8f1b..7dc888f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -156,9 +156,8 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
for (String name : sparkProperties.stringPropertyNames()) {
sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
}
- String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER");
- if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) ||
- !useProxyUserEnv.equals("false"))) {
+
+ if (context.getOption().isUserImpersonate() && zConf.getZeppelinImpersonateSparkProxyUser()) {
sparkConfBuilder.append(" --proxy-user " + context.getUserName());
}