You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2022/03/17 23:58:34 UTC

[zeppelin] branch master updated: [ZEPPELIN-5644] Reduce redundancy during the interpreter execution process (#4294)

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aad7d9  [ZEPPELIN-5644] Reduce redundancy during the interpreter execution process (#4294)
7aad7d9 is described below

commit 7aad7d92d255cf19a56272b2d3ac40f47373a7ec
Author: Leomax_Sun <28...@qq.com>
AuthorDate: Fri Mar 18 07:58:25 2022 +0800

    [ZEPPELIN-5644] Reduce redundancy during the interpreter execution process (#4294)
---
 .../apache/zeppelin/spark/SparkInterpreter.java    | 57 +++++++++++++---------
 .../zeppelin/spark/SparkInterpreterTest.java       |  5 +-
 .../zeppelin/spark/SparkScala211Interpreter.scala  |  1 +
 .../zeppelin/spark/SparkScala212Interpreter.scala  |  1 +
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala |  3 +-
 .../remote/RemoteInterpreterServer.java            | 19 +++++++-
 .../remote/RemoteInterpreterServerTest.java        | 32 +++++++++++-
 7 files changed, 91 insertions(+), 27 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index fef0998..b13d67f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -66,6 +66,7 @@ public class SparkInterpreter extends AbstractInterpreter {
   }
 
   private static AtomicInteger SESSION_NUM = new AtomicInteger(0);
+  private static Class innerInterpreterClazz;
   private AbstractSparkScalaInterpreter innerInterpreter;
   private Map<String, String> innerInterpreterClassMap = new HashMap<>();
   private SparkContext sc;
@@ -151,38 +152,45 @@ public class SparkInterpreter extends AbstractInterpreter {
    */
   private AbstractSparkScalaInterpreter loadSparkScalaInterpreter(SparkConf conf) throws Exception {
     scalaVersion = extractScalaVersion(conf);
-    ClassLoader scalaInterpreterClassLoader = Thread.currentThread().getContextClassLoader();
-
-    String zeppelinHome = System.getenv("ZEPPELIN_HOME");
-    if (zeppelinHome != null) {
-      // ZEPPELIN_HOME is null in yarn-cluster mode, load it directly via current ClassLoader.
-      // otherwise, load from the specific folder ZEPPELIN_HOME/interpreter/spark/scala-<version>
-
-      File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion);
-      List<URL> urls = new ArrayList<>();
-      for (File file : scalaJarFolder.listFiles()) {
-        LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: "
-                + scalaJarFolder);
-        urls.add(file.toURI().toURL());
+    // Make sure the inner interpreter class is loaded into the JVM only once.
+    // Use double-checked locking so that concurrent callers do not load it twice.
+    if (innerInterpreterClazz == null) {
+      synchronized (SparkInterpreter.class) {
+        if (innerInterpreterClazz == null) {
+          LOGGER.debug("innerInterpreterClazz is null, thread:{}", Thread.currentThread().getName());
+          ClassLoader scalaInterpreterClassLoader = Thread.currentThread().getContextClassLoader();
+          String zeppelinHome = System.getenv("ZEPPELIN_HOME");
+          if (zeppelinHome != null) {
+            // ZEPPELIN_HOME is null in yarn-cluster mode, load it directly via current ClassLoader.
+            // otherwise, load from the specific folder ZEPPELIN_HOME/interpreter/spark/scala-<version>
+            File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion);
+            List<URL> urls = new ArrayList<>();
+            for (File file : scalaJarFolder.listFiles()) {
+              LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: "
+                      + scalaJarFolder);
+              urls.add(file.toURI().toURL());
+            }
+            scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
+                    Thread.currentThread().getContextClassLoader());
+          }
+          String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
+          innerInterpreterClazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName);
+        }
       }
-      scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
-              Thread.currentThread().getContextClassLoader());
     }
-
-    String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
-    Class clazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName);
     return (AbstractSparkScalaInterpreter)
-            clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class, File.class)
-                    .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader, scalaShellOutputDir);
+            innerInterpreterClazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class, File.class)
+                    .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), innerInterpreterClazz.getClassLoader(), scalaShellOutputDir);
   }
 
-  @Override
+    @Override
   public void close() throws InterpreterException {
     LOGGER.info("Close SparkInterpreter");
     if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) {
       innerInterpreter.close();
-      innerInterpreter = null;
+      innerInterpreterClazz = null;
     }
+      innerInterpreter = null;
   }
 
   @Override
@@ -306,4 +314,9 @@ public class SparkInterpreter extends AbstractInterpreter {
   public boolean isUnsupportedSparkVersion() {
     return enableSupportedVersionCheck  && sparkVersion.isUnsupportedVersion();
   }
+
+  public AbstractSparkScalaInterpreter getInnerInterpreter() {
+    return innerInterpreter;
+  }
+
 }
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 7b3d1df..297b55d 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -544,7 +544,7 @@ public class SparkInterpreterTest {
   }
 
   @Test
-  public void testScopedMode() throws InterpreterException {
+  public void testScopedMode() throws Exception {
     Properties properties = new Properties();
     properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local");
     properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test");
@@ -567,6 +567,9 @@ public class SparkInterpreterTest {
     interpreter1.open();
     interpreter2.open();
 
+    // verify that both interpreters share the same loaded inner interpreter class
+    assertEquals(true, interpreter1.getInnerInterpreter().getClass()==interpreter2.getInnerInterpreter().getClass());
+
     InterpreterContext context = getInterpreterContext();
 
     InterpreterResult result1 = interpreter1.interpret("sc.range(1, 10).sum", context);
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 41470f1..d1a3b08 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -120,6 +120,7 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     super.close()
     if (sparkILoop != null) {
       sparkILoop.closeInterpreter()
+      sparkILoop = null
     }
   }
 
diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index e9c127d..4918e4b 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -111,6 +111,7 @@ class SparkScala212Interpreter(override val conf: SparkConf,
     super.close()
     if (sparkILoop != null) {
       sparkILoop.closeInterpreter()
+      sparkILoop = null
     }
   }
 
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index f984e15..46100da 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -202,13 +202,14 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     }
     if (sc != null) {
       sc.stop()
+      sc = null
     }
-    sc = null
     if (sparkSession != null) {
       sparkSession.getClass.getMethod("stop").invoke(sparkSession)
       sparkSession = null
     }
     sqlContext = null
+    z = null
   }
 
   private def cleanupStagingDirInternal(stagingDirPath: Path, hadoopConf: Configuration): Unit = {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index aff8139..6b499fa 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -95,6 +95,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.Optional;
 
 import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 
@@ -379,6 +380,13 @@ public class RemoteInterpreterServer extends Thread
                 Integer.parseInt(properties.getOrDefault("zeppelin.interpreter.result.cache", "0"));
       }
 
+      boolean isPresent = Optional.ofNullable(interpreterGroup.get(sessionId)).orElse(new ArrayList<>()).stream()
+              .filter(m -> m.getClassName().equals(className)).findAny().isPresent();
+      if (isPresent) {
+        LOGGER.info("interpreter {} is existing", className);
+        return;
+      }
+
       Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
       Properties p = new Properties();
       p.putAll(properties);
@@ -482,9 +490,18 @@ public class RemoteInterpreterServer extends Thread
           Iterator<Interpreter> it = interpreters.iterator();
           while (it.hasNext()) {
             Interpreter inp = it.next();
-            if (inp.getClassName().equals(className)) {
+            boolean isOpen = false;
+            if (inp instanceof LazyOpenInterpreter) {
+              LazyOpenInterpreter lazy = (LazyOpenInterpreter) inp;
+              isOpen = lazy.isOpen();
+            }
+            // only remove the open and matched interpreter
+            if (inp.getClassName().equals(className) && isOpen) {
               try {
+                LOGGER.debug("Trying to close interpreter {} with scheduler thread{}", inp.getClassName(), inp.getScheduler().getName());
                 inp.close();
+                // close the thread
+                inp.getScheduler().stop();
               } catch (InterpreterException e) {
                 LOGGER.warn("Fail to close interpreter", e);
               }
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index 20be866..7f6e0e5 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
@@ -119,8 +120,13 @@ public class RemoteInterpreterServerTest {
     assertEquals(2, interpreter1.getProperties().size());
     assertEquals("value_1", interpreter1.getProperty("property_1"));
 
-    // create Test2Interpreter in session_1
+    // create duplicated Test1Interpreter in session_1
     server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
+            intpProperties, "user_1");
+    assertEquals(1, server.getInterpreterGroup().get("session_1").size());
+
+    // create Test2Interpreter in session_1
+    server.createInterpreter("group_1", "session_1", Test2Interpreter.class.getName(),
         intpProperties, "user_1");
     assertEquals(2, server.getInterpreterGroup().get("session_1").size());
 
@@ -188,9 +194,31 @@ public class RemoteInterpreterServerTest {
     assertEquals(10, server.getProgress("session_1", Test1Interpreter.class.getName(),
         intpContext));
 
-    // close
+    // before close -> thread of Test1Interpreter is running
+    assertEquals(true, isThreadRunning(interpreter1.getScheduler().getName()));
+
+    // close opened Test1Interpreter -> remove from interpreterGroup
     server.close("session_1", Test1Interpreter.class.getName());
     assertTrue(interpreter1.closed.get());
+    assertEquals(1, server.getInterpreterGroup().get("session_1").size());
+
+    // close unopened Test2Interpreter -> keep in interpreterGroup
+    server.close("session_1", Test2Interpreter.class.getName());
+    assertEquals(1, server.getInterpreterGroup().get("session_1").size());
+
+    // after close -> thread of Test1Interpreter is not running
+    assertEquals(false, isThreadRunning(interpreter1.getScheduler().getName()));
+  }
+
+  private boolean isThreadRunning(String schedulerName) {
+    boolean res = false;
+    Set<Thread> threads = Thread.getAllStackTraces().keySet();
+    for (Thread t : threads) {
+      if (!t.getName().contains(schedulerName)) continue;
+      res = true;
+      break;
+    }
+    return res;
   }
 
   public static class Test1Interpreter extends Interpreter {