Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 06:29:56 UTC

[18/33] git commit: Fix a few settings that were being read as system properties after merge

Fix a few settings that were being read as system properties after merge
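
In other words, per-application settings move off JVM-wide system properties and onto
the SparkContext's SparkConf. A minimal sketch of the pattern, using names taken from
the diff below (sc is a SparkContext):

    // Before the merge fix: read a JVM-wide system property.
    val maxTaskFailures = System.getProperty("spark.task.maxFailures", "4").toInt

    // After: read from this context's own configuration.
    val maxTaskFailures = sc.conf.getOrElse("spark.task.maxFailures", "4").toInt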


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0bd1900c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0bd1900c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0bd1900c

Branch: refs/heads/master
Commit: 0bd1900cbce5946999c38293852d8ccd4f838930
Parents: b4ceed4
Author: Matei Zaharia <ma...@databricks.com>
Authored: Sun Dec 29 15:38:46 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sun Dec 29 15:38:46 2013 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala       |  4 +++-
 .../apache/spark/scheduler/TaskSetManager.scala   | 18 ++++++++++--------
 .../spark/streaming/scheduler/JobScheduler.scala  |  4 ++--
 3 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0bd1900c/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 56a038d..bffd990 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -47,10 +47,12 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
  */
 private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
-    val maxTaskFailures: Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+    val maxTaskFailures: Int,
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
+  def this(sc: SparkContext) = this(sc, sc.conf.getOrElse("spark.task.maxFailures", "4").toInt)
+
   val conf = sc.conf
 
   // How often to check for speculative tasks
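
A hypothetical call site, not part of this diff, showing what the new auxiliary
constructor buys: the failure limit comes from the context's conf unless a caller
passes one explicitly. (Callers must live under org.apache.spark, since the class
is private[spark].)

    val scheduler = new TaskSchedulerImpl(sc)  // reads spark.task.maxFailures from sc.conf
    val strict = new TaskSchedulerImpl(sc, maxTaskFailures = 1)  // explicit override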

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0bd1900c/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 9b95e41..d752e6f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -54,12 +54,14 @@ private[spark] class TaskSetManager(
     clock: Clock = SystemClock)
   extends Schedulable with Logging
 {
+  val conf = sched.sc.conf
+
   // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+  val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
-  val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+  val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
+  val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -118,7 +120,7 @@ private[spark] class TaskSetManager(
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
-    System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+    conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
 
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
@@ -682,14 +684,14 @@ private[spark] class TaskSetManager(
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = System.getProperty("spark.locality.wait", "3000")
+    val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
     level match {
       case TaskLocality.PROCESS_LOCAL =>
-        System.getProperty("spark.locality.wait.process", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
       case TaskLocality.NODE_LOCAL =>
-        System.getProperty("spark.locality.wait.node", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
       case TaskLocality.RACK_LOCAL =>
-        System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
       case TaskLocality.ANY =>
         0L
     }
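
On the user side, these knobs are now supplied through SparkConf rather than -D flags
on the driver JVM. A hedged sketch (the chained set(key, value) builder is assumed from
the SparkConf API of this era; the values are illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.task.cpus", "2")               // CPUS_PER_TASK
      .set("spark.speculation.quantile", "0.9")  // start speculating later
      .set("spark.locality.wait", "5000")        // default wait for all locality levels
      .set("spark.locality.wait.node", "10000")  // override NODE_LOCAL only
    // Previously: -Dspark.task.cpus=2 and friends on the driver's command line.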

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0bd1900c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9511ccf..7fd8d41 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming._
 
 /**
  * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
- * the jobs and runs them using a thread pool. Number of threads 
+ * the jobs and runs them using a thread pool. Number of threads
  */
 private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
@@ -33,7 +33,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   initLogging()
 
   val jobSets = new ConcurrentHashMap[Time, JobSet]
-  val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+  val numConcurrentJobs = ssc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   val generator = new JobGenerator(this)
   val listenerBus = new StreamingListenerBus()
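
Likewise, the streaming scheduler now sizes its thread pool from the StreamingContext's
conf. A hypothetical user-side sketch, assuming the SparkConf-based StreamingContext
constructor from the same refactoring (imports from org.apache.spark.streaming elided):

    val conf = new SparkConf()
      .setAppName("demo")
      .set("spark.streaming.concurrentJobs", "2")  // run two jobs per batch in parallel
    val ssc = new StreamingContext(conf, Seconds(1))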