Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/04 08:48:50 UTC

[1/2] git commit: Merge pull request #125 from velvia/2013-10/local-jar-uri

Updated Branches:
  refs/heads/branch-0.8 57fdb3feb -> e094dafda


Merge pull request #125 from velvia/2013-10/local-jar-uri

Add support for local:// URI scheme for addJars()

This PR adds support for a new URI scheme for SparkContext.addJars(): `local://file/path`.
The *local* scheme indicates that `/file/path` already exists on every worker node. It exists for big library JARs, which would be expensive to serve through the standard HTTP fileserver distribution method, especially on big clusters. Today the only inexpensive way to do this (assuming such a file is on every host via, say, NFS or rsync) is to add the JAR to SPARK_CLASSPATH, but we want a method where the user does not need to modify the Spark configuration.
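For context, a minimal usage sketch of the proposed scheme (the master URL and jar path below are hypothetical; the jar is assumed to already sit at the same path on every worker node):

    import org.apache.spark.SparkContext

    val sc = new SparkContext("spark://master:7077", "local-jar-demo")
    // Nothing is uploaded to the driver's HTTP file server here; each executor
    // resolves /opt/libs/uncommons-maths-1.2.2.jar on its own local filesystem.
    sc.addJar("local:/opt/libs/uncommons-maths-1.2.2.jar")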

I would add something to the docs, but it's not obvious where to add it.

Oh, and it would be great if this could be merged in time for 0.8.1.

(cherry picked from commit 618c1f6cf3008caae7a8c0202721a6bd77d29a0f)
Signed-off-by: Reynold Xin <rx...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a9e7787e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a9e7787e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a9e7787e

Branch: refs/heads/branch-0.8
Commit: a9e7787e17b6749f61c77835a3f111bf01f0fe8e
Parents: 57fdb3f
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Wed Oct 30 12:03:44 2013 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Nov 3 23:48:26 2013 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkContext.scala  |  6 +++++-
 .../scala/org/apache/spark/FileServerSuite.scala    | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a9e7787e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d22795d..2832f31 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -688,7 +688,7 @@ class SparkContext(
   /**
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
-   * filesystems), or an HTTP, HTTPS or FTP URI.
+   * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
    */
   def addJar(path: String) {
     if (path == null) {
@@ -701,6 +701,7 @@ class SparkContext(
       } else {
         val uri = new URI(path)
         key = uri.getScheme match {
+          // A JAR file which exists only on the driver node
           case null | "file" =>
             if (SparkHadoopUtil.get.isYarnMode()) {
               // In order for this to work on yarn the user must specify the --addjars option to
@@ -718,6 +719,9 @@ class SparkContext(
             } else {
               env.httpFileServer.addJar(new File(uri.getPath))
             }
+          // A JAR file which exists locally on every worker node
+          case "local" =>
+            "file:" + uri.getPath
           case _ =>
             path
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a9e7787e/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 35d1d41..c210dd5 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -120,4 +120,20 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     }.collect()
     assert(result.toSet === Set((1,2), (2,7), (3,121)))
   }
+
+  test ("Dynamically adding JARS on a standalone cluster using local: URL") {
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+    sc.addJar(sampleJarFile.replace("file", "local"))
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+    val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+      val fac = Thread.currentThread.getContextClassLoader()
+                                    .loadClass("org.uncommons.maths.Maths")
+                                    .getDeclaredMethod("factorial", classOf[Int])
+      val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      a + b
+    }.collect()
+    assert(result.toSet === Set((1,2), (2,7), (3,121)))
+  }
 }


[2/2] git commit: Merge pull request #129 from velvia/2013-11/document-local-uris

Posted by rx...@apache.org.
Merge pull request #129 from velvia/2013-11/document-local-uris

Document & finish support for local: URIs

Document all the supported URI schemes for addJar / addFile on the Cluster Overview page.
Add support for the local: URI scheme to addFile.
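
For illustration, a hedged sketch of addFile with the new scheme (the file path is hypothetical and assumed to exist on every worker node; since the key resolves to a file: URI, the file should still be reachable through SparkFiles.get on the executors):

    import java.io.File
    import org.apache.spark.{SparkContext, SparkFiles}

    val sc = new SparkContext("spark://master:7077", "local-file-demo")
    // /opt/data/lookup.txt is assumed to be present on every worker node.
    sc.addFile("local:/opt/data/lookup.txt")

    sc.parallelize(1 to 4).foreach { _ =>
      // On executors, the file is resolved via the task's working directory.
      val path = SparkFiles.get("lookup.txt")
      println(new File(path).exists())
    }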

(cherry picked from commit d6d11c2edbd11d2fde6dceb706711f2a4c3cf39d)
Signed-off-by: Reynold Xin <rx...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e094dafd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e094dafd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e094dafd

Branch: refs/heads/branch-0.8
Commit: e094dafda6adc961ad9ed2e54bc90d00116d91f3
Parents: a9e7787
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Fri Nov 1 15:40:33 2013 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Nov 3 23:48:40 2013 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkContext.scala    |  3 ++-
 docs/cluster-overview.md                              | 14 +++++++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e094dafd/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2832f31..10d3c53 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -594,7 +594,8 @@ class SparkContext(
     val uri = new URI(path)
     val key = uri.getScheme match {
       case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
-      case _ => path
+      case "local"       => "file:" + uri.getPath
+      case _             => path
     }
     addedFiles(key) = System.currentTimeMillis
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e094dafd/docs/cluster-overview.md
----------------------------------------------------------------------
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index f679cad..5927f73 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -13,7 +13,7 @@ object in your main program (called the _driver program_).
 Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_
 (either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across
 applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are
-worker processes that run computations and store data for your application. 
+worker processes that run computations and store data for your application.
 Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to
 the executors. Finally, SparkContext sends *tasks* for the executors to run.
 
@@ -57,6 +57,18 @@ which takes a list of JAR files (Java/Scala) or .egg and .zip libraries (Python)
 worker nodes. You can also dynamically add new files to be sent to executors with `SparkContext.addJar`
 and `addFile`.
 
+## URIs for addJar / addFile
+
+- **file:** - Absolute paths and `file:/` URIs are served by the driver's HTTP file server, and every executor
+  pulls the file from the driver HTTP server
+- **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected
+- **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node.  This
+  means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker,
+  or shared via NFS, GlusterFS, etc.
+
+Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
+Over time this can use up a significant amount of space and will need to be cleaned up.
+
 # Monitoring
 
 Each driver program has a web UI, typically on port 4040, that displays information about running
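
Pulling the schemes documented above together, a short hedged Scala sketch (all paths and the HDFS URI are hypothetical, and `sc` is assumed to be an existing SparkContext):

    // file: - served by the driver's HTTP file server; every executor pulls it from the driver
    sc.addJar("file:/home/user/libs/helper.jar")
    // hdfs:, http:, https:, ftp: - executors download the JAR directly from the given URI
    sc.addJar("hdfs://namenode:9000/libs/helper.jar")
    // local: - the path must already exist on every worker node, so no network IO is incurred
    sc.addJar("local:/opt/spark-libs/helper.jar")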