You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2023/02/01 10:16:29 UTC
[zeppelin] branch master updated: [ZEPPELIN-5855] Refactor docker plugin and remove powermock (#4519)
This is an automated email from the ASF dual-hosted git repository.
jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new af8280685b [ZEPPELIN-5855] Refactor docker plugin and remove powermock (#4519)
af8280685b is described below
commit af8280685b4dc916521f4a279c64eb455ef52b4d
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Wed Feb 1 11:16:17 2023 +0100
[ZEPPELIN-5855] Refactor docker plugin and remove powermock (#4519)
* Refactor docker plugin
* prevent infinite loop
---
docs/quickstart/docker.md | 2 +-
.../zeppelin/conf/ZeppelinConfiguration.java | 7 +
zeppelin-plugins/launcher/docker/pom.xml | 8 --
.../launcher/DockerInterpreterProcess.java | 148 ++++++++++-----------
.../interpreter/launcher/utils/TarUtils.java | 47 ++++---
.../launcher/DockerInterpreterProcessTest.java | 109 +++++++--------
6 files changed, 157 insertions(+), 164 deletions(-)
diff --git a/docs/quickstart/docker.md b/docs/quickstart/docker.md
index 45e6bee669..187e9f9c83 100644
--- a/docs/quickstart/docker.md
+++ b/docs/quickstart/docker.md
@@ -88,7 +88,7 @@ access to this port.
Set to the same time zone as the zeppelin server, keeping the time zone in the interpreter docker container the same as the server. E.g, `"America/New_York"` or `"Asia/Shanghai"`
```bash
- export DOCKER_TIME_ZONE="America/New_York"
+ export ZEPPELIN_DOCKER_TIME_ZONE="America/New_York"
```
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index f251c3182a..efeeea3552 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -1086,8 +1087,14 @@ public class ZeppelinConfiguration {
ZEPPELIN_K8S_SERVICE_NAME("zeppelin.k8s.service.name", "zeppelin-server"),
ZEPPELIN_K8S_TIMEOUT_DURING_PENDING("zeppelin.k8s.timeout.during.pending", true),
+ // Used by K8s and Docker plugin
ZEPPELIN_DOCKER_CONTAINER_IMAGE("zeppelin.docker.container.image", "apache/zeppelin:" + Util.getVersion()),
+ ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME("zeppelin.docker.container.spark.home", "/spark"),
+ ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER("zeppelin.docker.upload.local.lib.to.container", true),
+ ZEPPELIN_DOCKER_HOST("zeppelin.docker.host", "http://0.0.0.0:2375"),
+ ZEPPELIN_DOCKER_TIME_ZONE("zeppelin.docker.time.zone", TimeZone.getDefault().getID()),
+
ZEPPELIN_METRIC_ENABLE_PROMETHEUS("zeppelin.metric.enable.prometheus", false),
ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER("zeppelin.impersonate.spark.proxy.user", true),
diff --git a/zeppelin-plugins/launcher/docker/pom.xml b/zeppelin-plugins/launcher/docker/pom.xml
index 27e6b0bb6c..911d00a34b 100644
--- a/zeppelin-plugins/launcher/docker/pom.xml
+++ b/zeppelin-plugins/launcher/docker/pom.xml
@@ -61,14 +61,6 @@
<artifactId>commons-compress</artifactId>
<version>${commons.compress.version}</version>
</dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito2</artifactId>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- </dependency>
</dependencies>
<build>
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
index 551e36bd7b..228eee5e7a 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
@@ -24,12 +24,11 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.net.SocketException;
import java.net.URI;
import java.net.URL;
-import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@@ -37,7 +36,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import com.spotify.docker.client.DefaultDockerClient;
@@ -56,6 +54,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.interpreter.launcher.utils.TarFileEntry;
import org.apache.zeppelin.interpreter.launcher.utils.TarUtils;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
@@ -67,7 +66,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB;
public class DockerInterpreterProcess extends RemoteInterpreterProcess {
- private static final Logger LOGGER = LoggerFactory.getLogger(DockerInterpreterLauncher.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(DockerInterpreterProcess.class);
private String dockerIntpServicePort = "0";
@@ -99,13 +98,12 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
private String zeppelinHome;
@VisibleForTesting
- final String CONTAINER_SPARK_HOME;
+ final String containerSparkHome;
@VisibleForTesting
- final String DOCKER_HOST;
+ final String dockerHost;
- private String containerId;
- final String CONTAINER_UPLOAD_TAR_DIR = "/tmp/zeppelin-tar";
+ private static final String CONTAINER_UPLOAD_TAR_DIR = "/tmp/zeppelin-tar";
public DockerInterpreterProcess(
ZeppelinConfiguration zconf,
@@ -127,27 +125,20 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
this.interpreterGroupName = interpreterGroupName;
this.interpreterSettingName = interpreterSettingName;
this.properties = properties;
- this.envs = new HashMap(envs);
+ this.envs = new HashMap<>(envs);
this.zconf = zconf;
this.containerName = interpreterGroupId.toLowerCase();
- String sparkHome = System.getenv("CONTAINER_SPARK_HOME");
- CONTAINER_SPARK_HOME = (sparkHome == null) ? "/spark" : sparkHome;
-
- String uploadLocalLib = System.getenv("UPLOAD_LOCAL_LIB_TO_CONTAINTER");
- if (null != uploadLocalLib && StringUtils.equals(uploadLocalLib, "false")) {
- uploadLocalLibToContainter = false;
- }
+ containerSparkHome = zconf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME);
+ uploadLocalLibToContainter = zconf.getBoolean(ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER);
try {
this.zeppelinHome = getZeppelinHome();
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
- String defDockerHost = "http://0.0.0.0:2375";
- String dockerHost = System.getenv("DOCKER_HOST");
- DOCKER_HOST = (dockerHost == null) ? defDockerHost : dockerHost;
+ dockerHost = zconf.getString(ConfVars.ZEPPELIN_DOCKER_HOST);
}
@Override
@@ -162,7 +153,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
@Override
public void start(String userName) throws IOException {
- docker = DefaultDockerClient.builder().uri(URI.create(DOCKER_HOST)).build();
+ docker = DefaultDockerClient.builder().uri(URI.create(dockerHost)).build();
removeExistContainer(containerName);
@@ -229,7 +220,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
final ContainerCreation containerCreation
= docker.createContainer(containerConfig, containerName);
- this.containerId = containerCreation.id();
+ String containerId = containerCreation.id();
// Start container
docker.startContainer(containerId);
@@ -238,32 +229,37 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
execInContainer(containerId, dockerCommand, false);
} catch (DockerException e) {
- LOGGER.error(e.getMessage(), e);
- throw new IOException(e.getMessage());
+ throw new IOException(e);
} catch (InterruptedException e) {
- LOGGER.error(e.getMessage(), e);
- throw new IOException(e.getMessage());
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ throw new IOException("Docker preparations were interrupted.", e);
}
long startTime = System.currentTimeMillis();
-
+ long timeoutTime = startTime + getConnectTimeout();
// wait until interpreter send dockerStarted message through thrift rpc
synchronized (dockerStarted) {
- if (!dockerStarted.get()) {
+ while (!dockerStarted.get() && !Thread.currentThread().isInterrupted()) {
+ long timeToTimeout = timeoutTime - System.currentTimeMillis();
+ if (timeToTimeout <= 0) {
+ LOGGER.info("Interpreter docker creation is time out in {} seconds",
+ getConnectTimeout() / 1000);
+ stop();
+ throw new IOException(
+ "Launching zeppelin interpreter on docker is time out, kill it now");
+ }
try {
- dockerStarted.wait(getConnectTimeout());
+ dockerStarted.wait(timeToTimeout);
} catch (InterruptedException e) {
- LOGGER.error("Remote interpreter is not accessible");
- throw new IOException(e.getMessage());
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ stop();
+ throw new IOException("Remote interpreter is not accessible", e);
}
}
}
- if (!dockerStarted.get()) {
- LOGGER.info("Interpreter docker creation is time out in {} seconds",
- getConnectTimeout() / 1000);
- }
-
// waits for interpreter thrift rpc server ready
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
@@ -273,6 +269,8 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
}
}
}
@@ -285,12 +283,12 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
LOGGER.info("Interpreter container created {}:{}", containerHost, containerPort);
synchronized (dockerStarted) {
dockerStarted.set(true);
- dockerStarted.notify();
+ dockerStarted.notifyAll();
}
}
@VisibleForTesting
- Properties getTemplateBindings() throws IOException {
+ Properties getTemplateBindings() {
Properties dockerProperties = new Properties();
// docker template properties
@@ -312,19 +310,15 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
}
@VisibleForTesting
- List<String> getListEnvs() throws SocketException, UnknownHostException {
+ List<String> getListEnvs() {
// environment variables
envs.put("ZEPPELIN_HOME", zeppelinHome);
envs.put("ZEPPELIN_CONF_DIR", zeppelinHome + "/conf");
envs.put("ZEPPELIN_FORCE_STOP", "true");
- envs.put("SPARK_HOME", this.CONTAINER_SPARK_HOME);
+ envs.put("SPARK_HOME", this.containerSparkHome);
// set container time zone
- String dockerTimeZone = System.getenv("DOCKER_TIME_ZONE");
- if (StringUtils.isBlank(dockerTimeZone)) {
- dockerTimeZone = TimeZone.getDefault().getID();
- }
- envs.put("TZ", dockerTimeZone);
+ envs.put("TZ", zconf.getString(ConfVars.ZEPPELIN_DOCKER_TIME_ZONE));
List<String> listEnv = new ArrayList<>();
for (Map.Entry<String, String> entry : this.envs.entrySet()) {
@@ -344,7 +338,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
return null;
});
} catch (Exception e) {
- LOGGER.warn("ignore the exception when shutting down", e);
+ LOGGER.warn("Ignore the exception when shutting down", e);
}
}
try {
@@ -353,7 +347,11 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
// Remove container
docker.removeContainer(containerName);
- } catch (DockerException | InterruptedException e) {
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ } catch (DockerException e) {
LOGGER.error(e.getMessage(), e);
}
@@ -379,18 +377,26 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
}
}
- if (isExist == true) {
+ if (isExist) {
LOGGER.info("kill exist container {}", containerName);
docker.killContainer(containerName);
}
- } catch (DockerException | InterruptedException e) {
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ } catch (DockerException e) {
LOGGER.error(e.getMessage(), e);
} finally {
try {
- if (isExist == true) {
+ if (isExist) {
docker.removeContainer(containerName);
}
- } catch (DockerException | InterruptedException e) {
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ } catch (DockerException e) {
LOGGER.error(e.getMessage(), e);
}
}
@@ -406,6 +412,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
return containerPort;
}
+ @Override
public boolean isAlive() {
//TODO(ZEPPELIN-5876): Implement it more accurately
return isRunning();
@@ -479,15 +486,14 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
// 3.5) jdbc interpreter properties keytab file
intpKeytab = properties.getProperty("zeppelin.jdbc.keytab.location", "");
}
- if (!StringUtils.isBlank(intpKeytab) && !copyFiles.containsKey(intpKeytab)) {
+ if (!StringUtils.isBlank(intpKeytab)) {
LOGGER.info("intpKeytab : {}", intpKeytab);
- copyFiles.put(intpKeytab, intpKeytab);
+ copyFiles.putIfAbsent(intpKeytab, intpKeytab);
}
// 3.6) zeppelin server keytab file
String zeppelinServerKeytab = zconf.getString(ZEPPELIN_SERVER_KERBEROS_KEYTAB);
- if (!StringUtils.isBlank(zeppelinServerKeytab)
- && !copyFiles.containsKey(zeppelinServerKeytab)) {
- copyFiles.put(zeppelinServerKeytab, zeppelinServerKeytab);
+ if (!StringUtils.isBlank(zeppelinServerKeytab)) {
+ copyFiles.putIfAbsent(zeppelinServerKeytab, zeppelinServerKeytab);
}
// 4) hadoop conf dir
@@ -499,10 +505,10 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
// 5) spark conf dir
if (envs.containsKey("SPARK_CONF_DIR")) {
String sparkConfDir = envs.get("SPARK_CONF_DIR");
- rmInContainer(containerId, CONTAINER_SPARK_HOME + "/conf");
- mkdirInContainer(containerId, CONTAINER_SPARK_HOME + "/conf");
- copyFiles.put(sparkConfDir, CONTAINER_SPARK_HOME + "/conf");
- envs.put("SPARK_CONF_DIR", CONTAINER_SPARK_HOME + "/conf");
+ rmInContainer(containerId, containerSparkHome + "/conf");
+ mkdirInContainer(containerId, containerSparkHome + "/conf");
+ copyFiles.put(sparkConfDir, containerSparkHome + "/conf");
+ envs.put("SPARK_CONF_DIR", containerSparkHome + "/conf");
}
if (uploadLocalLibToContainter){
@@ -528,9 +534,8 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
FileFilterUtils.suffixFileFilter("jar"), null);
for (File jarfile : listFiles) {
String jarfilePath = jarfile.getAbsolutePath();
- if (!StringUtils.isBlank(jarfilePath)
- && !copyFiles.containsKey(jarfilePath)) {
- copyFiles.put(jarfilePath, jarfilePath);
+ if (!StringUtils.isBlank(jarfilePath)) {
+ copyFiles.putIfAbsent(jarfilePath, jarfilePath);
}
}
}
@@ -547,19 +552,15 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
String tarFile = file2Tar(copyFiles);
// copy tar to ZEPPELIN_CONTAINER_DIR, auto unzip
- InputStream inputStream = new FileInputStream(tarFile);
- try {
+ try (InputStream inputStream = new FileInputStream(tarFile)) {
docker.copyToContainer(inputStream, containerId, CONTAINER_UPLOAD_TAR_DIR);
- } finally {
- inputStream.close();
}
// copy all files in CONTAINER_UPLOAD_TAR_DIR to the root directory
cpdirInContainer(containerId, CONTAINER_UPLOAD_TAR_DIR + "/*", "/");
// delete tar file in the local
- File fileTar = new File(tarFile);
- fileTar.delete();
+ Files.delete(Paths.get(tarFile));
}
private void mkdirInContainer(String containerId, String path)
@@ -583,7 +584,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
private void execInContainer(String containerId, String execCommand, boolean logout)
throws DockerException, InterruptedException {
- LOGGER.info("exec container commmand: " + execCommand);
+ LOGGER.info("exec container commmand: {}", execCommand);
final String[] command = {"sh", "-c", execCommand};
final ExecCreation execCreation = docker.execCreate(
@@ -622,15 +623,10 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
}
private String getZeppelinHome() throws IOException {
- String zeppelinHome = zconf.getZeppelinHome();
- if (System.getenv("ZEPPELIN_HOME") != null) {
- zeppelinHome = System.getenv("ZEPPELIN_HOME");
- }
-
// check zeppelinHome is exist
- File fileZeppelinHome = new File(zeppelinHome);
+ File fileZeppelinHome = new File(zconf.getZeppelinHome());
if (fileZeppelinHome.exists() && fileZeppelinHome.isDirectory()) {
- return zeppelinHome;
+ return zconf.getZeppelinHome();
}
throw new IOException("Can't find zeppelin home path!");
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/utils/TarUtils.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/utils/TarUtils.java
index 343f3eed11..6a8b7f92f4 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/utils/TarUtils.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/utils/TarUtils.java
@@ -32,39 +32,46 @@ import java.io.IOException;
import java.util.List;
public class TarUtils {
+
+ private TarUtils() {
+ throw new IllegalStateException("Utility class");
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(TarUtils.class);
public static void compress(String name, List<TarFileEntry> files) throws IOException {
- try (TarArchiveOutputStream out = getTarArchiveOutputStream(name)){
- for (TarFileEntry tarFileEntry : files){
- addToArchiveCompression(out, tarFileEntry.getFile(), tarFileEntry.getArchivePath());
+ try (FileOutputStream fileOutputStream = new FileOutputStream(name)) {
+ try (TarArchiveOutputStream out = getTarArchiveOutputStream(fileOutputStream)) {
+ for (TarFileEntry tarFileEntry : files) {
+ addToArchiveCompression(out, tarFileEntry.getFile(), tarFileEntry.getArchivePath());
+ }
}
}
}
public static void decompress(String in, File out) throws IOException {
- FileInputStream fileInputStream = new FileInputStream(in);
- GzipCompressorInputStream gzipInputStream = new GzipCompressorInputStream(fileInputStream);
+ try (FileInputStream fileInputStream = new FileInputStream(in)) {
+ GzipCompressorInputStream gzipInputStream = new GzipCompressorInputStream(fileInputStream);
- try (TarArchiveInputStream fin = new TarArchiveInputStream(gzipInputStream)){
- TarArchiveEntry entry;
- while ((entry = fin.getNextTarEntry()) != null) {
- if (entry.isDirectory()) {
- continue;
- }
- File curfile = new File(out, entry.getName());
- File parent = curfile.getParentFile();
- if (!parent.exists()) {
- parent.mkdirs();
+ try (TarArchiveInputStream fin = new TarArchiveInputStream(gzipInputStream)) {
+ TarArchiveEntry entry;
+ while ((entry = fin.getNextTarEntry()) != null) {
+ if (entry.isDirectory()) {
+ continue;
+ }
+ File curfile = new File(out, entry.getName());
+ File parent = curfile.getParentFile();
+ if (!parent.exists()) {
+ parent.mkdirs();
+ }
+ IOUtils.copy(fin, new FileOutputStream(curfile));
}
- IOUtils.copy(fin, new FileOutputStream(curfile));
}
}
}
- private static TarArchiveOutputStream getTarArchiveOutputStream(String name)
+ private static TarArchiveOutputStream getTarArchiveOutputStream(FileOutputStream fileOutputStream)
throws IOException {
- FileOutputStream fileOutputStream = new FileOutputStream(name);
GzipCompressorOutputStream gzipOutputStream = new GzipCompressorOutputStream(fileOutputStream);
TarArchiveOutputStream taos = new TarArchiveOutputStream(gzipOutputStream);
@@ -82,7 +89,7 @@ public class TarUtils {
throws IOException {
if (file.isFile()){
String archivePath = "." + dir;
- LOGGER.info("archivePath = " + archivePath);
+ LOGGER.info("archivePath = {}", archivePath);
out.putArchiveEntry(new TarArchiveEntry(file, archivePath));
try (FileInputStream in = new FileInputStream(file)) {
IOUtils.copy(in, out);
@@ -97,7 +104,7 @@ public class TarUtils {
}
}
} else {
- LOGGER.error(file.getName() + " is not supported");
+ LOGGER.error("{} is not supported", file.getName());
}
}
}
diff --git a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
index 4a7a39de6d..6b33b86afa 100644
--- a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
@@ -17,36 +17,29 @@
package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({System.class, DockerInterpreterProcess.class})
-@PowerMockIgnore( {"javax.management.*"})
-public class DockerInterpreterProcessTest {
- private static final Logger LOGGER = LoggerFactory.getLogger(DockerInterpreterProcessTest.class);
+class DockerInterpreterProcessTest {
- protected static ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
+ protected static ZeppelinConfiguration zconf = spy(ZeppelinConfiguration.create());
@Test
- public void testCreateIntpProcess() throws IOException {
+ void testCreateIntpProcess() throws IOException {
DockerInterpreterLauncher launcher
= new DockerInterpreterLauncher(zconf, null);
Properties properties = new Properties();
@@ -62,44 +55,42 @@ public class DockerInterpreterProcessTest {
DockerInterpreterProcess interpreterProcess = (DockerInterpreterProcess) client;
assertEquals("name", interpreterProcess.getInterpreterSettingName());
- assertEquals(interpreterProcess.CONTAINER_SPARK_HOME, "/spark");
- assertEquals(interpreterProcess.uploadLocalLibToContainter, true);
- assertNotEquals(interpreterProcess.DOCKER_HOST, "http://my-docker-host:2375");
+ assertEquals("/spark", interpreterProcess.containerSparkHome);
+ assertTrue(interpreterProcess.uploadLocalLibToContainter);
+ assertNotEquals("http://my-docker-host:2375", interpreterProcess.dockerHost);
}
@Test
- public void testEnv() throws IOException {
- PowerMockito.mockStatic(System.class);
- PowerMockito.when(System.getenv("CONTAINER_SPARK_HOME")).thenReturn("my-spark-home");
- PowerMockito.when(System.getenv("UPLOAD_LOCAL_LIB_TO_CONTAINTER")).thenReturn("false");
- PowerMockito.when(System.getenv("DOCKER_HOST")).thenReturn("http://my-docker-host:2375");
+ void testEnv() throws IOException {
+ when(zconf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME)).thenReturn("my-spark-home");
+ when(zconf.getBoolean(ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER)).thenReturn(false);
+ when(zconf.getString(ConfVars.ZEPPELIN_DOCKER_HOST)).thenReturn("http://my-docker-host:2375");
Properties properties = new Properties();
properties.setProperty(
- ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000");
-
+ ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000");
HashMap<String, String> envs = new HashMap<String, String>();
envs.put("MY_ENV1", "V1");
- DockerInterpreterProcess intp = new DockerInterpreterProcess(
- zconf,
- "interpreter-container:1.0",
- "shared_process",
- "sh",
- "shell",
- properties,
- envs,
- "zeppelin.server.hostname",
- 12320,
- 5000, 10);
-
- assertEquals(intp.CONTAINER_SPARK_HOME, "my-spark-home");
- assertEquals(intp.uploadLocalLibToContainter, false);
- assertEquals(intp.DOCKER_HOST, "http://my-docker-host:2375");
+ DockerInterpreterProcess intp = spy(new DockerInterpreterProcess(
+ zconf,
+ "interpreter-container:1.0",
+ "shared_process",
+ "sh",
+ "shell",
+ properties,
+ envs,
+ "zeppelin.server.hostname",
+ 12320,
+ 5000, 10));
+
+ assertEquals("my-spark-home", intp.containerSparkHome);
+ assertFalse(intp.uploadLocalLibToContainter);
+ assertEquals("http://my-docker-host:2375", intp.dockerHost);
}
@Test
- public void testTemplateBindings() throws IOException {
+ void testTemplateBindings() throws IOException {
Properties properties = new Properties();
properties.setProperty(
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000");
@@ -120,28 +111,28 @@ public class DockerInterpreterProcessTest {
5000, 10);
Properties dockerProperties = intp.getTemplateBindings();
- assertEquals(dockerProperties.size(), 10);
-
- assertTrue(null != dockerProperties.get("CONTAINER_ZEPPELIN_HOME"));
- assertTrue(null != dockerProperties.get("zeppelin.interpreter.container.image"));
- assertTrue(null != dockerProperties.get("zeppelin.interpreter.group.id"));
- assertTrue(null != dockerProperties.get("zeppelin.interpreter.group.name"));
- assertTrue(null != dockerProperties.get("zeppelin.interpreter.setting.name"));
- assertTrue(null != dockerProperties.get("zeppelin.interpreter.localRepo"));
- assertTrue(null != dockerProperties.get("zeppelin.interpreter.rpc.portRange"));
- assertTrue(null != dockerProperties.get("zeppelin.server.rpc.host"));
- assertTrue(null != dockerProperties.get("zeppelin.server.rpc.portRange"));
- assertTrue(null != dockerProperties.get("zeppelin.interpreter.connect.timeout"));
+ assertEquals(10, dockerProperties.size());
+
+ assertNotNull(dockerProperties.get("CONTAINER_ZEPPELIN_HOME"));
+ assertNotNull(dockerProperties.get("zeppelin.interpreter.container.image"));
+ assertNotNull(dockerProperties.get("zeppelin.interpreter.group.id"));
+ assertNotNull(dockerProperties.get("zeppelin.interpreter.group.name"));
+ assertNotNull(dockerProperties.get("zeppelin.interpreter.setting.name"));
+ assertNotNull(dockerProperties.get("zeppelin.interpreter.localRepo"));
+ assertNotNull(dockerProperties.get("zeppelin.interpreter.rpc.portRange"));
+ assertNotNull(dockerProperties.get("zeppelin.server.rpc.host"));
+ assertNotNull(dockerProperties.get("zeppelin.server.rpc.portRange"));
+ assertNotNull(dockerProperties.get("zeppelin.interpreter.connect.timeout"));
List<String> listEnvs = intp.getListEnvs();
- assertEquals(listEnvs.size(), 6);
+ assertEquals(6, listEnvs.size());
Map<String, String> mapEnv = new HashMap<>();
for (int i = 0; i < listEnvs.size(); i++) {
String env = listEnvs.get(i);
String kv[] = env.split("=");
mapEnv.put(kv[0], kv[1]);
}
- assertEquals(mapEnv.size(), 6);
+ assertEquals(6, mapEnv.size());
assertTrue(mapEnv.containsKey("ZEPPELIN_HOME"));
assertTrue(mapEnv.containsKey("ZEPPELIN_CONF_DIR"));
assertTrue(mapEnv.containsKey("ZEPPELIN_FORCE_STOP"));