You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2021/05/05 07:33:49 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5353] Fix K8s tests
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 8157fcd [ZEPPELIN-5353] Fix K8s tests
8157fcd is described below
commit 8157fcd535ccc82153699daad2b5666aedf0cc5f
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Tue May 4 09:28:43 2021 +0200
[ZEPPELIN-5353] Fix K8s tests
### What is this PR for?
This PR updates the K8s Java library and fixes the failing k8s unit tests.
### What type of PR is it?
- Hot Fix
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5353
### How should this be tested?
* CI
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Philipp Dallig <ph...@gmail.com>
Closes #4107 from Reamer/ci_k8s and squashes the following commits:
af5943262 [Philipp Dallig] Resolve flapping tests in a shared CI environment
827dbfe60 [Philipp Dallig] Fix for the javax.net.ssl.SSLException that occurs with the disabling of TLS1.0 and TLS1.1 by JDK 8u292. See JDK-8258598
c1e37376a [Philipp Dallig] Update kubernetes.client.version to 5.3.1
a9063765c [Philipp Dallig] Some test cleanup
(cherry picked from commit 91bfbf9db1b2ca915644c264694e0add03a69e61)
Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
zeppelin-plugins/launcher/k8s-standard/pom.xml | 2 +-
.../launcher/K8sRemoteInterpreterProcess.java | 2 +-
.../interpreter/launcher/PodPhaseWatcher.java | 10 ++++-
.../launcher/K8sRemoteInterpreterProcessTest.java | 48 +++++++++++-----------
.../interpreter/launcher/PodPhaseWatcherTest.java | 2 +-
.../test/resources/k8s-specs/interpreter-spec.yaml | 15 -------
.../src/test/resources/log4j.properties | 31 ++++++++++++++
.../org/apache/zeppelin/notebook/NotebookTest.java | 8 ++--
8 files changed, 72 insertions(+), 46 deletions(-)
diff --git a/zeppelin-plugins/launcher/k8s-standard/pom.xml b/zeppelin-plugins/launcher/k8s-standard/pom.xml
index e0d1d2d..34df72e 100644
--- a/zeppelin-plugins/launcher/k8s-standard/pom.xml
+++ b/zeppelin-plugins/launcher/k8s-standard/pom.xml
@@ -37,7 +37,7 @@
<properties>
<plugin.name>Launcher/K8sStandardInterpreterLauncher</plugin.name>
- <kubernetes.client.version>4.10.2</kubernetes.client.version>
+ <kubernetes.client.version>5.3.1</kubernetes.client.version>
<jinjava.version>2.5.4</jinjava.version>
</properties>
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index e44a5af..80a6f15 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -257,7 +257,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
K8sSpecTemplate specTemplate = new K8sSpecTemplate();
specTemplate.loadProperties(templateProperties);
String template = specTemplate.render(path);
- ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean> k8sObjects = client.load(IOUtils.toInputStream(template, StandardCharsets.UTF_8));
+ ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata> k8sObjects = client.load(IOUtils.toInputStream(template, StandardCharsets.UTF_8));
LOGGER.info("Apply {} with {} K8s Objects", path.getAbsolutePath(), k8sObjects.get().size());
LOGGER.debug(template);
if (delete) {
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java
index ddf23f5..fe2d1ab 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java
@@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus;
-import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
public class PodPhaseWatcher implements Watcher<Pod> {
private static final Logger LOGGER = LoggerFactory.getLogger(PodPhaseWatcher.class);
@@ -49,7 +49,7 @@ public class PodPhaseWatcher implements Watcher<Pod> {
}
@Override
- public void onClose(KubernetesClientException cause) {
+ public void onClose(WatcherException cause) {
if (cause != null) {
LOGGER.error("PodWatcher exits abnormally", cause);
}
@@ -57,6 +57,12 @@ public class PodPhaseWatcher implements Watcher<Pod> {
countDownLatch.countDown();
}
+ @Override
+ public void onClose() {
+ // always count down, so threads that are waiting will continue
+ countDownLatch.countDown();
+ }
+
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 9414411..dbc4555 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.net.URL;
import java.time.Instant;
import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -45,13 +46,13 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
public class K8sRemoteInterpreterProcessTest {
@Rule
- public KubernetesServer server = new KubernetesServer(true, true);
+ public KubernetesServer server = new KubernetesServer(false, true);
@Test
public void testPredefinedPortNumbers() {
// given
Properties properties = new Properties();
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
server.getClient(),
@@ -81,11 +82,11 @@ public class K8sRemoteInterpreterProcessTest {
}
@Test
- public void testGetTemplateBindings() throws IOException {
+ public void testGetTemplateBindings() {
// given
Properties properties = new Properties();
properties.put("my.key1", "v1");
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("MY_ENV1", "V1");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
@@ -118,7 +119,7 @@ public class K8sRemoteInterpreterProcessTest {
assertEquals("shared_process", p.get("zeppelin.k8s.interpreter.group.id"));
assertEquals("sh", p.get("zeppelin.k8s.interpreter.group.name"));
assertEquals("shell", p.get("zeppelin.k8s.interpreter.setting.name"));
- assertEquals(true , p.containsKey("zeppelin.k8s.interpreter.localRepo"));
+ assertTrue(p.containsKey("zeppelin.k8s.interpreter.localRepo"));
assertEquals("12321:12321" , p.get("zeppelin.k8s.interpreter.rpc.portRange"));
assertEquals("zeppelin.server.service" , p.get("zeppelin.k8s.server.rpc.service"));
assertEquals(12320 , p.get("zeppelin.k8s.server.rpc.portRange"));
@@ -131,12 +132,12 @@ public class K8sRemoteInterpreterProcessTest {
}
@Test
- public void testGetTemplateBindingsForSpark() throws IOException {
+ public void testGetTemplateBindingsForSpark() {
// given
Properties properties = new Properties();
properties.put("my.key1", "v1");
properties.put("spark.master", "k8s://http://api");
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("MY_ENV1", "V1");
envs.put("SPARK_SUBMIT_OPTIONS", "my options");
envs.put("SERVICE_DOMAIN", "mydomain");
@@ -183,12 +184,12 @@ public class K8sRemoteInterpreterProcessTest {
}
@Test
- public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException {
+ public void testGetTemplateBindingsForSparkWithProxyUser() {
// given
Properties properties = new Properties();
properties.put("my.key1", "v1");
properties.put("spark.master", "k8s://http://api");
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("MY_ENV1", "V1");
envs.put("SPARK_SUBMIT_OPTIONS", "my options");
envs.put("SERVICE_DOMAIN", "mydomain");
@@ -234,12 +235,12 @@ public class K8sRemoteInterpreterProcessTest {
}
@Test
- public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOException {
+ public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() {
// given
Properties properties = new Properties();
properties.put("my.key1", "v1");
properties.put("spark.master", "k8s://http://api");
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("MY_ENV1", "V1");
envs.put("SPARK_SUBMIT_OPTIONS", "my options");
envs.put("SERVICE_DOMAIN", "mydomain");
@@ -281,7 +282,7 @@ public class K8sRemoteInterpreterProcessTest {
public void testSparkUiWebUrlTemplate() {
// given
Properties properties = new Properties();
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
@@ -326,7 +327,7 @@ public class K8sRemoteInterpreterProcessTest {
Properties properties = new Properties();
properties.put("spark.driver.memory", "1g");
properties.put("spark.driver.cores", "1");
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
@@ -363,7 +364,7 @@ public class K8sRemoteInterpreterProcessTest {
properties.put("spark.driver.memory", "1g");
properties.put("spark.driver.memoryOverhead", "256m");
properties.put("spark.driver.cores", "5");
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
@@ -394,10 +395,10 @@ public class K8sRemoteInterpreterProcessTest {
}
@Test
- public void testK8sStartSuccessful() throws IOException, InterruptedException {
+ public void testK8sStartSuccessful() throws IOException {
// given
Properties properties = new Properties();
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("SERVICE_DOMAIN", "mydomain");
URL url = Thread.currentThread().getContextClassLoader()
.getResource("k8s-specs/interpreter-spec.yaml");
@@ -430,10 +431,10 @@ public class K8sRemoteInterpreterProcessTest {
}
@Test
- public void testK8sStartFailed() throws IOException, InterruptedException {
+ public void testK8sStartFailed() {
// given
Properties properties = new Properties();
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("SERVICE_DOMAIN", "mydomain");
URL url = Thread.currentThread().getContextClassLoader()
.getResource("k8s-specs/interpreter-spec.yaml");
@@ -459,7 +460,7 @@ public class K8sRemoteInterpreterProcessTest {
true);
PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp);
podStatusSimulator.setSecondPhase("Failed");
- podStatusSimulator.setSuccessfullStart(false);
+ podStatusSimulator.setSuccessfulStart(false);
ExecutorService service = Executors.newFixedThreadPool(1);
service
.submit(podStatusSimulator);
@@ -477,10 +478,10 @@ public class K8sRemoteInterpreterProcessTest {
}
@Test
- public void testK8sStartTimeoutPending() throws IOException, InterruptedException {
+ public void testK8sStartTimeoutPending() throws InterruptedException {
// given
Properties properties = new Properties();
- HashMap<String, String> envs = new HashMap<String, String>();
+ Map<String, String> envs = new HashMap<>();
envs.put("SERVICE_DOMAIN", "mydomain");
URL url = Thread.currentThread().getContextClassLoader()
.getResource("k8s-specs/interpreter-spec.yaml");
@@ -507,7 +508,7 @@ public class K8sRemoteInterpreterProcessTest {
PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp);
podStatusSimulator.setFirstPhase("Pending");
podStatusSimulator.setSecondPhase("Pending");
- podStatusSimulator.setSuccessfullStart(false);
+ podStatusSimulator.setSuccessfulStart(false);
ExecutorService service = Executors.newFixedThreadPool(2);
service
.submit(podStatusSimulator);
@@ -558,7 +559,7 @@ public class K8sRemoteInterpreterProcessTest {
public void setSecondPhase(String phase) {
this.secondPhase = phase;
}
- public void setSuccessfullStart(boolean successful) {
+ public void setSuccessfulStart(boolean successful) {
this.successfulStart = successful;
}
@@ -590,6 +591,7 @@ public class K8sRemoteInterpreterProcessTest {
}
}
} catch (InterruptedException e) {
+ // Do nothing
}
}
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
index fb06768..aa08352 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
@@ -38,7 +38,7 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
public class PodPhaseWatcherTest {
@Rule
- public KubernetesServer server = new KubernetesServer(true, true);
+ public KubernetesServer server = new KubernetesServer(false, true);
@Test
public void testPhase() throws InterruptedException {
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
index 116b0df..94717c2 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
@@ -73,21 +73,6 @@ spec:
limits:
cpu: "{{zeppelin.k8s.interpreter.cores}}"
{% endif %}
- {% if zeppelin.k8s.interpreter.group.name == "spark" %}
- volumeMounts:
- - name: spark-home
- mountPath: /spark
- initContainers:
- - name: spark-home-init
- image: {{zeppelin.k8s.spark.container.image}}
- command: ["sh", "-c", "cp -r /opt/spark/* /spark/"]
- volumeMounts:
- - name: spark-home
- mountPath: /spark
- volumes:
- - name: spark-home
- emptyDir: {}
- {% endif %}
---
kind: Service
apiVersion: v1
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/log4j.properties b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c846ba5
--- /dev/null
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c:%L - %m%n
+#log4j.appender.stdout.layout.ConversionPattern=
+#%5p [%t] (%F:%L) - %m%n
+#%-4r [%t] %-5p %c %x - %m%n
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+#log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.io.fabric8.kubernetes.client.Config=INFO, stdout
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index c0980b4..c37b2e4 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -591,7 +591,8 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
public void testSchedulePoolUsage() throws InterruptedException, IOException {
final int timeout = 30;
final String everySecondCron = "* * * * * ?";
- final CountDownLatch jobsToExecuteCount = new CountDownLatch(8);
+ // each run starts a new JVM and the job takes about ~5 seconds
+ final CountDownLatch jobsToExecuteCount = new CountDownLatch(5);
final Note note = notebook.createNote("note1", anonymous);
executeNewParagraphByCron(note, everySecondCron);
@@ -655,8 +656,9 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_CRON_FOLDERS.getVarName(), "/System");
try {
- final int timeout = 20;
+ final int timeout = 30;
final String everySecondCron = "* * * * * ?";
+ // each run starts a new JVM and the job takes about ~5 seconds
final CountDownLatch jobsToExecuteCount = new CountDownLatch(5);
final Note note = notebook.createNote("note1", anonymous);
@@ -964,7 +966,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
assertNull(registry.get("o1", note.getId(), null));
assertNull(registry.get("o2", note.getId(), p1.getId()));
- // global object sould be remained
+ // global object should be remained
assertNotNull(registry.get("o3", null, null));
}