You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2022/03/17 23:58:34 UTC
[zeppelin] branch master updated: [ZEPPELIN-5644] Reduce redundancy during the interpreter execution process (#4294)
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 7aad7d9 [ZEPPELIN-5644] Reduce redundancy during the interpreter execution process (#4294)
7aad7d9 is described below
commit 7aad7d92d255cf19a56272b2d3ac40f47373a7ec
Author: Leomax_Sun <28...@qq.com>
AuthorDate: Fri Mar 18 07:58:25 2022 +0800
[ZEPPELIN-5644] Reduce redundancy during the interpreter execution process (#4294)
---
.../apache/zeppelin/spark/SparkInterpreter.java | 57 +++++++++++++---------
.../zeppelin/spark/SparkInterpreterTest.java | 5 +-
.../zeppelin/spark/SparkScala211Interpreter.scala | 1 +
.../zeppelin/spark/SparkScala212Interpreter.scala | 1 +
.../zeppelin/spark/BaseSparkScalaInterpreter.scala | 3 +-
.../remote/RemoteInterpreterServer.java | 19 +++++++-
.../remote/RemoteInterpreterServerTest.java | 32 +++++++++++-
7 files changed, 91 insertions(+), 27 deletions(-)
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index fef0998..b13d67f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -66,6 +66,7 @@ public class SparkInterpreter extends AbstractInterpreter {
}
private static AtomicInteger SESSION_NUM = new AtomicInteger(0);
+ private static Class innerInterpreterClazz;
private AbstractSparkScalaInterpreter innerInterpreter;
private Map<String, String> innerInterpreterClassMap = new HashMap<>();
private SparkContext sc;
@@ -151,38 +152,45 @@ public class SparkInterpreter extends AbstractInterpreter {
*/
private AbstractSparkScalaInterpreter loadSparkScalaInterpreter(SparkConf conf) throws Exception {
scalaVersion = extractScalaVersion(conf);
- ClassLoader scalaInterpreterClassLoader = Thread.currentThread().getContextClassLoader();
-
- String zeppelinHome = System.getenv("ZEPPELIN_HOME");
- if (zeppelinHome != null) {
- // ZEPPELIN_HOME is null in yarn-cluster mode, load it directly via current ClassLoader.
- // otherwise, load from the specific folder ZEPPELIN_HOME/interpreter/spark/scala-<version>
-
- File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion);
- List<URL> urls = new ArrayList<>();
- for (File file : scalaJarFolder.listFiles()) {
- LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: "
- + scalaJarFolder);
- urls.add(file.toURI().toURL());
+ // Make sure the innerInterpreter Class is loaded only once into JVM
+ // Use double-checked locking to ensure thread safety
+ if (innerInterpreterClazz == null) {
+ synchronized (SparkInterpreter.class) {
+ if (innerInterpreterClazz == null) {
+ LOGGER.debug("innerInterpreterClazz is null, thread:{}", Thread.currentThread().getName());
+ ClassLoader scalaInterpreterClassLoader = Thread.currentThread().getContextClassLoader();
+ String zeppelinHome = System.getenv("ZEPPELIN_HOME");
+ if (zeppelinHome != null) {
+ // ZEPPELIN_HOME is set: load from the specific folder ZEPPELIN_HOME/interpreter/spark/scala-<version>.
+ // (It is null in yarn-cluster mode; in that case the current context ClassLoader is used directly.)
+ File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion);
+ List<URL> urls = new ArrayList<>();
+ for (File file : scalaJarFolder.listFiles()) {
+ LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: "
+ + scalaJarFolder);
+ urls.add(file.toURI().toURL());
+ }
+ scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
+ Thread.currentThread().getContextClassLoader());
+ }
+ String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
+ innerInterpreterClazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName);
+ }
}
- scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
- Thread.currentThread().getContextClassLoader());
}
-
- String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
- Class clazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName);
return (AbstractSparkScalaInterpreter)
- clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class, File.class)
- .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader, scalaShellOutputDir);
+ innerInterpreterClazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class, File.class)
+ .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), innerInterpreterClazz.getClassLoader(), scalaShellOutputDir);
}
- @Override
+ @Override
public void close() throws InterpreterException {
LOGGER.info("Close SparkInterpreter");
if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) {
innerInterpreter.close();
- innerInterpreter = null;
+ innerInterpreterClazz = null;
}
+ innerInterpreter = null;
}
@Override
@@ -306,4 +314,9 @@ public class SparkInterpreter extends AbstractInterpreter {
public boolean isUnsupportedSparkVersion() {
return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion();
}
+
+ public AbstractSparkScalaInterpreter getInnerInterpreter() {
+ return innerInterpreter;
+ }
+
}
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 7b3d1df..297b55d 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -544,7 +544,7 @@ public class SparkInterpreterTest {
}
@Test
- public void testScopedMode() throws InterpreterException {
+ public void testScopedMode() throws Exception {
Properties properties = new Properties();
properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local");
properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test");
@@ -567,6 +567,9 @@ public class SparkInterpreterTest {
interpreter1.open();
interpreter2.open();
+ // check that both interpreters share the same loaded class (i.e. it was not loaded twice)
+ assertEquals(true, interpreter1.getInnerInterpreter().getClass()==interpreter2.getInnerInterpreter().getClass());
+
InterpreterContext context = getInterpreterContext();
InterpreterResult result1 = interpreter1.interpret("sc.range(1, 10).sum", context);
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 41470f1..d1a3b08 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -120,6 +120,7 @@ class SparkScala211Interpreter(override val conf: SparkConf,
super.close()
if (sparkILoop != null) {
sparkILoop.closeInterpreter()
+ sparkILoop = null
}
}
diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index e9c127d..4918e4b 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -111,6 +111,7 @@ class SparkScala212Interpreter(override val conf: SparkConf,
super.close()
if (sparkILoop != null) {
sparkILoop.closeInterpreter()
+ sparkILoop = null
}
}
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index f984e15..46100da 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -202,13 +202,14 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
}
if (sc != null) {
sc.stop()
+ sc = null
}
- sc = null
if (sparkSession != null) {
sparkSession.getClass.getMethod("stop").invoke(sparkSession)
sparkSession = null
}
sqlContext = null
+ z = null
}
private def cleanupStagingDirInternal(stagingDirPath: Path, hadoopConf: Configuration): Unit = {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index aff8139..6b499fa 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -95,6 +95,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.Optional;
import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
@@ -379,6 +380,13 @@ public class RemoteInterpreterServer extends Thread
Integer.parseInt(properties.getOrDefault("zeppelin.interpreter.result.cache", "0"));
}
+ boolean isPresent = Optional.ofNullable(interpreterGroup.get(sessionId)).orElse(new ArrayList<>()).stream()
+ .filter(m -> m.getClassName().equals(className)).findAny().isPresent();
+ if (isPresent) {
+ LOGGER.info("interpreter {} is existing", className);
+ return;
+ }
+
Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
Properties p = new Properties();
p.putAll(properties);
@@ -482,9 +490,18 @@ public class RemoteInterpreterServer extends Thread
Iterator<Interpreter> it = interpreters.iterator();
while (it.hasNext()) {
Interpreter inp = it.next();
- if (inp.getClassName().equals(className)) {
+ boolean isOpen = false;
+ if (inp instanceof LazyOpenInterpreter) {
+ LazyOpenInterpreter lazy = (LazyOpenInterpreter) inp;
+ isOpen = lazy.isOpen();
+ }
+ // only remove the open and matched interpreter
+ if (inp.getClassName().equals(className) && isOpen) {
try {
+ LOGGER.debug("Trying to close interpreter {} with scheduler thread{}", inp.getClassName(), inp.getScheduler().getName());
inp.close();
+ // stop the interpreter's scheduler so that its thread terminates
+ inp.getScheduler().stop();
} catch (InterpreterException e) {
LOGGER.warn("Fail to close interpreter", e);
}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index 20be866..7f6e0e5 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
@@ -119,8 +120,13 @@ public class RemoteInterpreterServerTest {
assertEquals(2, interpreter1.getProperties().size());
assertEquals("value_1", interpreter1.getProperty("property_1"));
- // create Test2Interpreter in session_1
+ // create duplicated Test1Interpreter in session_1
server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
+ intpProperties, "user_1");
+ assertEquals(1, server.getInterpreterGroup().get("session_1").size());
+
+ // create Test2Interpreter in session_1
+ server.createInterpreter("group_1", "session_1", Test2Interpreter.class.getName(),
intpProperties, "user_1");
assertEquals(2, server.getInterpreterGroup().get("session_1").size());
@@ -188,9 +194,31 @@ public class RemoteInterpreterServerTest {
assertEquals(10, server.getProgress("session_1", Test1Interpreter.class.getName(),
intpContext));
- // close
+ // before close -> thread of Test1Interpreter is running
+ assertEquals(true, isThreadRunning(interpreter1.getScheduler().getName()));
+
+ // close opened Test1Interpreter -> remove from interpreterGroup
server.close("session_1", Test1Interpreter.class.getName());
assertTrue(interpreter1.closed.get());
+ assertEquals(1, server.getInterpreterGroup().get("session_1").size());
+
+ // close unopened Test2Interpreter -> keep in interpreterGroup
+ server.close("session_1", Test2Interpreter.class.getName());
+ assertEquals(1, server.getInterpreterGroup().get("session_1").size());
+
+ // after close -> thread of Test1Interpreter is not running
+ assertEquals(false, isThreadRunning(interpreter1.getScheduler().getName()));
+ }
+
+ private boolean isThreadRunning(String schedulerName) {
+ boolean res = false;
+ Set<Thread> threads = Thread.getAllStackTraces().keySet();
+ for (Thread t : threads) {
+ if (!t.getName().contains(schedulerName)) continue;
+ res = true;
+ break;
+ }
+ return res;
}
public static class Test1Interpreter extends Interpreter {