You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ad...@apache.org on 2014/03/20 00:34:03 UTC

git commit: SPARK-1099:Spark's local mode should probably respect spark.cores.max by default

Repository: spark
Updated Branches:
  refs/heads/master 67fa71cba -> 16789317a


SPARK-1099:Spark's local mode should probably respect spark.cores.max by default

This is for JIRA:https://spark-project.atlassian.net/browse/SPARK-1099
And this is what I do in this patch (also commented in the JIRA) @aarondav

 This is really a behavioral change, so I do this with great caution, and welcome any review advice:

1 I change the "MASTER=local" pattern of create LocalBackEnd . In the past, we passed 1 core to it . now it use a default cores
The reason here is that when someone use spark-shell to start local mode , Repl will use this "MASTER=local" pattern as default.
So if one also specify cores in the spark-shell command line, it will all go in here. So here pass 1 core is not suitalbe reponding to our change here.
2 In the LocalBackEnd , the "totalCores" variable are fetched following a different rule(in the past it just take in a userd passed cores, like 1 in "MASTER=local" pattern, 2 in "MASTER=local[2]" pattern"
rules:
a The second argument of LocalBackEnd 's constructor indicating cores have a default value which is Int.MaxValue. If user didn't pass it , its first default value is Int.MaxValue
b In getMaxCores, we first compare the former value to Int.MaxValue. if it's not equal, we think that user has passed their desired value, so just use it
c. If b is not satified, we then get cores from spark.cores.max, and we get real logical cores from Runtime. And if cores specified by spark.cores.max is bigger than logical cores, we use logical cores, otherwise we use spark.cores.max
3 In SparkContextSchedulerCreationSuite 's test("local") case, assertion is modified from 1 to logical cores, because "MASTER=local" pattern use default vaules.

Author: qqsun8819 <ji...@alibaba-inc.com>

Closes #110 from qqsun8819/local-cores and squashes the following commits:

731aefa [qqsun8819] 1 LocalBackend not change 2 In SparkContext do some process to the cores and pass it to original LocalBackend constructor
78b9c60 [qqsun8819] 1 SparkContext MASTER=local pattern use default cores instead of 1 to construct LocalBackEnd , for use of spark-shell and cores specified in cmd line 2 some test case change from local to local[1]. 3 SparkContextSchedulerCreationSuite test spark.cores.max config in local pattern
6ae1ee8 [qqsun8819] Add a static function in LocalBackEnd to let it use spark.cores.max specified cores when no cores are passed to it


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16789317
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16789317
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16789317

Branch: refs/heads/master
Commit: 16789317a34c1974f7b35960f06a7b51d8e0f29f
Parents: 67fa71c
Author: qqsun8819 <ji...@alibaba-inc.com>
Authored: Wed Mar 19 16:33:54 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Wed Mar 19 16:33:54 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala    |  5 ++++-
 .../test/scala/org/apache/spark/FileSuite.scala  |  4 ++--
 .../SparkContextSchedulerCreationSuite.scala     | 19 ++++++++++++++++---
 3 files changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16789317/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a1003b7..8f74607 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1262,7 +1262,10 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, 1)
+        // Use user specified in config, up to all available cores
+        val realCores = Runtime.getRuntime.availableProcessors()
+        val toUseCores = math.min(sc.conf.getInt("spark.cores.max", realCores), realCores)
+        val backend = new LocalBackend(scheduler, toUseCores)
         scheduler.initialize(backend)
         scheduler
 

http://git-wip-us.apache.org/repos/asf/spark/blob/16789317/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 01af940..b4a5881 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.SparkContext._
 class FileSuite extends FunSuite with LocalSparkContext {
 
   test("text files") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local[1]", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
@@ -176,7 +176,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("write SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local[1]", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))

http://git-wip-us.apache.org/repos/asf/spark/blob/16789317/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index b543471..9dd42be 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -27,10 +27,10 @@ import org.apache.spark.scheduler.local.LocalBackend
 class SparkContextSchedulerCreationSuite
   extends FunSuite with PrivateMethodTester with LocalSparkContext with Logging {
 
-  def createTaskScheduler(master: String): TaskSchedulerImpl = {
+  def createTaskScheduler(master: String, conf: SparkConf = new SparkConf()): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
     val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
     sched.asInstanceOf[TaskSchedulerImpl]
@@ -44,13 +44,26 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("local") {
-    val sched = createTaskScheduler("local")
+    var conf = new SparkConf()
+    conf.set("spark.cores.max", "1")
+    val sched = createTaskScheduler("local", conf)
     sched.backend match {
       case s: LocalBackend => assert(s.totalCores === 1)
       case _ => fail()
     }
   }
 
+  test("local-cores-exceed") {
+    val cores = Runtime.getRuntime.availableProcessors() + 1
+    var conf = new SparkConf()
+    conf.set("spark.cores.max", cores.toString)
+    val sched = createTaskScheduler("local", conf)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === Runtime.getRuntime.availableProcessors())
+      case _ => fail()
+    }
+  }
+
   test("local-n") {
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)