Posted to commits@spark.apache.org by ya...@apache.org on 2024/02/08 06:42:05 UTC

(spark) branch branch-3.5 updated: [SPARK-46400][CORE][SQL][3.5] When there are corrupted files in the local maven repo, skip this cache and try again

This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 77f8b38a1091 [SPARK-46400][CORE][SQL][3.5] When there are corrupted files in the local maven repo, skip this cache and try again
77f8b38a1091 is described below

commit 77f8b38a1091aa51af32dc790b61ae54ac47a2c2
Author: panbingkun <pa...@baidu.com>
AuthorDate: Thu Feb 8 14:41:51 2024 +0800

    [SPARK-46400][CORE][SQL][3.5] When there are corrupted files in the local maven repo, skip this cache and try again
    
    ### What changes were proposed in this pull request?
    The PR aims to:
    - fix a potential bug (i.e. https://github.com/apache/spark/pull/44208) and enhance the user experience.
    - make the code more standards-compliant.
    
    This backports the above to branch-3.5.
    Master branch PR: https://github.com/apache/spark/pull/44343
    
    ### Why are the changes needed?
    We use the local maven repo as the first-level cache in Ivy. The original intention was to reduce the time required to resolve and obtain jars, but when there are corrupted files in the local maven repo, this mechanism is interrupted outright and the resulting error message is very unfriendly, which greatly confuses users. In keeping with that intention, we should skip the cache and retry in such situations.
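    
    Below is a minimal, self-contained sketch (illustrative only, not Spark
    code; all names in it are hypothetical stand-ins) of the
    resolve-through-cache-then-retry-without-it pattern this patch applies:
    
    ```scala
    object CacheFallbackSketch {
      final case class Settings(useLocalM2AsCache: Boolean)
      final case class Report(failed: Seq[String], artifacts: Seq[String])
    
      // Stand-in for Ivy's resolve step: pretend the cached copy of anything
      // named "corrupted" is broken, so resolving through the cache fails.
      def resolve(coordinate: String, settings: Settings): Report =
        if (settings.useLocalM2AsCache && coordinate.contains("corrupted")) {
          Report(failed = Seq(coordinate), artifacts = Nil)
        } else {
          Report(failed = Nil, artifacts = Seq(s"$coordinate.jar"))
        }
    
      def resolveWithFallback(coordinate: String): Seq[String] = {
        val cached = resolve(coordinate, Settings(useLocalM2AsCache = true))
        if (cached.failed.isEmpty) {
          cached.artifacts
        } else {
          // The real patch also deletes the stale ivy-*.xml and
          // ivydata-*.properties cache entries here before retrying.
          val noCache = resolve(coordinate, Settings(useLocalM2AsCache = false))
          if (noCache.failed.nonEmpty) {
            throw new RuntimeException(s"Download failed: ${noCache.failed.mkString(", ")}")
          }
          noCache.artifacts
        }
      }
    
      def main(args: Array[String]): Unit =
        println(resolveWithFallback("com.example:corrupted-lib:1.0"))
    }
    ```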
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manually tested.
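    
    For reference, a minimal sketch (untested; the coordinate is a
    placeholder) of how the fallback can be exercised directly, mirroring
    the call shape in IsolatedClientLoader below. Note that SparkSubmitUtils
    is private[spark], so this only compiles from within Spark's own sources
    or tests; to actually hit the retry path, first corrupt a jar under
    ~/.m2/repository:
    
    ```scala
    import org.apache.spark.deploy.SparkSubmitUtils
    
    // Settings that consult the local maven repo first (the default)...
    val ivySettings = SparkSubmitUtils.buildIvySettings(
      remoteRepos = None, ivyPath = None)
    // ...and fallback settings that skip the local-m2 cache entirely.
    val noCacheIvySettings = SparkSubmitUtils.buildIvySettings(
      remoteRepos = None, ivyPath = None, useLocalM2AsCache = false)
    
    val jars = SparkSubmitUtils.resolveMavenCoordinates(
      "com.example:some-artifact:1.0.0", // placeholder coordinate
      ivySettings,
      noCacheIvySettings = Some(noCacheIvySettings),
      transitive = true)
    ```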
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45017 from panbingkun/branch-3.5_SPARK-46400.
    
    Authored-by: panbingkun <pa...@baidu.com>
    Signed-off-by: yangjie01 <ya...@baidu.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 116 +++++++++++++++++----
 .../sql/hive/client/IsolatedClientLoader.scala     |   4 +
 2 files changed, 98 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index af35f451e370..0f0d8b6c07c0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,7 +41,7 @@ import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor._
 import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
-import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.report.{DownloadStatus, ResolveReport}
 import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
@@ -1226,7 +1226,7 @@ private[spark] object SparkSubmitUtils extends Logging {
         s"be whitespace. The artifactId provided is: ${splits(1)}")
       require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
         s"be whitespace. The version provided is: ${splits(2)}")
-      new MavenCoordinate(splits(0), splits(1), splits(2))
+      MavenCoordinate(splits(0), splits(1), splits(2))
     }
   }
 
@@ -1241,21 +1241,27 @@ private[spark] object SparkSubmitUtils extends Logging {
   }
 
   /**
-   * Extracts maven coordinates from a comma-delimited string
+   * Create a ChainResolver used by Ivy to search for and resolve dependencies.
+   *
    * @param defaultIvyUserDir The default user path for Ivy
+   * @param useLocalM2AsCache Whether to use the local maven repo as a cache
    * @return A ChainResolver used by Ivy to search for and resolve dependencies.
    */
-  def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
+  def createRepoResolvers(
+      defaultIvyUserDir: File,
+      useLocalM2AsCache: Boolean = true): ChainResolver = {
     // We need a chain resolver if we want to check multiple repositories
     val cr = new ChainResolver
     cr.setName("spark-list")
 
-    val localM2 = new IBiblioResolver
-    localM2.setM2compatible(true)
-    localM2.setRoot(m2Path.toURI.toString)
-    localM2.setUsepoms(true)
-    localM2.setName("local-m2-cache")
-    cr.add(localM2)
+    if (useLocalM2AsCache) {
+      val localM2 = new IBiblioResolver
+      localM2.setM2compatible(true)
+      localM2.setRoot(m2Path.toURI.toString)
+      localM2.setUsepoms(true)
+      localM2.setName("local-m2-cache")
+      cr.add(localM2)
+    }
 
     val localIvy = new FileSystemResolver
     val localIvyRoot = new File(defaultIvyUserDir, "local")
@@ -1351,18 +1357,23 @@ private[spark] object SparkSubmitUtils extends Logging {
 
   /**
    * Build Ivy Settings using options with default resolvers
+   *
    * @param remoteRepos Comma-delimited string of remote repositories other than maven central
    * @param ivyPath The path to the local ivy repository
+   * @param useLocalM2AsCache Whether or not to use the `local-m2` repo as a cache
    * @return An IvySettings object
    */
-  def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
+  def buildIvySettings(
+      remoteRepos: Option[String],
+      ivyPath: Option[String],
+      useLocalM2AsCache: Boolean = true): IvySettings = {
     val ivySettings: IvySettings = new IvySettings
     processIvyPathArg(ivySettings, ivyPath)
 
     // create a pattern matcher
     ivySettings.addMatcher(new GlobPatternMatcher)
     // create the dependency resolvers
-    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
+    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir, useLocalM2AsCache)
     ivySettings.addResolver(repoResolver)
     ivySettings.setDefaultResolver(repoResolver.getName)
     processRemoteRepoArg(ivySettings, remoteRepos)
@@ -1459,7 +1470,7 @@ private[spark] object SparkSubmitUtils extends Logging {
    */
   private def clearIvyResolutionFiles(
       mdId: ModuleRevisionId,
-      ivySettings: IvySettings,
+      defaultCacheFile: File,
       ivyConfName: String): Unit = {
     val currentResolutionFiles = Seq(
       s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
@@ -1467,14 +1478,40 @@ private[spark] object SparkSubmitUtils extends Logging {
       s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties"
     )
     currentResolutionFiles.foreach { filename =>
-      new File(ivySettings.getDefaultCache, filename).delete()
+      new File(defaultCacheFile, filename).delete()
+    }
+  }
+
+  /**
+   * Clear invalid cache files in ivy. The cache file is usually at
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml,
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml.original, and
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivydata-${version}.properties.
+   * This is because, when the `local-m2` repo is used as a cache, some invalid
+   * files can be created. If they are not deleted here, an error similar to
+   * `unknown resolver local-m2-cache` will be raised, confusing users.
+   */
+  private def clearInvalidIvyCacheFiles(
+      mdId: ModuleRevisionId,
+      defaultCacheFile: File): Unit = {
+    val cacheFiles = Seq(
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivy-${mdId.getRevision}.xml",
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivy-${mdId.getRevision}.xml.original",
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivydata-${mdId.getRevision}.properties")
+    cacheFiles.foreach { filename =>
+      new File(defaultCacheFile, filename).delete()
     }
   }
 
   /**
    * Resolves any dependencies that were supplied through maven coordinates
+   *
    * @param coordinates Comma-delimited string of maven coordinates
    * @param ivySettings An IvySettings containing resolvers to use
+   * @param noCacheIvySettings A no-cache (skipping local-m2-cache) IvySettings containing resolvers to use
    * @param transitive Whether resolving transitive dependencies, default is true
    * @param exclusions Exclusions to apply when resolving transitive dependencies
    * @return Seq of path to the jars of the given maven artifacts including their
@@ -1483,6 +1520,7 @@ private[spark] object SparkSubmitUtils extends Logging {
   def resolveMavenCoordinates(
       coordinates: String,
       ivySettings: IvySettings,
+      noCacheIvySettings: Option[IvySettings] = None,
       transitive: Boolean,
       exclusions: Seq[String] = Nil,
       isTest: Boolean = false): Seq[String] = {
@@ -1511,6 +1549,8 @@ private[spark] object SparkSubmitUtils extends Logging {
         // scalastyle:on println
 
         val ivy = Ivy.newInstance(ivySettings)
+        ivy.pushContext()
+
         // Set resolve options to download transitive dependencies as well
         val resolveOptions = new ResolveOptions
         resolveOptions.setTransitive(transitive)
@@ -1523,6 +1563,11 @@ private[spark] object SparkSubmitUtils extends Logging {
         } else {
           resolveOptions.setDownload(true)
         }
+        // retrieve all resolved dependencies
+        retrieveOptions.setDestArtifactPattern(
+          packagesDirectory.getAbsolutePath + File.separator +
+            "[organization]_[artifact]-[revision](-[classifier]).[ext]")
+        retrieveOptions.setConfs(Array(ivyConfName))
 
         // Add exclusion rules for Spark and Scala Library
         addExclusionRules(ivySettings, ivyConfName, md)
@@ -1534,17 +1579,44 @@ private[spark] object SparkSubmitUtils extends Logging {
         // resolve dependencies
         val rr: ResolveReport = ivy.resolve(md, resolveOptions)
         if (rr.hasError) {
-          throw new RuntimeException(rr.getAllProblemMessages.toString)
+          // SPARK-46302: When there are some corrupted jars in the local maven repo,
+          // we try to continue without the cache
+          val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
+          if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
+            val failedArtifacts = failedReports.map(r => r.getArtifact)
+            logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
+              s"attempt to retry while skipping local-m2-cache.")
+            failedArtifacts.foreach(artifact => {
+              clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
+            })
+            ivy.popContext()
+
+            val noCacheIvy = Ivy.newInstance(noCacheIvySettings.get)
+            noCacheIvy.pushContext()
+
+            val noCacheRr = noCacheIvy.resolve(md, resolveOptions)
+            if (noCacheRr.hasError) {
+              throw new RuntimeException(noCacheRr.getAllProblemMessages.toString)
+            }
+            noCacheIvy.retrieve(noCacheRr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
+            val dependencyPaths = resolveDependencyPaths(
+              noCacheRr.getArtifacts.toArray, packagesDirectory)
+            noCacheIvy.popContext()
+
+            dependencyPaths
+          } else {
+            throw new RuntimeException(rr.getAllProblemMessages.toString)
+          }
+        } else {
+          ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
+          val dependencyPaths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+          ivy.popContext()
+
+          dependencyPaths
         }
-        // retrieve all resolved dependencies
-        retrieveOptions.setDestArtifactPattern(packagesDirectory.getAbsolutePath + File.separator +
-          "[organization]_[artifact]-[revision](-[classifier]).[ext]")
-        ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
-          retrieveOptions.setConfs(Array(ivyConfName)))
-        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
       } finally {
         System.setOut(sysOut)
-        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName)
+        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings.getDefaultCache, ivyConfName)
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index a28a0464e6ee..18090b53e3c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -139,6 +139,10 @@ private[hive] object IsolatedClientLoader extends Logging {
         SparkSubmitUtils.buildIvySettings(
           Some(remoteRepos),
           ivyPath),
+        Some(SparkSubmitUtils.buildIvySettings(
+          Some(remoteRepos),
+          ivyPath,
+          useLocalM2AsCache = false)),
         transitive = true,
         exclusions = version.exclusions)
     }

