You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/04/29 01:40:01 UTC

spark git commit: [SPARK-14836][YARN] Zip all the jars before uploading to distributed cache

Repository: spark
Updated Branches:
  refs/heads/master 4f4721a21 -> 2398e3d69


[SPARK-14836][YARN] Zip all the jars before uploading to distributed cache

## What changes were proposed in this pull request?

<copied from JIRA>

Currently, if neither `spark.yarn.jars` nor `spark.yarn.archive` is set (the default), the Spark-on-YARN code uploads every jar in the folder into the distributed cache separately, which is quite time consuming and very verbose. Instead of uploading the jars one by one, this change zips all the jars first and then puts the single archive into the distributed cache.

This will significantly improve the startup time.

## How was this patch tested?

Unit tests and local integration testing were done.

Verified with SparkPi both in spark cluster and client mode.

Author: jerryshao <ss...@hortonworks.com>

Closes #12597 from jerryshao/SPARK-14836.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2398e3d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2398e3d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2398e3d6

Branch: refs/heads/master
Commit: 2398e3d69c9a675d651c192107953de8e6c2aecd
Parents: 4f4721a
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Apr 28 16:39:49 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Apr 28 16:39:49 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 21 +++++++++++++++++---
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 11 +++++-----
 2 files changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2398e3d6/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6184ad5..b494ef0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -496,11 +496,26 @@ private[spark] class Client(
             "to uploading libraries under SPARK_HOME.")
           val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
             sparkConf.getenv("SPARK_HOME")))
-          jarsDir.listFiles().foreach { f =>
-            if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
-              distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR))
+          val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
+            new File(Utils.getLocalDir(sparkConf)))
+          val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
+
+          try {
+            jarsStream.setLevel(0)
+            jarsDir.listFiles().foreach { f =>
+              if (f.isFile && f.getName.toLowerCase().endsWith(".jar") && f.canRead) {
+                jarsStream.putNextEntry(new ZipEntry(f.getName))
+                Files.copy(f, jarsStream)
+                jarsStream.closeEntry()
+              }
             }
+          } finally {
+            jarsStream.close()
           }
+
+          distribute(jarsArchive.toURI.getPath,
+            resType = LocalResourceType.ARCHIVE,
+            destName = Some(LOCALIZED_LIB_DIR))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2398e3d6/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index f196a0d..a408c48 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -285,8 +285,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
     val client = createClient(sparkConf)
     client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(),
-      anyBoolean(), any())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
   }
 
@@ -295,13 +293,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val jarsDir = new File(libs, "jars")
     assert(jarsDir.mkdir())
     new FileOutputStream(new File(libs, "RELEASE")).close()
-    val userLibs = Utils.createTempDir()
+    val userLib1 = Utils.createTempDir()
+    val userLib2 = Utils.createTempDir()
 
     val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
-    val jar2 = TestUtils.createJarWithFiles(Map(), userLibs)
+    val jar2 = TestUtils.createJarWithFiles(Map(), userLib1)
     // Copy jar2 to jar3 with same name
     val jar3 = {
-      val target = new File(userLibs, new File(jar1.toURI).getName)
+      val target = new File(userLib2, new File(jar2.toURI).getName)
       val input = new FileInputStream(jar2.getPath)
       val output = new FileOutputStream(target)
       Utils.copyStream(input, output, closeStreams = true)
@@ -315,7 +314,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val tempDir = Utils.createTempDir()
     client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
 
-    // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be
+    // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar2 will be
     // ignored.
     sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org