Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:44:13 UTC

[jira] [Resolved] (SPARK-24728) org.apache.spark.repl.ExecutorClassLoader with cache

     [ https://issues.apache.org/jira/browse/SPARK-24728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-24728.
----------------------------------
    Resolution: Incomplete

> org.apache.spark.repl.ExecutorClassLoader with cache
> ----------------------------------------------------
>
>                 Key: SPARK-24728
>                 URL: https://issues.apache.org/jira/browse/SPARK-24728
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: ant_nebula
>            Priority: Major
>              Labels: bulk-closed
>
> My realtime platform, built on Spark, supports both SQL and Scala coding from a JSP page.
> To make Scala coding possible, my solution is:
> {code:none}
> export SPARK_DIST_CLASSPATH=/data/xx/my-driver-jar-with-dependencies.jar
> --conf spark.repl.class.outputDir=/data/xx/myclasss/ {code}
> {code:java}
> import scala.tools.nsc.GenericRunnerSettings
> import scala.tools.nsc.interpreter.IMain
>
> val flusher = new java.io.PrintWriter(System.err)
> val interpreter = {
>   val interpArguments = List(
>     "-Yrepl-class-based",
>     "-Yrepl-outdir", "/data/xx/myclasss/"
>   )
>   val settings = new GenericRunnerSettings(println _)
>   settings.embeddedDefaults(this.getClass.getClassLoader)
>   settings.usejavacp.value = true
>   settings.processArguments(interpArguments, true)
>   new IMain(settings, flusher)
> }
> interpreter.setContextClassLoader()
> ExecutorContext.interpreter = interpreter{code}
> {code:java}
> trait IApiCode extends Serializable {
>   def sql(spark: org.apache.spark.sql.SparkSession, fromTable: String, cacheTable: String): Unit
> }{code}
> {code:java}
> import java.util.concurrent.TimeUnit
> import com.google.common.cache.{Cache, CacheBuilder}
> import org.apache.commons.lang3.time.DateFormatUtils  // assuming commons-lang3
>
> object InterpretCodeFactory extends Logging {
>
>   // Compiled code snippets, keyed by task id + update timestamp.
>   val sqlActMap: Cache[String, IApiCode] =
>     CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).build()
>
>   def interpret(taskId: Integer, updateTime: java.util.Date, code: String): IApiCode = {
>     val key = taskId + DateFormatUtils.format(updateTime, "yyyyMMddHHmmss")
>     var result = sqlActMap.getIfPresent(key)
>     if (result == null) {
>       result = interpret(key, code)
>     }
>     result
>   }
>
>   def interpret(key: String, code: String): IApiCode = synchronized {
>     var result = sqlActMap.getIfPresent(key)
>     if (result == null) {
>       val genCodeResult = doGenCode(key, code)
>       ExecutorContext.interpreter.compileString(genCodeResult)
>       result = Class.forName(
>         s"com.duowan.meteor.server.executor.apicode.ApiCode$key",
>         true, ExecutorContext.interpreter.classLoader
>       ).newInstance().asInstanceOf[IApiCode]
>       sqlActMap.put(key, result)
>     }
>     result
>   }
>
>   def doGenCode(key: String, code: String): String = {
>     val result = s"""
>       |package com.duowan.meteor.server.executor.apicode
>       |
>       |class ApiCode$key extends com.duowan.meteor.server.executor.IApiCode {
>       |
>       |  override def sql(spark: org.apache.spark.sql.SparkSession, fromTable: String, cacheTable: String): Unit = {
>       |    $code
>       |  }
>       |
>       |}
>     """.stripMargin
>     logInfo(result)
>     result
>   }
> }{code}
> Then I can execute Scala code submitted from the JSP page:
> {code:java}
> val apiCode = InterpretCodeFactory.interpret(taskId, updateTime, codeFromJsp)
> apiCode.sql(ExecutorContext.spark, fromTable, cacheTable){code}
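> For example, {{codeFromJsp}} could be a snippet like the following (illustrative only; it becomes the body of the generated {{sql}} method, so it can reference the {{spark}}, {{fromTable}} and {{cacheTable}} parameters):
> {code:java}
> spark.sql(s"SELECT name, count(1) AS cnt FROM $fromTable GROUP BY name")
>   .createOrReplaceTempView(cacheTable)
> {code}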
> In this way, I think I have solved the problem described in SPARK-9219, but the executor still loads every class from the driver's disk, which is unnecessary after the first load.
> It would be better if {{org.apache.spark.repl.ExecutorClassLoader}} supported a Guava cache in its {{findClass}} method, and Spark could also add a configuration switch for this, with the cache disabled by default. The current {{findClass}} looks like this:
> {code:java}
> override def findClass(name: String): Class[_] = {
>   if (userClassPathFirst) {
>     findClassLocally(name).getOrElse(parentLoader.loadClass(name))
>   } else {
>     try {
>       parentLoader.loadClass(name)
>     } catch {
>       case e: ClassNotFoundException =>
>         // A Guava cache could be consulted here.
>         val classOption = findClassLocally(name)
>         classOption match {
>           case None => throw new ClassNotFoundException(name, e)
>           case Some(a) => a
>         }
>     }
>   }
> }{code}
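> A minimal sketch of the cached variant, assuming a Guava {{Cache}} field is added to ExecutorClassLoader (the field name and expiry below are illustrative placeholders, not existing Spark code):
> {code:java}
> import java.util.concurrent.TimeUnit
> import com.google.common.cache.{Cache, CacheBuilder}
>
> // Illustrative cache field; a real patch would size it and gate it
> // behind the proposed configuration switch.
> private val localClassCache: Cache[String, Class[_]] =
>   CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).build()
>
> override def findClass(name: String): Class[_] = {
>   if (userClassPathFirst) {
>     findClassLocally(name).getOrElse(parentLoader.loadClass(name))
>   } else {
>     try {
>       parentLoader.loadClass(name)
>     } catch {
>       case e: ClassNotFoundException =>
>         // Check the cache before fetching the class bytes from the driver again.
>         var clazz = localClassCache.getIfPresent(name)
>         if (clazz == null) {
>           clazz = findClassLocally(name).getOrElse(throw new ClassNotFoundException(name, e))
>           localClassCache.put(name, clazz)
>         }
>         clazz
>     }
>   }
> }{code}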



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org