Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/24 07:01:38 UTC

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r529226242



##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
           case "file" => addLocalJarFile(new File(uri.getPath))
           // A JAR file which exists locally on every worker node
           case "local" => "file:" + uri.getPath
+          case "ivy" =>
+            // Since `new Path(path).toUri` will lose query information,
+            // so here we use `URI>create(path)`

Review comment:
       nit: `>` -> `.`?

##########
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##########
@@ -955,6 +955,20 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
         .set(EXECUTOR_ALLOW_SPARK_CONTEXT, true)).stop()
     }
   }
+
+  test("SPARK-33084: Add jar support ivy url") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+    sc.addJar("ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0")
+    assert(sc.listJars().find(_.contains("scalajs-test-interface_2.12")).nonEmpty)

Review comment:
       nit: `find` -> `exists`
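
       For reference, a minimal sketch of the assertion after that rename, which avoids materializing an `Option` just to check presence:
   ```
   // exists(p) is equivalent to find(p).nonEmpty, without building the Option
   assert(sc.listJars().exists(_.contains("scalajs-test-interface_2.12")))
   ```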

##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1920,17 +1920,23 @@ class SparkContext(config: SparkConf) extends Logging {
           case "file" => addLocalJarFile(new File(uri.getPath))
           // A JAR file which exists locally on every worker node
           case "local" => "file:" + uri.getPath
+          case "ivy" =>
+            // Since `new Path(path).toUri` will lose query information,
+            // so here we use `URI>create(path)`
+            Utils.resolveMavenDependencies(URI.create(path))
           case _ => checkRemoteJarFile(path)
         }
       }
-      if (key != null) {
+      if (keys != null) {
         val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-        if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-          logInfo(s"Added JAR $path at $key with timestamp $timestamp")
-          postEnvironmentUpdate()
-        } else {
-          logWarning(s"The jar $path has been added already. Overwriting of added jars " +
-            "is not supported in the current version.")
+        keys.split(",").foreach { key =>

Review comment:
       Why do we need to split by `,` here? This PR doesn't seem to add a test for this case.
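
       For context, the comma comes from `resolveMavenDependencies`, whose scaladoc below says it returns a comma-separated string of downloaded jar URIs, so one `ivy://` path can resolve to several keys; a hypothetical illustration (the paths are made up):
   ```
   // With transitive resolution, a single ivy:// path can yield multiple jars:
   val keys = "file:/root/.ivy2/jars/a.jar,file:/root/.ivy2/jars/b.jar"
   keys.split(",").foreach(key => println(s"Added JAR at $key"))
   ```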

##########
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##########
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
 ADD JAR "/path/to/some.jar";
 ADD JAR '/some/other.jar';
 ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";

Review comment:
       `exclusin` -> `exclusion`

##########
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##########
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
 ADD JAR "/path/to/some.jar";
 ADD JAR '/some/other.jar';
 ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module&transitive=false";

Review comment:
       Could you move the description of the syntax into the `**file_name**` section above? Also, please put some **concrete** examples in this example section.
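
       As one possibility, a concrete example could reuse the coordinate already exercised in `SparkContextSuite` (illustrative only; shown here via the Scala API):
   ```
   // Same coordinate as the new test, with an explicit transitive flag:
   sc.addJar("ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0?transitive=true")
   ```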

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
##########
@@ -159,6 +161,17 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade
     }
   }
 
+  def resolveJars(path: String): List[String] = {
+    val uri = new Path(path).toUri
+    uri.getScheme match {
+      case "ivy" =>
+        Utils.resolveMavenDependencies(URI.create(path))
+          .split(",").toList
+      case _ =>
+        path :: Nil

Review comment:
       nit format:
   ```
     def resolveJars(path: String): List[String] = {
       new Path(path).toUri.getScheme match {
         case "ivy" => Utils.resolveMavenDependencies(URI.create(path)).split(",").toList
         case _ => path :: Nil
       }
     }
   ```

##########
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##########
@@ -42,6 +42,10 @@ ADD JAR /tmp/test.jar;
 ADD JAR "/path/to/some.jar";
 ADD JAR '/some/other.jar';
 ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module";
+ADD JAR "ivy://group:module:version?exclusin=group:module,group:module&transitive=false";

Review comment:
       Also, I think we need an explanation of each parameter, e.g., `exclusion` and `transitive`.
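
       Reading the parsing code in `Utils.scala` quoted in this thread, the semantics appear to be the following (worth confirming before documenting; note the code actually reads `exclude`, not `exclusion`):
   ```
   // Inferred from parseTransitive/parseExcludeList below, not from any official doc:
   // transitive=true|false            -- also download dependencies; false when absent
   // exclude=org:module[,org:module]  -- modules to skip during resolution
   ```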

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2980,6 +2980,75 @@ private[spark] object Utils extends Logging {
     metadata.toString
   }
 
+  /**
+   * Download Ivy URIs dependent jars.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+    val Seq(repositories, ivyRepoPath, ivySettingsPath) =
+      Seq(
+        "spark.jars.repositories",
+        "spark.jars.ivy",
+        "spark.jars.ivySettings"
+      ).map(sys.props.get(_).orNull)
+    // Create the IvySettings, either load from file or build defaults
+    val ivySettings = Option(ivySettingsPath) match {
+      case Some(path) =>
+        SparkSubmitUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath))
+
+      case None =>
+        SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
+    }
+    SparkSubmitUtils.resolveMavenCoordinates(uri.getAuthority, ivySettings,
+      parseExcludeList(uri.getQuery), parseTransitive(uri.getQuery))
+  }
+
+  /**
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   *         Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *         Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(queryString: String): Array[String] = {
+    if (queryString == null || queryString.isEmpty) {
+      Array.empty[String]
+    } else {
+      val mapTokens: Array[String] = queryString.split("&")
+      assert(mapTokens.forall(_.split("=").length == 2), "Invalid query string: " + queryString)
+      mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1))).filter(_._1 == "exclude")
+        .flatMap { case (_, excludeString) =>
+          val excludes: Array[String] = excludeString.split(",")
+          assert(excludes.forall(_.split(":").length == 2),
+            "Invalid exclude string: expected 'org:module,org:module,..', found " + excludeString)
+          excludes
+        }
+    }
+  }
+
+  /**
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   *         Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   *         Output:  true
+   */
+  private def parseTransitive(queryString: String): Boolean = {
+    if (queryString == null || queryString.isEmpty) {
+      false
+    } else {
+      val mapTokens: Array[String] = queryString.split("&")
+      assert(mapTokens.forall(_.split("=").length == 2), "Invalid query string: " + queryString)
+      val transitive = mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1)))
+        .filter(_._1 == "transitive")

Review comment:
       Some of the code that parses the params looks duplicated, so could you share it between `parseTransitive` and `parseExcludeList`?
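
       One possible shape for the shared helper (a sketch, not prescriptive):
   ```
   // Both methods could delegate the query-string tokenizing to one helper:
   private def parseQueryParams(queryString: String): Array[(String, String)] = {
     if (queryString == null || queryString.isEmpty) {
       Array.empty[(String, String)]
     } else {
       val mapTokens = queryString.split("&")
       assert(mapTokens.forall(_.split("=").length == 2), "Invalid query string: " + queryString)
       mapTokens.map(_.split("=")).map(kv => (kv(0), kv(1)))
     }
   }
   ```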

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
##########
@@ -159,6 +161,17 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade
     }
   }
 
+  def resolveJars(path: String): List[String] = {

Review comment:
       protected

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2980,6 +2980,75 @@ private[spark] object Utils extends Logging {
     metadata.toString
   }
 
+  /**
+   * Download Ivy URIs dependent jars.
+   *
+   * @param uri Ivy uri need to be downloaded.
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+    val Seq(repositories, ivyRepoPath, ivySettingsPath) =
+      Seq(
+        "spark.jars.repositories",
+        "spark.jars.ivy",
+        "spark.jars.ivySettings"
+      ).map(sys.props.get(_).orNull)
+    // Create the IvySettings, either load from file or build defaults
+    val ivySettings = Option(ivySettingsPath) match {
+      case Some(path) =>
+        SparkSubmitUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath))
+
+      case None =>
+        SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
+    }
+    SparkSubmitUtils.resolveMavenCoordinates(uri.getAuthority, ivySettings,
+      parseExcludeList(uri.getQuery), parseTransitive(uri.getQuery))
+  }
+
+  /**
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   *         Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *         Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(queryString: String): Array[String] = {
+    if (queryString == null || queryString.isEmpty) {
+      Array.empty[String]
+    } else {
+      val mapTokens: Array[String] = queryString.split("&")

Review comment:
       nit: we don't need the type `: Array[String]`.



