You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2021/05/05 07:33:49 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5353] Fix K8s tests

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 8157fcd  [ZEPPELIN-5353] Fix K8s tests
8157fcd is described below

commit 8157fcd535ccc82153699daad2b5666aedf0cc5f
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Tue May 4 09:28:43 2021 +0200

    [ZEPPELIN-5353] Fix K8s tests
    
    ### What is this PR for?
    This PR updates the K8s Java library and fixes the failing k8s unit tests.
    
    ### What type of PR is it?
     - Hot Fix
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5353
    
    ### How should this be tested?
    * CI
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Philipp Dallig <ph...@gmail.com>
    
    Closes #4107 from Reamer/ci_k8s and squashes the following commits:
    
    af5943262 [Philipp Dallig] Resolve flapping tests in a shared CI environment
    827dbfe60 [Philipp Dallig] Fix for the javax.net.ssl.SSLException that occurs with the disabling of TLS1.0 and TLS1.1 by JDK 8u292. See JDK-8258598
    c1e37376a [Philipp Dallig] Update kubernetes.client.version to 5.3.1
    a9063765c [Philipp Dallig] Some test cleanup
    
    (cherry picked from commit 91bfbf9db1b2ca915644c264694e0add03a69e61)
    Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
 zeppelin-plugins/launcher/k8s-standard/pom.xml     |  2 +-
 .../launcher/K8sRemoteInterpreterProcess.java      |  2 +-
 .../interpreter/launcher/PodPhaseWatcher.java      | 10 ++++-
 .../launcher/K8sRemoteInterpreterProcessTest.java  | 48 +++++++++++-----------
 .../interpreter/launcher/PodPhaseWatcherTest.java  |  2 +-
 .../test/resources/k8s-specs/interpreter-spec.yaml | 15 -------
 .../src/test/resources/log4j.properties            | 31 ++++++++++++++
 .../org/apache/zeppelin/notebook/NotebookTest.java |  8 ++--
 8 files changed, 72 insertions(+), 46 deletions(-)

diff --git a/zeppelin-plugins/launcher/k8s-standard/pom.xml b/zeppelin-plugins/launcher/k8s-standard/pom.xml
index e0d1d2d..34df72e 100644
--- a/zeppelin-plugins/launcher/k8s-standard/pom.xml
+++ b/zeppelin-plugins/launcher/k8s-standard/pom.xml
@@ -37,7 +37,7 @@
 
     <properties>
         <plugin.name>Launcher/K8sStandardInterpreterLauncher</plugin.name>
-        <kubernetes.client.version>4.10.2</kubernetes.client.version>
+        <kubernetes.client.version>5.3.1</kubernetes.client.version>
         <jinjava.version>2.5.4</jinjava.version>
     </properties>
 
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index e44a5af..80a6f15 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -257,7 +257,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
       K8sSpecTemplate specTemplate = new K8sSpecTemplate();
       specTemplate.loadProperties(templateProperties);
       String template = specTemplate.render(path);
-      ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean> k8sObjects = client.load(IOUtils.toInputStream(template, StandardCharsets.UTF_8));
+      ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata> k8sObjects = client.load(IOUtils.toInputStream(template, StandardCharsets.UTF_8));
       LOGGER.info("Apply {} with {} K8s Objects", path.getAbsolutePath(), k8sObjects.get().size());
       LOGGER.debug(template);
       if (delete) {
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java
index ddf23f5..fe2d1ab 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java
@@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory;
 
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodStatus;
-import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
 
 public class PodPhaseWatcher implements Watcher<Pod> {
   private static final Logger LOGGER = LoggerFactory.getLogger(PodPhaseWatcher.class);
@@ -49,7 +49,7 @@ public class PodPhaseWatcher implements Watcher<Pod> {
   }
 
   @Override
-  public void onClose(KubernetesClientException cause) {
+  public void onClose(WatcherException cause) {
     if (cause != null) {
       LOGGER.error("PodWatcher exits abnormally", cause);
     }
@@ -57,6 +57,12 @@ public class PodPhaseWatcher implements Watcher<Pod> {
     countDownLatch.countDown();
   }
 
+  @Override
+  public void onClose() {
+    // always count down, so threads that are waiting will continue
+    countDownLatch.countDown();
+  }
+
   public CountDownLatch getCountDownLatch() {
     return countDownLatch;
   }
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 9414411..dbc4555 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.net.URL;
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -45,13 +46,13 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
 public class K8sRemoteInterpreterProcessTest {
 
   @Rule
-  public KubernetesServer server = new KubernetesServer(true, true);
+  public KubernetesServer server = new KubernetesServer(false, true);
 
   @Test
   public void testPredefinedPortNumbers() {
     // given
     Properties properties = new Properties();
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
         server.getClient(),
@@ -81,11 +82,11 @@ public class K8sRemoteInterpreterProcessTest {
   }
 
   @Test
-  public void testGetTemplateBindings() throws IOException {
+  public void testGetTemplateBindings() {
     // given
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("MY_ENV1", "V1");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
@@ -118,7 +119,7 @@ public class K8sRemoteInterpreterProcessTest {
     assertEquals("shared_process", p.get("zeppelin.k8s.interpreter.group.id"));
     assertEquals("sh", p.get("zeppelin.k8s.interpreter.group.name"));
     assertEquals("shell", p.get("zeppelin.k8s.interpreter.setting.name"));
-    assertEquals(true , p.containsKey("zeppelin.k8s.interpreter.localRepo"));
+    assertTrue(p.containsKey("zeppelin.k8s.interpreter.localRepo"));
     assertEquals("12321:12321" , p.get("zeppelin.k8s.interpreter.rpc.portRange"));
     assertEquals("zeppelin.server.service" , p.get("zeppelin.k8s.server.rpc.service"));
     assertEquals(12320 , p.get("zeppelin.k8s.server.rpc.portRange"));
@@ -131,12 +132,12 @@ public class K8sRemoteInterpreterProcessTest {
   }
 
   @Test
-  public void testGetTemplateBindingsForSpark() throws IOException {
+  public void testGetTemplateBindingsForSpark() {
     // given
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
     properties.put("spark.master", "k8s://http://api");
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("MY_ENV1", "V1");
     envs.put("SPARK_SUBMIT_OPTIONS", "my options");
     envs.put("SERVICE_DOMAIN", "mydomain");
@@ -183,12 +184,12 @@ public class K8sRemoteInterpreterProcessTest {
   }
 
   @Test
-  public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException {
+  public void testGetTemplateBindingsForSparkWithProxyUser() {
     // given
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
     properties.put("spark.master", "k8s://http://api");
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("MY_ENV1", "V1");
     envs.put("SPARK_SUBMIT_OPTIONS", "my options");
     envs.put("SERVICE_DOMAIN", "mydomain");
@@ -234,12 +235,12 @@ public class K8sRemoteInterpreterProcessTest {
   }
 
   @Test
-  public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOException {
+  public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() {
     // given
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
     properties.put("spark.master", "k8s://http://api");
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("MY_ENV1", "V1");
     envs.put("SPARK_SUBMIT_OPTIONS", "my options");
     envs.put("SERVICE_DOMAIN", "mydomain");
@@ -281,7 +282,7 @@ public class K8sRemoteInterpreterProcessTest {
   public void testSparkUiWebUrlTemplate() {
     // given
     Properties properties = new Properties();
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("SERVICE_DOMAIN", "mydomain");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
@@ -326,7 +327,7 @@ public class K8sRemoteInterpreterProcessTest {
     Properties properties = new Properties();
     properties.put("spark.driver.memory", "1g");
     properties.put("spark.driver.cores", "1");
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("SERVICE_DOMAIN", "mydomain");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
@@ -363,7 +364,7 @@ public class K8sRemoteInterpreterProcessTest {
     properties.put("spark.driver.memory", "1g");
     properties.put("spark.driver.memoryOverhead", "256m");
     properties.put("spark.driver.cores", "5");
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("SERVICE_DOMAIN", "mydomain");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
@@ -394,10 +395,10 @@ public class K8sRemoteInterpreterProcessTest {
   }
 
   @Test
-  public void testK8sStartSuccessful() throws IOException, InterruptedException {
+  public void testK8sStartSuccessful() throws IOException {
     // given
     Properties properties = new Properties();
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("SERVICE_DOMAIN", "mydomain");
     URL url = Thread.currentThread().getContextClassLoader()
         .getResource("k8s-specs/interpreter-spec.yaml");
@@ -430,10 +431,10 @@ public class K8sRemoteInterpreterProcessTest {
   }
 
   @Test
-  public void testK8sStartFailed() throws IOException, InterruptedException {
+  public void testK8sStartFailed() {
     // given
     Properties properties = new Properties();
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("SERVICE_DOMAIN", "mydomain");
     URL url = Thread.currentThread().getContextClassLoader()
         .getResource("k8s-specs/interpreter-spec.yaml");
@@ -459,7 +460,7 @@ public class K8sRemoteInterpreterProcessTest {
         true);
     PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp);
     podStatusSimulator.setSecondPhase("Failed");
-    podStatusSimulator.setSuccessfullStart(false);
+    podStatusSimulator.setSuccessfulStart(false);
     ExecutorService service = Executors.newFixedThreadPool(1);
     service
         .submit(podStatusSimulator);
@@ -477,10 +478,10 @@ public class K8sRemoteInterpreterProcessTest {
   }
 
   @Test
-  public void testK8sStartTimeoutPending() throws IOException, InterruptedException {
+  public void testK8sStartTimeoutPending() throws InterruptedException {
     // given
     Properties properties = new Properties();
-    HashMap<String, String> envs = new HashMap<String, String>();
+    Map<String, String> envs = new HashMap<>();
     envs.put("SERVICE_DOMAIN", "mydomain");
     URL url = Thread.currentThread().getContextClassLoader()
         .getResource("k8s-specs/interpreter-spec.yaml");
@@ -507,7 +508,7 @@ public class K8sRemoteInterpreterProcessTest {
     PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp);
     podStatusSimulator.setFirstPhase("Pending");
     podStatusSimulator.setSecondPhase("Pending");
-    podStatusSimulator.setSuccessfullStart(false);
+    podStatusSimulator.setSuccessfulStart(false);
     ExecutorService service = Executors.newFixedThreadPool(2);
     service
         .submit(podStatusSimulator);
@@ -558,7 +559,7 @@ public class K8sRemoteInterpreterProcessTest {
     public void setSecondPhase(String phase) {
       this.secondPhase = phase;
     }
-    public void setSuccessfullStart(boolean successful) {
+    public void setSuccessfulStart(boolean successful) {
       this.successfulStart = successful;
     }
 
@@ -590,6 +591,7 @@ public class K8sRemoteInterpreterProcessTest {
           }
         }
       } catch (InterruptedException e) {
+        // Do nothing
       }
     }
   }
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
index fb06768..aa08352 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
@@ -38,7 +38,7 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
 public class PodPhaseWatcherTest {
 
   @Rule
-  public KubernetesServer server = new KubernetesServer(true, true);
+  public KubernetesServer server = new KubernetesServer(false, true);
 
   @Test
   public void testPhase() throws InterruptedException {
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
index 116b0df..94717c2 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
@@ -73,21 +73,6 @@ spec:
       limits:
         cpu: "{{zeppelin.k8s.interpreter.cores}}"
   {% endif %}
-  {% if zeppelin.k8s.interpreter.group.name == "spark" %}
-    volumeMounts:
-    - name: spark-home
-      mountPath: /spark
-  initContainers:
-  - name: spark-home-init
-    image: {{zeppelin.k8s.spark.container.image}}
-    command: ["sh", "-c", "cp -r /opt/spark/* /spark/"]
-    volumeMounts:
-    - name: spark-home
-      mountPath: /spark
-  volumes:
-  - name: spark-home
-    emptyDir: {}
-  {% endif %}
 ---
 kind: Service
 apiVersion: v1
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/log4j.properties b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c846ba5
--- /dev/null
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c:%L - %m%n
+#log4j.appender.stdout.layout.ConversionPattern=
+#%5p [%t] (%F:%L) - %m%n
+#%-4r [%t] %-5p %c %x - %m%n
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+#log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.io.fabric8.kubernetes.client.Config=INFO, stdout
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index c0980b4..c37b2e4 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -591,7 +591,8 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
   public void testSchedulePoolUsage() throws InterruptedException, IOException {
     final int timeout = 30;
     final String everySecondCron = "* * * * * ?";
-    final CountDownLatch jobsToExecuteCount = new CountDownLatch(8);
+    // each run starts a new JVM and the job takes about 5 seconds
+    final CountDownLatch jobsToExecuteCount = new CountDownLatch(5);
     final Note note = notebook.createNote("note1", anonymous);
 
     executeNewParagraphByCron(note, everySecondCron);
@@ -655,8 +656,9 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
 
     System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_CRON_FOLDERS.getVarName(), "/System");
     try {
-      final int timeout = 20;
+      final int timeout = 30;
       final String everySecondCron = "* * * * * ?";
+      // each run starts a new JVM and the job takes about 5 seconds
       final CountDownLatch jobsToExecuteCount = new CountDownLatch(5);
       final Note note = notebook.createNote("note1", anonymous);
 
@@ -964,7 +966,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
     assertNull(registry.get("o1", note.getId(), null));
     assertNull(registry.get("o2", note.getId(), p1.getId()));
 
-    // global object sould be remained
+    // global object should remain
     assertNotNull(registry.get("o3", null, null));
   }